##// END OF EJS Templates
Semi-working refactored ipcluster....
Brian Granger -
Show More
@@ -1,329 +1,330 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """A simple configuration system.
3 """A simple configuration system.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 """
8 """
9
9
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2008-2009 The IPython Development Team
11 # Copyright (C) 2008-2009 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20
20
21 import __builtin__
21 import __builtin__
22 import os
22 import os
23 import sys
23 import sys
24
24
25 from IPython.external import argparse
25 from IPython.external import argparse
26 from IPython.utils.genutils import filefind
26 from IPython.utils.genutils import filefind
27
27
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29 # Exceptions
29 # Exceptions
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31
31
32
32
33 class ConfigError(Exception):
33 class ConfigError(Exception):
34 pass
34 pass
35
35
36
36
37 class ConfigLoaderError(ConfigError):
37 class ConfigLoaderError(ConfigError):
38 pass
38 pass
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Config class for holding config information
42 # Config class for holding config information
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 class Config(dict):
46 class Config(dict):
47 """An attribute based dict that can do smart merges."""
47 """An attribute based dict that can do smart merges."""
48
48
49 def __init__(self, *args, **kwds):
49 def __init__(self, *args, **kwds):
50 dict.__init__(self, *args, **kwds)
50 dict.__init__(self, *args, **kwds)
51 # This sets self.__dict__ = self, but it has to be done this way
51 # This sets self.__dict__ = self, but it has to be done this way
52 # because we are also overriding __setattr__.
52 # because we are also overriding __setattr__.
53 dict.__setattr__(self, '__dict__', self)
53 dict.__setattr__(self, '__dict__', self)
54
54
55 def _merge(self, other):
55 def _merge(self, other):
56 to_update = {}
56 to_update = {}
57 for k, v in other.items():
57 for k, v in other.items():
58 if not self.has_key(k):
58 if not self.has_key(k):
59 to_update[k] = v
59 to_update[k] = v
60 else: # I have this key
60 else: # I have this key
61 if isinstance(v, Config):
61 if isinstance(v, Config):
62 # Recursively merge common sub Configs
62 # Recursively merge common sub Configs
63 self[k]._merge(v)
63 self[k]._merge(v)
64 else:
64 else:
65 # Plain updates for non-Configs
65 # Plain updates for non-Configs
66 to_update[k] = v
66 to_update[k] = v
67
67
68 self.update(to_update)
68 self.update(to_update)
69
69
70 def _is_section_key(self, key):
70 def _is_section_key(self, key):
71 if key[0].upper()==key[0] and not key.startswith('_'):
71 if key[0].upper()==key[0] and not key.startswith('_'):
72 return True
72 return True
73 else:
73 else:
74 return False
74 return False
75
75
76 def has_key(self, key):
76 def has_key(self, key):
77 if self._is_section_key(key):
77 if self._is_section_key(key):
78 return True
78 return True
79 else:
79 else:
80 return dict.has_key(self, key)
80 return dict.has_key(self, key)
81
81
82 def _has_section(self, key):
82 def _has_section(self, key):
83 if self._is_section_key(key):
83 if self._is_section_key(key):
84 if dict.has_key(self, key):
84 if dict.has_key(self, key):
85 return True
85 return True
86 return False
86 return False
87
87
88 def copy(self):
88 def copy(self):
89 return type(self)(dict.copy(self))
89 return type(self)(dict.copy(self))
90
90
91 def __copy__(self):
91 def __copy__(self):
92 return self.copy()
92 return self.copy()
93
93
94 def __deepcopy__(self, memo):
94 def __deepcopy__(self, memo):
95 import copy
95 import copy
96 return type(self)(copy.deepcopy(self.items()))
96 return type(self)(copy.deepcopy(self.items()))
97
97
98 def __getitem__(self, key):
98 def __getitem__(self, key):
99 # Because we use this for an exec namespace, we need to delegate
99 # Because we use this for an exec namespace, we need to delegate
100 # the lookup of names in __builtin__ to itself. This means
100 # the lookup of names in __builtin__ to itself. This means
101 # that you can't have section or attribute names that are
101 # that you can't have section or attribute names that are
102 # builtins.
102 # builtins.
103 try:
103 try:
104 return getattr(__builtin__, key)
104 return getattr(__builtin__, key)
105 except AttributeError:
105 except AttributeError:
106 pass
106 pass
107 if self._is_section_key(key):
107 if self._is_section_key(key):
108 try:
108 try:
109 return dict.__getitem__(self, key)
109 return dict.__getitem__(self, key)
110 except KeyError:
110 except KeyError:
111 c = Config()
111 c = Config()
112 dict.__setitem__(self, key, c)
112 dict.__setitem__(self, key, c)
113 return c
113 return c
114 else:
114 else:
115 return dict.__getitem__(self, key)
115 return dict.__getitem__(self, key)
116
116
117 def __setitem__(self, key, value):
117 def __setitem__(self, key, value):
118 # Don't allow names in __builtin__ to be modified.
118 # Don't allow names in __builtin__ to be modified.
119 if hasattr(__builtin__, key):
119 if hasattr(__builtin__, key):
120 raise ConfigError('Config variable names cannot have the same name '
120 raise ConfigError('Config variable names cannot have the same name '
121 'as a Python builtin: %s' % key)
121 'as a Python builtin: %s' % key)
122 if self._is_section_key(key):
122 if self._is_section_key(key):
123 if not isinstance(value, Config):
123 if not isinstance(value, Config):
124 raise ValueError('values whose keys begin with an uppercase '
124 raise ValueError('values whose keys begin with an uppercase '
125 'char must be Config instances: %r, %r' % (key, value))
125 'char must be Config instances: %r, %r' % (key, value))
126 else:
126 else:
127 dict.__setitem__(self, key, value)
127 dict.__setitem__(self, key, value)
128
128
129 def __getattr__(self, key):
129 def __getattr__(self, key):
130 try:
130 try:
131 return self.__getitem__(key)
131 return self.__getitem__(key)
132 except KeyError, e:
132 except KeyError, e:
133 raise AttributeError(e)
133 raise AttributeError(e)
134
134
135 def __setattr__(self, key, value):
135 def __setattr__(self, key, value):
136 try:
136 try:
137 self.__setitem__(key, value)
137 self.__setitem__(key, value)
138 except KeyError, e:
138 except KeyError, e:
139 raise AttributeError(e)
139 raise AttributeError(e)
140
140
141 def __delattr__(self, key):
141 def __delattr__(self, key):
142 try:
142 try:
143 dict.__delitem__(self, key)
143 dict.__delitem__(self, key)
144 except KeyError, e:
144 except KeyError, e:
145 raise AttributeError(e)
145 raise AttributeError(e)
146
146
147
147
148 #-----------------------------------------------------------------------------
148 #-----------------------------------------------------------------------------
149 # Config loading classes
149 # Config loading classes
150 #-----------------------------------------------------------------------------
150 #-----------------------------------------------------------------------------
151
151
152
152
153 class ConfigLoader(object):
153 class ConfigLoader(object):
154 """A object for loading configurations from just about anywhere.
154 """A object for loading configurations from just about anywhere.
155
155
156 The resulting configuration is packaged as a :class:`Struct`.
156 The resulting configuration is packaged as a :class:`Struct`.
157
157
158 Notes
158 Notes
159 -----
159 -----
160 A :class:`ConfigLoader` does one thing: load a config from a source
160 A :class:`ConfigLoader` does one thing: load a config from a source
161 (file, command line arguments) and returns the data as a :class:`Struct`.
161 (file, command line arguments) and returns the data as a :class:`Struct`.
162 There are lots of things that :class:`ConfigLoader` does not do. It does
162 There are lots of things that :class:`ConfigLoader` does not do. It does
163 not implement complex logic for finding config files. It does not handle
163 not implement complex logic for finding config files. It does not handle
164 default values or merge multiple configs. These things need to be
164 default values or merge multiple configs. These things need to be
165 handled elsewhere.
165 handled elsewhere.
166 """
166 """
167
167
168 def __init__(self):
168 def __init__(self):
169 """A base class for config loaders.
169 """A base class for config loaders.
170
170
171 Examples
171 Examples
172 --------
172 --------
173
173
174 >>> cl = ConfigLoader()
174 >>> cl = ConfigLoader()
175 >>> config = cl.load_config()
175 >>> config = cl.load_config()
176 >>> config
176 >>> config
177 {}
177 {}
178 """
178 """
179 self.clear()
179 self.clear()
180
180
181 def clear(self):
181 def clear(self):
182 self.config = Config()
182 self.config = Config()
183
183
184 def load_config(self):
184 def load_config(self):
185 """Load a config from somewhere, return a Struct.
185 """Load a config from somewhere, return a Struct.
186
186
187 Usually, this will cause self.config to be set and then returned.
187 Usually, this will cause self.config to be set and then returned.
188 """
188 """
189 return self.config
189 return self.config
190
190
191
191
192 class FileConfigLoader(ConfigLoader):
192 class FileConfigLoader(ConfigLoader):
193 """A base class for file based configurations.
193 """A base class for file based configurations.
194
194
195 As we add more file based config loaders, the common logic should go
195 As we add more file based config loaders, the common logic should go
196 here.
196 here.
197 """
197 """
198 pass
198 pass
199
199
200
200
201 class PyFileConfigLoader(FileConfigLoader):
201 class PyFileConfigLoader(FileConfigLoader):
202 """A config loader for pure python files.
202 """A config loader for pure python files.
203
203
204 This calls execfile on a plain python file and looks for attributes
204 This calls execfile on a plain python file and looks for attributes
205 that are all caps. These attribute are added to the config Struct.
205 that are all caps. These attribute are added to the config Struct.
206 """
206 """
207
207
208 def __init__(self, filename, path=None):
208 def __init__(self, filename, path=None):
209 """Build a config loader for a filename and path.
209 """Build a config loader for a filename and path.
210
210
211 Parameters
211 Parameters
212 ----------
212 ----------
213 filename : str
213 filename : str
214 The file name of the config file.
214 The file name of the config file.
215 path : str, list, tuple
215 path : str, list, tuple
216 The path to search for the config file on, or a sequence of
216 The path to search for the config file on, or a sequence of
217 paths to try in order.
217 paths to try in order.
218 """
218 """
219 super(PyFileConfigLoader, self).__init__()
219 super(PyFileConfigLoader, self).__init__()
220 self.filename = filename
220 self.filename = filename
221 self.path = path
221 self.path = path
222 self.full_filename = ''
222 self.full_filename = ''
223 self.data = None
223 self.data = None
224
224
225 def load_config(self):
225 def load_config(self):
226 """Load the config from a file and return it as a Struct."""
226 """Load the config from a file and return it as a Struct."""
227 self._find_file()
227 self._find_file()
228 self._read_file_as_dict()
228 self._read_file_as_dict()
229 self._convert_to_config()
229 self._convert_to_config()
230 return self.config
230 return self.config
231
231
232 def _find_file(self):
232 def _find_file(self):
233 """Try to find the file by searching the paths."""
233 """Try to find the file by searching the paths."""
234 self.full_filename = filefind(self.filename, self.path)
234 self.full_filename = filefind(self.filename, self.path)
235
235
236 def _read_file_as_dict(self):
236 def _read_file_as_dict(self):
237 """Load the config file into self.config, with recursive loading."""
237 """Load the config file into self.config, with recursive loading."""
238 # This closure is made available in the namespace that is used
238 # This closure is made available in the namespace that is used
239 # to exec the config file. This allows users to call
239 # to exec the config file. This allows users to call
240 # load_subconfig('myconfig.py') to load config files recursively.
240 # load_subconfig('myconfig.py') to load config files recursively.
241 # It needs to be a closure because it has references to self.path
241 # It needs to be a closure because it has references to self.path
242 # and self.config. The sub-config is loaded with the same path
242 # and self.config. The sub-config is loaded with the same path
243 # as the parent, but it uses an empty config which is then merged
243 # as the parent, but it uses an empty config which is then merged
244 # with the parents.
244 # with the parents.
245 def load_subconfig(fname):
245 def load_subconfig(fname):
246 loader = PyFileConfigLoader(fname, self.path)
246 loader = PyFileConfigLoader(fname, self.path)
247 sub_config = loader.load_config()
247 sub_config = loader.load_config()
248 self.config._merge(sub_config)
248 self.config._merge(sub_config)
249
249
250 # Again, this needs to be a closure and should be used in config
250 # Again, this needs to be a closure and should be used in config
251 # files to get the config being loaded.
251 # files to get the config being loaded.
252 def get_config():
252 def get_config():
253 return self.config
253 return self.config
254
254
255 namespace = dict(load_subconfig=load_subconfig, get_config=get_config)
255 namespace = dict(load_subconfig=load_subconfig, get_config=get_config)
256 execfile(self.full_filename, namespace)
256 execfile(self.full_filename, namespace)
257
257
258 def _convert_to_config(self):
258 def _convert_to_config(self):
259 if self.data is None:
259 if self.data is None:
260 ConfigLoaderError('self.data does not exist')
260 ConfigLoaderError('self.data does not exist')
261
261
262
262
263 class CommandLineConfigLoader(ConfigLoader):
263 class CommandLineConfigLoader(ConfigLoader):
264 """A config loader for command line arguments.
264 """A config loader for command line arguments.
265
265
266 As we add more command line based loaders, the common logic should go
266 As we add more command line based loaders, the common logic should go
267 here.
267 here.
268 """
268 """
269
269
270
270
271 class NoConfigDefault(object): pass
271 class NoConfigDefault(object): pass
272 NoConfigDefault = NoConfigDefault()
272 NoConfigDefault = NoConfigDefault()
273
273
274
274 class ArgParseConfigLoader(CommandLineConfigLoader):
275 class ArgParseConfigLoader(CommandLineConfigLoader):
275
276
276 # arguments = [(('-f','--file'),dict(type=str,dest='file'))]
277 # arguments = [(('-f','--file'),dict(type=str,dest='file'))]
277 arguments = ()
278 arguments = ()
278
279
279 def __init__(self, *args, **kw):
280 def __init__(self, *args, **kw):
280 """Create a config loader for use with argparse.
281 """Create a config loader for use with argparse.
281
282
282 The args and kwargs arguments here are passed onto the constructor
283 The args and kwargs arguments here are passed onto the constructor
283 of :class:`argparse.ArgumentParser`.
284 of :class:`argparse.ArgumentParser`.
284 """
285 """
285 super(CommandLineConfigLoader, self).__init__()
286 super(CommandLineConfigLoader, self).__init__()
286 self.args = args
287 self.args = args
287 self.kw = kw
288 self.kw = kw
288
289
289 def load_config(self, args=None):
290 def load_config(self, args=None):
290 """Parse command line arguments and return as a Struct."""
291 """Parse command line arguments and return as a Struct."""
291 self._create_parser()
292 self._create_parser()
292 self._parse_args(args)
293 self._parse_args(args)
293 self._convert_to_config()
294 self._convert_to_config()
294 return self.config
295 return self.config
295
296
296 def get_extra_args(self):
297 def get_extra_args(self):
297 if hasattr(self, 'extra_args'):
298 if hasattr(self, 'extra_args'):
298 return self.extra_args
299 return self.extra_args
299 else:
300 else:
300 return []
301 return []
301
302
302 def _create_parser(self):
303 def _create_parser(self):
303 self.parser = argparse.ArgumentParser(*self.args, **self.kw)
304 self.parser = argparse.ArgumentParser(*self.args, **self.kw)
304 self._add_arguments()
305 self._add_arguments()
305 self._add_other_arguments()
306 self._add_other_arguments()
306
307
307 def _add_other_arguments(self):
308 def _add_other_arguments(self):
308 pass
309 pass
309
310
310 def _add_arguments(self):
311 def _add_arguments(self):
311 for argument in self.arguments:
312 for argument in self.arguments:
312 if not argument[1].has_key('default'):
313 if not argument[1].has_key('default'):
313 argument[1]['default'] = NoConfigDefault
314 argument[1]['default'] = NoConfigDefault
314 self.parser.add_argument(*argument[0],**argument[1])
315 self.parser.add_argument(*argument[0],**argument[1])
315
316
316 def _parse_args(self, args=None):
317 def _parse_args(self, args=None):
317 """self.parser->self.parsed_data"""
318 """self.parser->self.parsed_data"""
318 if args is None:
319 if args is None:
319 self.parsed_data, self.extra_args = self.parser.parse_known_args()
320 self.parsed_data, self.extra_args = self.parser.parse_known_args()
320 else:
321 else:
321 self.parsed_data, self.extra_args = self.parser.parse_known_args(args)
322 self.parsed_data, self.extra_args = self.parser.parse_known_args(args)
322
323
323 def _convert_to_config(self):
324 def _convert_to_config(self):
324 """self.parsed_data->self.config"""
325 """self.parsed_data->self.config"""
325 for k, v in vars(self.parsed_data).items():
326 for k, v in vars(self.parsed_data).items():
326 if v is not NoConfigDefault:
327 if v is not NoConfigDefault:
327 exec_str = 'self.config.' + k + '= v'
328 exec_str = 'self.config.' + k + '= v'
328 exec exec_str in locals(), globals()
329 exec exec_str in locals(), globals()
329
330
@@ -1,251 +1,263 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import shutil
19 import shutil
20
20
21 from IPython.core import release
21 from IPython.core import release
22 from IPython.config.loader import PyFileConfigLoader
22 from IPython.config.loader import PyFileConfigLoader
23 from IPython.core.application import Application
23 from IPython.core.application import Application
24 from IPython.core.component import Component
24 from IPython.core.component import Component
25 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
25 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
26 from IPython.utils.traitlets import Unicode
26 from IPython.utils.traitlets import Unicode
27
27
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29 # Imports
29 # Imports
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31
31
32
32
33 class ClusterDir(Component):
33 class ClusterDir(Component):
34 """An object to manage the cluster directory and its resources.
34 """An object to manage the cluster directory and its resources.
35
35
36 The cluster directory is used by :command:`ipcontroller`,
36 The cluster directory is used by :command:`ipcontroller`,
37 :command:`ipcontroller` and :command:`ipcontroller` to manage the
37 :command:`ipcontroller` and :command:`ipcontroller` to manage the
38 configuration, logging and security of these applications.
38 configuration, logging and security of these applications.
39
39
40 This object knows how to find, create and manage these directories. This
40 This object knows how to find, create and manage these directories. This
41 should be used by any code that want's to handle cluster directories.
41 should be used by any code that want's to handle cluster directories.
42 """
42 """
43
43
44 security_dir_name = Unicode('security')
44 security_dir_name = Unicode('security')
45 log_dir_name = Unicode('log')
45 log_dir_name = Unicode('log')
46 security_dir = Unicode()
46 security_dir = Unicode()
47 log_dir = Unicode('')
47 log_dir = Unicode('')
48 location = Unicode('')
48 location = Unicode('')
49
49
50 def __init__(self, location):
50 def __init__(self, location):
51 super(ClusterDir, self).__init__(None)
51 super(ClusterDir, self).__init__(None)
52 self.location = location
52 self.location = location
53
53
54 def _location_changed(self, name, old, new):
54 def _location_changed(self, name, old, new):
55 if not os.path.isdir(new):
55 if not os.path.isdir(new):
56 os.makedirs(new, mode=0777)
56 os.makedirs(new, mode=0777)
57 else:
57 else:
58 os.chmod(new, 0777)
58 os.chmod(new, 0777)
59 self.security_dir = os.path.join(new, self.security_dir_name)
59 self.security_dir = os.path.join(new, self.security_dir_name)
60 self.log_dir = os.path.join(new, self.log_dir_name)
60 self.log_dir = os.path.join(new, self.log_dir_name)
61 self.check_dirs()
61
62
62 def _log_dir_changed(self, name, old, new):
63 def _log_dir_changed(self, name, old, new):
63 if not os.path.isdir(new):
64 self.check_log_dir()
64 os.mkdir(new, 0777)
65
66 def check_log_dir(self):
67 if not os.path.isdir(self.log_dir):
68 os.mkdir(self.log_dir, 0777)
65 else:
69 else:
66 os.chmod(new, 0777)
70 os.chmod(self.log_dir, 0777)
67
71
68 def _security_dir_changed(self, name, old, new):
72 def _security_dir_changed(self, name, old, new):
69 if not os.path.isdir(new):
73 self.check_security_dir()
70 os.mkdir(new, 0700)
74
75 def check_security_dir(self):
76 if not os.path.isdir(self.security_dir):
77 os.mkdir(self.security_dir, 0700)
71 else:
78 else:
72 os.chmod(new, 0700)
79 os.chmod(self.security_dir, 0700)
80
81 def check_dirs(self):
82 self.check_security_dir()
83 self.check_log_dir()
73
84
74 def load_config_file(self, filename):
85 def load_config_file(self, filename):
75 """Load a config file from the top level of the cluster dir.
86 """Load a config file from the top level of the cluster dir.
76
87
77 Parameters
88 Parameters
78 ----------
89 ----------
79 filename : unicode or str
90 filename : unicode or str
80 The filename only of the config file that must be located in
91 The filename only of the config file that must be located in
81 the top-level of the cluster directory.
92 the top-level of the cluster directory.
82 """
93 """
83 loader = PyFileConfigLoader(filename, self.location)
94 loader = PyFileConfigLoader(filename, self.location)
84 return loader.load_config()
95 return loader.load_config()
85
96
86 def copy_config_file(self, config_file, path=None):
97 def copy_config_file(self, config_file, path=None, overwrite=False):
87 """Copy a default config file into the active cluster directory.
98 """Copy a default config file into the active cluster directory.
88
99
89 Default configuration files are kept in :mod:`IPython.config.default`.
100 Default configuration files are kept in :mod:`IPython.config.default`.
90 This function moves these from that location to the working cluster
101 This function moves these from that location to the working cluster
91 directory.
102 directory.
92 """
103 """
93 if path is None:
104 if path is None:
94 import IPython.config.default
105 import IPython.config.default
95 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
106 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
96 path = os.path.sep.join(path)
107 path = os.path.sep.join(path)
97 src = os.path.join(path, config_file)
108 src = os.path.join(path, config_file)
98 dst = os.path.join(self.location, config_file)
109 dst = os.path.join(self.location, config_file)
110 if not os.path.isfile(dst) or overwrite:
99 shutil.copy(src, dst)
111 shutil.copy(src, dst)
100
112
101 def copy_all_config_files(self):
113 def copy_all_config_files(self, path=None, overwrite=False):
102 """Copy all config files into the active cluster directory."""
114 """Copy all config files into the active cluster directory."""
103 for f in ['ipcontroller_config.py', 'ipengine_config.py']:
115 for f in ['ipcontroller_config.py', 'ipengine_config.py',
104 self.copy_config_file(f)
116 'ipcluster_config.py']:
117 self.copy_config_file(f, path=path, overwrite=overwrite)
105
118
106 @classmethod
119 @classmethod
107 def find_cluster_dir_by_profile(cls, path, profile='default'):
120 def find_cluster_dir_by_profile(cls, path, profile='default'):
108 """Find/create a cluster dir by profile name and return its ClusterDir.
121 """Find/create a cluster dir by profile name and return its ClusterDir.
109
122
110 This will create the cluster directory if it doesn't exist.
123 This will create the cluster directory if it doesn't exist.
111
124
112 Parameters
125 Parameters
113 ----------
126 ----------
114 path : unicode or str
127 path : unicode or str
115 The directory path to look for the cluster directory in.
128 The directory path to look for the cluster directory in.
116 profile : unicode or str
129 profile : unicode or str
117 The name of the profile. The name of the cluster directory
130 The name of the profile. The name of the cluster directory
118 will be "cluster_<profile>".
131 will be "cluster_<profile>".
119 """
132 """
120 dirname = 'cluster_' + profile
133 dirname = 'cluster_' + profile
121 cluster_dir = os.path.join(os.getcwd(), dirname)
134 cluster_dir = os.path.join(os.getcwd(), dirname)
122 if os.path.isdir(cluster_dir):
135 if os.path.isdir(cluster_dir):
123 return ClusterDir(cluster_dir)
136 return ClusterDir(cluster_dir)
124 else:
137 else:
125 if not os.path.isdir(path):
138 if not os.path.isdir(path):
126 raise IOError("Directory doesn't exist: %s" % path)
139 raise IOError("Directory doesn't exist: %s" % path)
127 cluster_dir = os.path.join(path, dirname)
140 cluster_dir = os.path.join(path, dirname)
128 return ClusterDir(cluster_dir)
141 return ClusterDir(cluster_dir)
129
142
130 @classmethod
143 @classmethod
131 def find_cluster_dir(cls, cluster_dir):
144 def find_cluster_dir(cls, cluster_dir):
132 """Find/create a cluster dir and return its ClusterDir.
145 """Find/create a cluster dir and return its ClusterDir.
133
146
134 This will create the cluster directory if it doesn't exist.
147 This will create the cluster directory if it doesn't exist.
135
148
136 Parameters
149 Parameters
137 ----------
150 ----------
138 cluster_dir : unicode or str
151 cluster_dir : unicode or str
139 The path of the cluster directory. This is expanded using
152 The path of the cluster directory. This is expanded using
140 :func:`os.path.expandvars` and :func:`os.path.expanduser`.
153 :func:`os.path.expandvars` and :func:`os.path.expanduser`.
141 """
154 """
142 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
155 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
143 return ClusterDir(cluster_dir)
156 return ClusterDir(cluster_dir)
144
157
145
158
146 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
159 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
147 """Default command line options for IPython cluster applications."""
160 """Default command line options for IPython cluster applications."""
148
161
149 def _add_other_arguments(self):
162 def _add_other_arguments(self):
150 self.parser.add_argument('-ipythondir', '--ipython-dir',
163 self.parser.add_argument('-ipythondir', '--ipython-dir',
151 dest='Global.ipythondir',type=str,
164 dest='Global.ipythondir',type=str,
152 help='Set to override default location of Global.ipythondir.',
165 help='Set to override default location of Global.ipythondir.',
153 default=NoConfigDefault,
166 default=NoConfigDefault,
154 metavar='Global.ipythondir')
167 metavar='Global.ipythondir')
155 self.parser.add_argument('-p','-profile', '--profile',
168 self.parser.add_argument('-p','-profile', '--profile',
156 dest='Global.profile',type=str,
169 dest='Global.profile',type=str,
157 help='The string name of the profile to be used. This determines '
170 help='The string name of the profile to be used. This determines '
158 'the name of the cluster dir as: cluster_<profile>. The default profile '
171 'the name of the cluster dir as: cluster_<profile>. The default profile '
159 'is named "default". The cluster directory is resolve this way '
172 'is named "default". The cluster directory is resolve this way '
160 'if the --cluster-dir option is not used.',
173 'if the --cluster-dir option is not used.',
161 default=NoConfigDefault,
174 default=NoConfigDefault,
162 metavar='Global.profile')
175 metavar='Global.profile')
163 self.parser.add_argument('-log_level', '--log-level',
176 self.parser.add_argument('-log_level', '--log-level',
164 dest="Global.log_level",type=int,
177 dest="Global.log_level",type=int,
165 help='Set the log level (0,10,20,30,40,50). Default is 30.',
178 help='Set the log level (0,10,20,30,40,50). Default is 30.',
166 default=NoConfigDefault)
179 default=NoConfigDefault)
167 self.parser.add_argument('-cluster_dir', '--cluster-dir',
180 self.parser.add_argument('-cluster_dir', '--cluster-dir',
168 dest='Global.cluster_dir',type=str,
181 dest='Global.cluster_dir',type=str,
169 help='Set the cluster dir. This overrides the logic used by the '
182 help='Set the cluster dir. This overrides the logic used by the '
170 '--profile option.',
183 '--profile option.',
171 default=NoConfigDefault,
184 default=NoConfigDefault,
172 metavar='Global.cluster_dir')
185 metavar='Global.cluster_dir')
173
186
174
187
175 class ApplicationWithClusterDir(Application):
188 class ApplicationWithClusterDir(Application):
176 """An application that puts everything into a cluster directory.
189 """An application that puts everything into a cluster directory.
177
190
178 Instead of looking for things in the ipythondir, this type of application
191 Instead of looking for things in the ipythondir, this type of application
179 will use its own private directory called the "cluster directory"
192 will use its own private directory called the "cluster directory"
180 for things like config files, log files, etc.
193 for things like config files, log files, etc.
181
194
182 The cluster directory is resolved as follows:
195 The cluster directory is resolved as follows:
183
196
184 * If the ``--cluster-dir`` option is given, it is used.
197 * If the ``--cluster-dir`` option is given, it is used.
185 * If ``--cluster-dir`` is not given, the application directory is
198 * If ``--cluster-dir`` is not given, the application directory is
186 resolve using the profile name as ``cluster_<profile>``. The search
199 resolve using the profile name as ``cluster_<profile>``. The search
187 path for this directory is then i) cwd if it is found there
200 path for this directory is then i) cwd if it is found there
188 and ii) in ipythondir otherwise.
201 and ii) in ipythondir otherwise.
189
202
190 The config file for the application is to be put in the cluster
203 The config file for the application is to be put in the cluster
191 dir and named the value of the ``config_file_name`` class attribute.
204 dir and named the value of the ``config_file_name`` class attribute.
192 """
205 """
193
206
194 def create_default_config(self):
207 def create_default_config(self):
195 super(ApplicationWithClusterDir, self).create_default_config()
208 super(ApplicationWithClusterDir, self).create_default_config()
196 self.default_config.Global.profile = 'default'
209 self.default_config.Global.profile = 'default'
197 self.default_config.Global.cluster_dir = ''
210 self.default_config.Global.cluster_dir = ''
198
211
199 def create_command_line_config(self):
212 def create_command_line_config(self):
200 """Create and return a command line config loader."""
213 """Create and return a command line config loader."""
201 return AppWithClusterDirArgParseConfigLoader(
214 return AppWithClusterDirArgParseConfigLoader(
202 description=self.description,
215 description=self.description,
203 version=release.version
216 version=release.version
204 )
217 )
205
218
206 def find_config_file_name(self):
219 def find_config_file_name(self):
207 """Find the config file name for this application."""
220 """Find the config file name for this application."""
208 # For this type of Application it should be set as a class attribute.
221 # For this type of Application it should be set as a class attribute.
209 if not hasattr(self, 'config_file_name'):
222 if not hasattr(self, 'config_file_name'):
210 self.log.critical("No config filename found")
223 self.log.critical("No config filename found")
211
224
212 def find_config_file_paths(self):
225 def find_config_file_paths(self):
213 """This resolves the cluster directory and sets ``config_file_paths``.
226 """This resolves the cluster directory and sets ``config_file_paths``.
214
227
215 This does the following:
228 This does the following:
216 * Create the :class:`ClusterDir` object for the application.
229 * Create the :class:`ClusterDir` object for the application.
217 * Set the ``cluster_dir`` attribute of the application and config
230 * Set the ``cluster_dir`` attribute of the application and config
218 objects.
231 objects.
219 * Set ``config_file_paths`` to point to the cluster directory.
232 * Set ``config_file_paths`` to point to the cluster directory.
220 """
233 """
221
234
222 # Create the ClusterDir object for managing everything
235 # Create the ClusterDir object for managing everything
223 try:
236 try:
224 cluster_dir = self.command_line_config.Global.cluster_dir
237 cluster_dir = self.command_line_config.Global.cluster_dir
225 except AttributeError:
238 except AttributeError:
226 cluster_dir = self.default_config.Global.cluster_dir
239 cluster_dir = self.default_config.Global.cluster_dir
227 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
240 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
228 if cluster_dir:
241 if cluster_dir:
229 # Just use cluster_dir
242 # Just use cluster_dir
230 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
243 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
231 else:
244 else:
232 # Then look for a profile
245 # Then look for a profile
233 try:
246 try:
234 self.profile = self.command_line_config.Global.profile
247 self.profile = self.command_line_config.Global.profile
235 except AttributeError:
248 except AttributeError:
236 self.profile = self.default_config.Global.profile
249 self.profile = self.default_config.Global.profile
237 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
250 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
238 self.ipythondir, self.profile)
251 self.ipythondir, self.profile)
239
252
240 # Set the cluster directory
253 # Set the cluster directory
241 self.cluster_dir = self.cluster_dir_obj.location
254 self.cluster_dir = self.cluster_dir_obj.location
242
255
243 # These have to be set because they could be different from the one
256 # These have to be set because they could be different from the one
244 # that we just computed. Because command line has the highest
257 # that we just computed. Because command line has the highest
245 # priority, this will always end up in the master_config.
258 # priority, this will always end up in the master_config.
246 self.default_config.Global.cluster_dir = self.cluster_dir
259 self.default_config.Global.cluster_dir = self.cluster_dir
247 self.command_line_config.Global.cluster_dir = self.cluster_dir
260 self.command_line_config.Global.cluster_dir = self.cluster_dir
248 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
249
261
250 # Set the search path to the cluster directory
262 # Set the search path to the cluster directory
251 self.config_file_paths = (self.cluster_dir,)
263 self.config_file_paths = (self.cluster_dir,)
@@ -1,92 +1,131 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """A class that manages the engines connection to the controller."""
3 """A class that manages the engines connection to the controller."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import cPickle as pickle
19 import cPickle as pickle
20
20
21 from twisted.python import log, failure
21 from twisted.python import log, failure
22 from twisted.internet import defer
22 from twisted.internet import defer
23 from twisted.internet.defer import inlineCallbacks, returnValue
23
24
24 from IPython.kernel.fcutil import find_furl
25 from IPython.kernel.fcutil import find_furl
25 from IPython.kernel.enginefc import IFCEngine
26 from IPython.kernel.enginefc import IFCEngine
27 from IPython.kernel.twistedutil import sleep_deferred
26
28
27 #-------------------------------------------------------------------------------
29 #-------------------------------------------------------------------------------
28 # The ClientConnector class
30 # The ClientConnector class
29 #-------------------------------------------------------------------------------
31 #-------------------------------------------------------------------------------
30
32
33
34 class EngineConnectorError(Exception):
35 pass
36
37
31 class EngineConnector(object):
38 class EngineConnector(object):
32 """Manage an engines connection to a controller.
39 """Manage an engines connection to a controller.
33
40
34 This class takes a foolscap `Tub` and provides a `connect_to_controller`
41 This class takes a foolscap `Tub` and provides a `connect_to_controller`
35 method that will use the `Tub` to connect to a controller and register
42 method that will use the `Tub` to connect to a controller and register
36 the engine with the controller.
43 the engine with the controller.
37 """
44 """
38
45
39 def __init__(self, tub):
46 def __init__(self, tub):
40 self.tub = tub
47 self.tub = tub
41
48
42 def connect_to_controller(self, engine_service, furl_or_file):
49 def connect_to_controller(self, engine_service, furl_or_file,
50 delay=0.1, max_tries=10):
43 """
51 """
44 Make a connection to a controller specified by a furl.
52 Make a connection to a controller specified by a furl.
45
53
46 This method takes an `IEngineBase` instance and a foolcap URL and uses
54 This method takes an `IEngineBase` instance and a foolcap URL and uses
47 the `tub` attribute to make a connection to the controller. The
55 the `tub` attribute to make a connection to the controller. The
48 foolscap URL contains all the information needed to connect to the
56 foolscap URL contains all the information needed to connect to the
49 controller, including the ip and port as well as any encryption and
57 controller, including the ip and port as well as any encryption and
50 authentication information needed for the connection.
58 authentication information needed for the connection.
51
59
52 After getting a reference to the controller, this method calls the
60 After getting a reference to the controller, this method calls the
53 `register_engine` method of the controller to actually register the
61 `register_engine` method of the controller to actually register the
54 engine.
62 engine.
55
63
56 :Parameters:
64 This method will try to connect to the controller multiple times with
65 a delay in between. Each time the FURL file is read anew.
66
67 Parameters
68 __________
57 engine_service : IEngineBase
69 engine_service : IEngineBase
58 An instance of an `IEngineBase` implementer
70 An instance of an `IEngineBase` implementer
59 furl_or_file : str
71 furl_or_file : str
60 A furl or a filename containing a furl
72 A furl or a filename containing a furl
73 delay : float
74 The intial time to wait between connection attempts. Subsequent
75 attempts have increasing delays.
76 max_tries : int
77 The maximum number of connection attempts.
61 """
78 """
62 if not self.tub.running:
79 if not self.tub.running:
63 self.tub.startService()
80 self.tub.startService()
64 self.engine_service = engine_service
81 self.engine_service = engine_service
65 self.engine_reference = IFCEngine(self.engine_service)
82 self.engine_reference = IFCEngine(self.engine_service)
83
84 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
85 return d
86
87 @inlineCallbacks
88 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
89 """Try to connect to the controller with retry logic."""
90 if attempt < max_tries:
91 log.msg("Attempting to connect to controller [%r]: %s" % \
92 (attempt, furl_or_file))
66 try:
93 try:
67 self.furl = find_furl(furl_or_file)
94 self.furl = find_furl(furl_or_file)
68 except ValueError:
95 # Uncomment this to see the FURL being tried.
69 return defer.fail(failure.Failure())
96 # log.msg("FURL: %s" % self.furl)
97 rr = yield self.tub.getReference(self.furl)
98 except:
99 if attempt==max_tries-1:
100 # This will propagate the exception all the way to the top
101 # where it can be handled.
102 raise
70 else:
103 else:
71 d = self.tub.getReference(self.furl)
104 yield sleep_deferred(delay)
72 d.addCallbacks(self._register, self._log_failure)
105 yield self._try_to_connect(
73 return d
106 furl_or_file, 1.5*delay, max_tries, attempt+1
74
107 )
75 def _log_failure(self, reason):
108 else:
76 log.err('EngineConnector: engine registration failed:')
109 result = yield self._register(rr)
77 log.err(reason)
110 returnValue(result)
78 return reason
111 else:
112 raise EngineConnectorError(
113 'Could not connect to controller, max_tries (%r) exceeded. '
114 'This usually means that i) the controller was not started, '
115 'or ii) a firewall was blocking the engine from connecting '
116 'to the controller.' % max_tries
117 )
79
118
80 def _register(self, rr):
119 def _register(self, rr):
81 self.remote_ref = rr
120 self.remote_ref = rr
82 # Now register myself with the controller
121 # Now register myself with the controller
83 desired_id = self.engine_service.id
122 desired_id = self.engine_service.id
84 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
123 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
85 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
124 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
86 return d.addCallbacks(self._reference_sent, self._log_failure)
125 return d.addCallback(self._reference_sent)
87
126
88 def _reference_sent(self, registration_dict):
127 def _reference_sent(self, registration_dict):
89 self.engine_service.id = registration_dict['id']
128 self.engine_service.id = registration_dict['id']
90 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
129 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
91 return self.engine_service.id
130 return self.engine_service.id
92
131
@@ -1,236 +1,239 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Foolscap related utilities.
4 Foolscap related utilities.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
19
18 import os
20 import os
19 import tempfile
21 import tempfile
20
22
21 from twisted.internet import reactor, defer
23 from twisted.internet import reactor, defer
22 from twisted.python import log
24 from twisted.python import log
23
25
24 from foolscap import Tub, UnauthenticatedTub
26 from foolscap import Tub, UnauthenticatedTub
25
27
26 from IPython.config.loader import Config
28 from IPython.config.loader import Config
27
29
28 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
29
31
30 from IPython.kernel.error import SecurityError
32 from IPython.kernel.error import SecurityError
31
33
32 from IPython.utils.traitlets import Int, Str, Bool, Instance
34 from IPython.utils.traitlets import Int, Str, Bool, Instance
33 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
34
36
35 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
36 # Code
38 # Code
37 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
38
40
39
41
40 # We do this so if a user doesn't have OpenSSL installed, it will try to use
42 # We do this so if a user doesn't have OpenSSL installed, it will try to use
41 # an UnauthenticatedTub. But, they will still run into problems if they
43 # an UnauthenticatedTub. But, they will still run into problems if they
42 # try to use encrypted furls.
44 # try to use encrypted furls.
43 try:
45 try:
44 import OpenSSL
46 import OpenSSL
45 except:
47 except:
46 Tub = UnauthenticatedTub
48 Tub = UnauthenticatedTub
47 have_crypto = False
49 have_crypto = False
48 else:
50 else:
49 have_crypto = True
51 have_crypto = True
50
52
51
53
52 def check_furl_file_security(furl_file, secure):
54 def check_furl_file_security(furl_file, secure):
53 """Remove the old furl_file if changing security modes."""
55 """Remove the old furl_file if changing security modes."""
54 if os.path.isfile(furl_file):
56 if os.path.isfile(furl_file):
55 f = open(furl_file, 'r')
57 f = open(furl_file, 'r')
56 oldfurl = f.read().strip()
58 oldfurl = f.read().strip()
57 f.close()
59 f.close()
58 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
60 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
59 os.remove(furl_file)
61 os.remove(furl_file)
60
62
61
63
62 def is_secure(furl):
64 def is_secure(furl):
63 """Is the given FURL secure or not."""
65 """Is the given FURL secure or not."""
64 if is_valid(furl):
66 if is_valid(furl):
65 if furl.startswith("pb://"):
67 if furl.startswith("pb://"):
66 return True
68 return True
67 elif furl.startswith("pbu://"):
69 elif furl.startswith("pbu://"):
68 return False
70 return False
69 else:
71 else:
70 raise ValueError("invalid FURL: %s" % furl)
72 raise ValueError("invalid FURL: %s" % furl)
71
73
72
74
73 def is_valid(furl):
75 def is_valid(furl):
74 """Is the str a valid FURL or not."""
76 """Is the str a valid FURL or not."""
75 if isinstance(furl, str):
77 if isinstance(furl, str):
76 if furl.startswith("pb://") or furl.startswith("pbu://"):
78 if furl.startswith("pb://") or furl.startswith("pbu://"):
77 return True
79 return True
78 else:
80 else:
79 return False
81 return False
80
82
81
83
82 def find_furl(furl_or_file):
84 def find_furl(furl_or_file):
83 """Find, validate and return a FURL in a string or file."""
85 """Find, validate and return a FURL in a string or file."""
84 if isinstance(furl_or_file, str):
86 if isinstance(furl_or_file, str):
85 if is_valid(furl_or_file):
87 if is_valid(furl_or_file):
86 return furl_or_file
88 return furl_or_file
87 if os.path.isfile(furl_or_file):
89 if os.path.isfile(furl_or_file):
88 furl = open(furl_or_file, 'r').read().strip()
90 with open(furl_or_file, 'r') as f:
91 furl = f.read().strip()
89 if is_valid(furl):
92 if is_valid(furl):
90 return furl
93 return furl
91 raise ValueError("not a FURL or a file containing a FURL: %s" % furl_or_file)
94 raise ValueError("Not a FURL or a file containing a FURL: %s" % furl_or_file)
92
95
93
96
94 def get_temp_furlfile(filename):
97 def get_temp_furlfile(filename):
95 """Return a temporary FURL file."""
98 """Return a temporary FURL file."""
96 return tempfile.mktemp(dir=os.path.dirname(filename),
99 return tempfile.mktemp(dir=os.path.dirname(filename),
97 prefix=os.path.basename(filename))
100 prefix=os.path.basename(filename))
98
101
99
102
100 def make_tub(ip, port, secure, cert_file):
103 def make_tub(ip, port, secure, cert_file):
101 """Create a listening tub given an ip, port, and cert_file location.
104 """Create a listening tub given an ip, port, and cert_file location.
102
105
103 Parameters
106 Parameters
104 ----------
107 ----------
105 ip : str
108 ip : str
106 The ip address or hostname that the tub should listen on.
109 The ip address or hostname that the tub should listen on.
107 Empty means all interfaces.
110 Empty means all interfaces.
108 port : int
111 port : int
109 The port that the tub should listen on. A value of 0 means
112 The port that the tub should listen on. A value of 0 means
110 pick a random port
113 pick a random port
111 secure: bool
114 secure: bool
112 Will the connection be secure (in the Foolscap sense).
115 Will the connection be secure (in the Foolscap sense).
113 cert_file: str
116 cert_file: str
114 A filename of a file to be used for theSSL certificate.
117 A filename of a file to be used for theSSL certificate.
115
118
116 Returns
119 Returns
117 -------
120 -------
118 A tub, listener tuple.
121 A tub, listener tuple.
119 """
122 """
120 if secure:
123 if secure:
121 if have_crypto:
124 if have_crypto:
122 tub = Tub(certFile=cert_file)
125 tub = Tub(certFile=cert_file)
123 else:
126 else:
124 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
127 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
125 "can't run in secure mode. Try running without "
128 "can't run in secure mode. Try running without "
126 "security using 'ipcontroller -xy'.")
129 "security using 'ipcontroller -xy'.")
127 else:
130 else:
128 tub = UnauthenticatedTub()
131 tub = UnauthenticatedTub()
129
132
130 # Set the strport based on the ip and port and start listening
133 # Set the strport based on the ip and port and start listening
131 if ip == '':
134 if ip == '':
132 strport = "tcp:%i" % port
135 strport = "tcp:%i" % port
133 else:
136 else:
134 strport = "tcp:%i:interface=%s" % (port, ip)
137 strport = "tcp:%i:interface=%s" % (port, ip)
135 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
138 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
136 listener = tub.listenOn(strport)
139 listener = tub.listenOn(strport)
137
140
138 return tub, listener
141 return tub, listener
139
142
140
143
141 class FCServiceFactory(AdaptedConfiguredObjectFactory):
144 class FCServiceFactory(AdaptedConfiguredObjectFactory):
142 """This class creates a tub with various services running in it.
145 """This class creates a tub with various services running in it.
143
146
144 The basic idea is that :meth:`create` returns a running :class:`Tub`
147 The basic idea is that :meth:`create` returns a running :class:`Tub`
145 instance that has a number of Foolscap references registered in it.
148 instance that has a number of Foolscap references registered in it.
146 This class is a subclass of :class:`IPython.core.component.Component`
149 This class is a subclass of :class:`IPython.core.component.Component`
147 so the IPython configuration and component system are used.
150 so the IPython configuration and component system are used.
148
151
149 Attributes
152 Attributes
150 ----------
153 ----------
151 interfaces : Config
154 interfaces : Config
152 A Config instance whose values are sub-Config objects having two
155 A Config instance whose values are sub-Config objects having two
153 keys: furl_file and interface_chain.
156 keys: furl_file and interface_chain.
154
157
155 The other attributes are the standard ones for Foolscap.
158 The other attributes are the standard ones for Foolscap.
156 """
159 """
157
160
158 ip = Str('', config=True)
161 ip = Str('', config=True)
159 port = Int(0, config=True)
162 port = Int(0, config=True)
160 secure = Bool(True, config=True)
163 secure = Bool(True, config=True)
161 cert_file = Str('', config=True)
164 cert_file = Str('', config=True)
162 location = Str('', config=True)
165 location = Str('', config=True)
163 reuse_furls = Bool(False, config=True)
166 reuse_furls = Bool(False, config=True)
164 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
167 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
165
168
166 def __init__(self, config, adaptee):
169 def __init__(self, config, adaptee):
167 super(FCServiceFactory, self).__init__(config, adaptee)
170 super(FCServiceFactory, self).__init__(config, adaptee)
168 self._check_reuse_furls()
171 self._check_reuse_furls()
169
172
170 def _ip_changed(self, name, old, new):
173 def _ip_changed(self, name, old, new):
171 if new == 'localhost' or new == '127.0.0.1':
174 if new == 'localhost' or new == '127.0.0.1':
172 self.location = '127.0.0.1'
175 self.location = '127.0.0.1'
173
176
174 def _check_reuse_furls(self):
177 def _check_reuse_furls(self):
175 furl_files = [i.furl_file for i in self.interfaces.values()]
178 furl_files = [i.furl_file for i in self.interfaces.values()]
176 for ff in furl_files:
179 for ff in furl_files:
177 fullfile = self._get_security_file(ff)
180 fullfile = self._get_security_file(ff)
178 if self.reuse_furls:
181 if self.reuse_furls:
179 log.msg("Reusing FURL file: %s" % fullfile)
182 log.msg("Reusing FURL file: %s" % fullfile)
180 else:
183 else:
181 if os.path.isfile(fullfile):
184 if os.path.isfile(fullfile):
182 log.msg("Removing old FURL file: %s" % fullfile)
185 log.msg("Removing old FURL file: %s" % fullfile)
183 os.remove(fullfile)
186 os.remove(fullfile)
184
187
185 def _get_security_file(self, filename):
188 def _get_security_file(self, filename):
186 return os.path.join(self.config.Global.security_dir, filename)
189 return os.path.join(self.config.Global.security_dir, filename)
187
190
188 def create(self):
191 def create(self):
189 """Create and return the Foolscap tub with everything running."""
192 """Create and return the Foolscap tub with everything running."""
190
193
191 self.tub, self.listener = make_tub(
194 self.tub, self.listener = make_tub(
192 self.ip, self.port, self.secure,
195 self.ip, self.port, self.secure,
193 self._get_security_file(self.cert_file)
196 self._get_security_file(self.cert_file)
194 )
197 )
195 # log.msg("Interfaces to register [%r]: %r" % \
198 # log.msg("Interfaces to register [%r]: %r" % \
196 # (self.__class__, self.interfaces))
199 # (self.__class__, self.interfaces))
197 if not self.secure:
200 if not self.secure:
198 log.msg("WARNING: running with no security: %s" % \
201 log.msg("WARNING: running with no security: %s" % \
199 self.__class__.__name__)
202 self.__class__.__name__)
200 reactor.callWhenRunning(self.set_location_and_register)
203 reactor.callWhenRunning(self.set_location_and_register)
201 return self.tub
204 return self.tub
202
205
203 def set_location_and_register(self):
206 def set_location_and_register(self):
204 """Set the location for the tub and return a deferred."""
207 """Set the location for the tub and return a deferred."""
205
208
206 if self.location == '':
209 if self.location == '':
207 d = self.tub.setLocationAutomatically()
210 d = self.tub.setLocationAutomatically()
208 else:
211 else:
209 d = defer.maybeDeferred(self.tub.setLocation,
212 d = defer.maybeDeferred(self.tub.setLocation,
210 "%s:%i" % (self.location, self.listener.getPortnum()))
213 "%s:%i" % (self.location, self.listener.getPortnum()))
211 self.adapt_to_interfaces(d)
214 self.adapt_to_interfaces(d)
212
215
213 def adapt_to_interfaces(self, d):
216 def adapt_to_interfaces(self, d):
214 """Run through the interfaces, adapt and register."""
217 """Run through the interfaces, adapt and register."""
215
218
216 for ifname, ifconfig in self.interfaces.iteritems():
219 for ifname, ifconfig in self.interfaces.iteritems():
217 ff = self._get_security_file(ifconfig.furl_file)
220 ff = self._get_security_file(ifconfig.furl_file)
218 log.msg("Adapting [%s] to interface: %s" % \
221 log.msg("Adapting [%s] to interface: %s" % \
219 (self.adaptee.__class__.__name__, ifname))
222 (self.adaptee.__class__.__name__, ifname))
220 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
223 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
221 check_furl_file_security(ff, self.secure)
224 check_furl_file_security(ff, self.secure)
222 adaptee = self.adaptee
225 adaptee = self.adaptee
223 for i in ifconfig.interface_chain:
226 for i in ifconfig.interface_chain:
224 adaptee = import_item(i)(adaptee)
227 adaptee = import_item(i)(adaptee)
225 d.addCallback(self.register, adaptee, furl_file=ff)
228 d.addCallback(self.register, adaptee, furl_file=ff)
226
229
227 def register(self, empty, ref, furl_file):
230 def register(self, empty, ref, furl_file):
228 """Register the reference with the FURL file.
231 """Register the reference with the FURL file.
229
232
230 The FURL file is created and then moved to make sure that when the
233 The FURL file is created and then moved to make sure that when the
231 file appears, the buffer has been flushed and the file closed.
234 file appears, the buffer has been flushed and the file closed.
232 """
235 """
233 temp_furl_file = get_temp_furlfile(furl_file)
236 temp_furl_file = get_temp_furlfile(furl_file)
234 self.tub.registerReference(ref, furlFile=temp_furl_file)
237 self.tub.registerReference(ref, furlFile=temp_furl_file)
235 os.rename(temp_furl_file, furl_file)
238 os.rename(temp_furl_file, furl_file)
236
239
@@ -1,268 +1,274 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import os
19 import os
20 import sys
20 import sys
21
21
22 from twisted.application import service
22 from twisted.application import service
23 from twisted.internet import reactor
23 from twisted.internet import reactor
24 from twisted.python import log
24 from twisted.python import log
25
25
26 from IPython.config.loader import Config, NoConfigDefault
26 from IPython.config.loader import Config, NoConfigDefault
27
27
28 from IPython.kernel.clusterdir import (
28 from IPython.kernel.clusterdir import (
29 ApplicationWithClusterDir,
29 ApplicationWithClusterDir,
30 AppWithClusterDirArgParseConfigLoader
30 AppWithClusterDirArgParseConfigLoader
31 )
31 )
32
32
33 from IPython.core import release
33 from IPython.core import release
34
34
35 from IPython.utils.traitlets import Str, Instance
35 from IPython.utils.traitlets import Str, Instance
36
36
37 from IPython.kernel import controllerservice
37 from IPython.kernel import controllerservice
38
38
39 from IPython.kernel.fcutil import FCServiceFactory
39 from IPython.kernel.fcutil import FCServiceFactory
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Default interfaces
42 # Default interfaces
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 # The default client interfaces for FCClientServiceFactory.interfaces
46 # The default client interfaces for FCClientServiceFactory.interfaces
47 default_client_interfaces = Config()
47 default_client_interfaces = Config()
48 default_client_interfaces.Task.interface_chain = [
48 default_client_interfaces.Task.interface_chain = [
49 'IPython.kernel.task.ITaskController',
49 'IPython.kernel.task.ITaskController',
50 'IPython.kernel.taskfc.IFCTaskController'
50 'IPython.kernel.taskfc.IFCTaskController'
51 ]
51 ]
52
52
53 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
53 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
54
54
55 default_client_interfaces.MultiEngine.interface_chain = [
55 default_client_interfaces.MultiEngine.interface_chain = [
56 'IPython.kernel.multiengine.IMultiEngine',
56 'IPython.kernel.multiengine.IMultiEngine',
57 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
57 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
58 ]
58 ]
59
59
60 default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
60 default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
61
61
62 # Make this a dict we can pass to Config.__init__ for the default
62 # Make this a dict we can pass to Config.__init__ for the default
63 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
63 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
64
64
65
65
66
66
67 # The default engine interfaces for FCEngineServiceFactory.interfaces
67 # The default engine interfaces for FCEngineServiceFactory.interfaces
68 default_engine_interfaces = Config()
68 default_engine_interfaces = Config()
69 default_engine_interfaces.Default.interface_chain = [
69 default_engine_interfaces.Default.interface_chain = [
70 'IPython.kernel.enginefc.IFCControllerBase'
70 'IPython.kernel.enginefc.IFCControllerBase'
71 ]
71 ]
72
72
73 default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
73 default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
74
74
75 # Make this a dict we can pass to Config.__init__ for the default
75 # Make this a dict we can pass to Config.__init__ for the default
76 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
76 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
77
77
78
78
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 # Service factories
80 # Service factories
81 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
82
82
83
83
84 class FCClientServiceFactory(FCServiceFactory):
84 class FCClientServiceFactory(FCServiceFactory):
85 """A Foolscap implementation of the client services."""
85 """A Foolscap implementation of the client services."""
86
86
87 cert_file = Str('ipcontroller-client.pem', config=True)
87 cert_file = Str('ipcontroller-client.pem', config=True)
88 interfaces = Instance(klass=Config, kw=default_client_interfaces,
88 interfaces = Instance(klass=Config, kw=default_client_interfaces,
89 allow_none=False, config=True)
89 allow_none=False, config=True)
90
90
91
91
92 class FCEngineServiceFactory(FCServiceFactory):
92 class FCEngineServiceFactory(FCServiceFactory):
93 """A Foolscap implementation of the engine services."""
93 """A Foolscap implementation of the engine services."""
94
94
95 cert_file = Str('ipcontroller-engine.pem', config=True)
95 cert_file = Str('ipcontroller-engine.pem', config=True)
96 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
96 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
97 allow_none=False, config=True)
97 allow_none=False, config=True)
98
98
99
99
100 #-----------------------------------------------------------------------------
100 #-----------------------------------------------------------------------------
101 # The main application
101 # The main application
102 #-----------------------------------------------------------------------------
102 #-----------------------------------------------------------------------------
103
103
104
104
105 cl_args = (
105 cl_args = (
106 # Client config
106 # Client config
107 (('--client-ip',), dict(
107 (('--client-ip',), dict(
108 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
108 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
109 help='The IP address or hostname the controller will listen on for '
109 help='The IP address or hostname the controller will listen on for '
110 'client connections.',
110 'client connections.',
111 metavar='FCClientServiceFactory.ip')
111 metavar='FCClientServiceFactory.ip')
112 ),
112 ),
113 (('--client-port',), dict(
113 (('--client-port',), dict(
114 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
114 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
115 help='The port the controller will listen on for client connections. '
115 help='The port the controller will listen on for client connections. '
116 'The default is to use 0, which will autoselect an open port.',
116 'The default is to use 0, which will autoselect an open port.',
117 metavar='FCClientServiceFactory.port')
117 metavar='FCClientServiceFactory.port')
118 ),
118 ),
119 (('--client-location',), dict(
119 (('--client-location',), dict(
120 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
120 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
121 help='The hostname or IP that clients should connect to. This does '
121 help='The hostname or IP that clients should connect to. This does '
122 'not control which interface the controller listens on. Instead, this '
122 'not control which interface the controller listens on. Instead, this '
123 'determines the hostname/IP that is listed in the FURL, which is how '
123 'determines the hostname/IP that is listed in the FURL, which is how '
124 'clients know where to connect. Useful if the controller is listening '
124 'clients know where to connect. Useful if the controller is listening '
125 'on multiple interfaces.',
125 'on multiple interfaces.',
126 metavar='FCClientServiceFactory.location')
126 metavar='FCClientServiceFactory.location')
127 ),
127 ),
128 # Engine config
128 # Engine config
129 (('--engine-ip',), dict(
129 (('--engine-ip',), dict(
130 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
130 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
131 help='The IP address or hostname the controller will listen on for '
131 help='The IP address or hostname the controller will listen on for '
132 'engine connections.',
132 'engine connections.',
133 metavar='FCEngineServiceFactory.ip')
133 metavar='FCEngineServiceFactory.ip')
134 ),
134 ),
135 (('--engine-port',), dict(
135 (('--engine-port',), dict(
136 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
136 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
137 help='The port the controller will listen on for engine connections. '
137 help='The port the controller will listen on for engine connections. '
138 'The default is to use 0, which will autoselect an open port.',
138 'The default is to use 0, which will autoselect an open port.',
139 metavar='FCEngineServiceFactory.port')
139 metavar='FCEngineServiceFactory.port')
140 ),
140 ),
141 (('--engine-location',), dict(
141 (('--engine-location',), dict(
142 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
142 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
143 help='The hostname or IP that engines should connect to. This does '
143 help='The hostname or IP that engines should connect to. This does '
144 'not control which interface the controller listens on. Instead, this '
144 'not control which interface the controller listens on. Instead, this '
145 'determines the hostname/IP that is listed in the FURL, which is how '
145 'determines the hostname/IP that is listed in the FURL, which is how '
146 'engines know where to connect. Useful if the controller is listening '
146 'engines know where to connect. Useful if the controller is listening '
147 'on multiple interfaces.',
147 'on multiple interfaces.',
148 metavar='FCEngineServiceFactory.location')
148 metavar='FCEngineServiceFactory.location')
149 ),
149 ),
150 # Global config
150 # Global config
151 (('--log-to-file',), dict(
151 (('--log-to-file',), dict(
152 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
152 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
153 help='Log to a file in the log directory (default is stdout)')
153 help='Log to a file in the log directory (default is stdout)')
154 ),
154 ),
155 (('-r','--reuse-furls'), dict(
155 (('-r','--reuse-furls'), dict(
156 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
156 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
157 help='Try to reuse all FURL files. If this is not set all FURL files '
157 help='Try to reuse all FURL files. If this is not set all FURL files '
158 'are deleted before the controller starts. This must be set if '
158 'are deleted before the controller starts. This must be set if '
159 'specific ports are specified by --engine-port or --client-port.')
159 'specific ports are specified by --engine-port or --client-port.')
160 ),
160 ),
161 (('-ns','--no-security'), dict(
161 (('-ns','--no-security'), dict(
162 action='store_false', dest='Global.secure', default=NoConfigDefault,
162 action='store_false', dest='Global.secure', default=NoConfigDefault,
163 help='Turn off SSL encryption for all connections.')
163 help='Turn off SSL encryption for all connections.')
164 )
164 )
165 )
165 )
166
166
167
167
168 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
168 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
169
169
170 arguments = cl_args
170 arguments = cl_args
171
171
172
172
173 default_config_file_name = 'ipcontroller_config.py'
173 default_config_file_name = 'ipcontroller_config.py'
174
174
175
175
176 class IPControllerApp(ApplicationWithClusterDir):
176 class IPControllerApp(ApplicationWithClusterDir):
177
177
178 name = 'ipcontroller'
178 name = 'ipcontroller'
179 description = 'Start the IPython controller for parallel computing.'
179 description = 'Start the IPython controller for parallel computing.'
180 config_file_name = default_config_file_name
180 config_file_name = default_config_file_name
181
181
182 def create_default_config(self):
182 def create_default_config(self):
183 super(IPControllerApp, self).create_default_config()
183 super(IPControllerApp, self).create_default_config()
184 self.default_config.Global.reuse_furls = False
184 self.default_config.Global.reuse_furls = False
185 self.default_config.Global.secure = True
185 self.default_config.Global.secure = True
186 self.default_config.Global.import_statements = []
186 self.default_config.Global.import_statements = []
187 self.default_config.Global.log_to_file = False
187 self.default_config.Global.log_to_file = False
188
188
189 def create_command_line_config(self):
189 def create_command_line_config(self):
190 """Create and return a command line config loader."""
190 """Create and return a command line config loader."""
191 return IPControllerAppCLConfigLoader(
191 return IPControllerAppCLConfigLoader(
192 description=self.description,
192 description=self.description,
193 version=release.version
193 version=release.version
194 )
194 )
195
195
196 def post_load_command_line_config(self):
196 def post_load_command_line_config(self):
197 # Now setup reuse_furls
197 # Now setup reuse_furls
198 c = self.command_line_config
198 c = self.command_line_config
199 if hasattr(c.Global, 'reuse_furls'):
199 if hasattr(c.Global, 'reuse_furls'):
200 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
200 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
201 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
201 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
202 del c.Global.reuse_furls
202 del c.Global.reuse_furls
203 if hasattr(c.Global, 'secure'):
203 if hasattr(c.Global, 'secure'):
204 c.FCClientServiceFactory.secure = c.Global.secure
204 c.FCClientServiceFactory.secure = c.Global.secure
205 c.FCEngineServiceFactory.secure = c.Global.secure
205 c.FCEngineServiceFactory.secure = c.Global.secure
206 del c.Global.secure
206 del c.Global.secure
207
207
208 def pre_construct(self):
208 def pre_construct(self):
209 # The log and security dirs were set earlier, but here we put them
209 # The log and security dirs were set earlier, but here we put them
210 # into the config and log them.
210 # into the config and log them.
211 config = self.master_config
211 config = self.master_config
212 sdir = self.cluster_dir_obj.security_dir
212 sdir = self.cluster_dir_obj.security_dir
213 self.security_dir = config.Global.security_dir = sdir
213 self.security_dir = config.Global.security_dir = sdir
214 ldir = self.cluster_dir_obj.log_dir
214 ldir = self.cluster_dir_obj.log_dir
215 self.log_dir = config.Global.log_dir = ldir
215 self.log_dir = config.Global.log_dir = ldir
216 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
216 self.log.info("Log directory set to: %s" % self.log_dir)
217 self.log.info("Log directory set to: %s" % self.log_dir)
217 self.log.info("Security directory set to: %s" % self.security_dir)
218 self.log.info("Security directory set to: %s" % self.security_dir)
218
219
219 def construct(self):
220 def construct(self):
220 # I am a little hesitant to put these into InteractiveShell itself.
221 # I am a little hesitant to put these into InteractiveShell itself.
221 # But that might be the place for them
222 # But that might be the place for them
222 sys.path.insert(0, '')
223 sys.path.insert(0, '')
223
224
224 self.start_logging()
225 self.start_logging()
225 self.import_statements()
226 self.import_statements()
226
227
227 # Create the service hierarchy
228 # Create the service hierarchy
228 self.main_service = service.MultiService()
229 self.main_service = service.MultiService()
229 # The controller service
230 # The controller service
230 controller_service = controllerservice.ControllerService()
231 controller_service = controllerservice.ControllerService()
231 controller_service.setServiceParent(self.main_service)
232 controller_service.setServiceParent(self.main_service)
232 # The client tub and all its refereceables
233 # The client tub and all its refereceables
233 csfactory = FCClientServiceFactory(self.master_config, controller_service)
234 csfactory = FCClientServiceFactory(self.master_config, controller_service)
234 client_service = csfactory.create()
235 client_service = csfactory.create()
235 client_service.setServiceParent(self.main_service)
236 client_service.setServiceParent(self.main_service)
236 # The engine tub
237 # The engine tub
237 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
238 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
238 engine_service = esfactory.create()
239 engine_service = esfactory.create()
239 engine_service.setServiceParent(self.main_service)
240 engine_service.setServiceParent(self.main_service)
240
241
241 def start_logging(self):
242 def start_logging(self):
242 if self.master_config.Global.log_to_file:
243 if self.master_config.Global.log_to_file:
243 log_filename = self.name + '-' + str(os.getpid()) + '.log'
244 log_filename = self.name + '-' + str(os.getpid()) + '.log'
244 logfile = os.path.join(self.log_dir, log_filename)
245 logfile = os.path.join(self.log_dir, log_filename)
245 open_log_file = open(logfile, 'w')
246 open_log_file = open(logfile, 'w')
246 else:
247 else:
247 open_log_file = sys.stdout
248 open_log_file = sys.stdout
248 log.startLogging(open_log_file)
249 log.startLogging(open_log_file)
249
250
250 def import_statements(self):
251 def import_statements(self):
251 statements = self.master_config.Global.import_statements
252 statements = self.master_config.Global.import_statements
252 for s in statements:
253 for s in statements:
253 try:
254 try:
254 log.msg("Executing statement: '%s'" % s)
255 log.msg("Executing statement: '%s'" % s)
255 exec s in globals(), locals()
256 exec s in globals(), locals()
256 except:
257 except:
257 log.msg("Error running statement: %s" % s)
258 log.msg("Error running statement: %s" % s)
258
259
259 def start_app(self):
260 def start_app(self):
260 # Start the controller service and set things running
261 # Start the controller service and set things running
261 self.main_service.startService()
262 self.main_service.startService()
262 reactor.run()
263 reactor.run()
263
264
264
265
265 def launch_new_instance():
266 def launch_new_instance():
266 """Create and run the IPython controller"""
267 """Create and run the IPython controller"""
267 app = IPControllerApp()
268 app = IPControllerApp()
268 app.start()
269 app.start()
270
271
272 if __name__ == '__main__':
273 launch_new_instance()
274
@@ -1,245 +1,248 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application
4 The IPython controller application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 from twisted.application import service
21 from twisted.application import service
22 from twisted.internet import reactor
22 from twisted.internet import reactor
23 from twisted.python import log
23 from twisted.python import log
24
24
25 from IPython.config.loader import NoConfigDefault
25 from IPython.config.loader import NoConfigDefault
26
26
27 from IPython.kernel.clusterdir import (
27 from IPython.kernel.clusterdir import (
28 ApplicationWithClusterDir,
28 ApplicationWithClusterDir,
29 AppWithClusterDirArgParseConfigLoader
29 AppWithClusterDirArgParseConfigLoader
30 )
30 )
31 from IPython.core import release
31 from IPython.core import release
32
32
33 from IPython.utils.importstring import import_item
33 from IPython.utils.importstring import import_item
34
34
35 from IPython.kernel.engineservice import EngineService
35 from IPython.kernel.engineservice import EngineService
36 from IPython.kernel.fcutil import Tub
36 from IPython.kernel.fcutil import Tub
37 from IPython.kernel.engineconnector import EngineConnector
37 from IPython.kernel.engineconnector import EngineConnector
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # The main application
40 # The main application
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43
43
44 cl_args = (
44 cl_args = (
45 # Controller config
45 # Controller config
46 (('--furl-file',), dict(
46 (('--furl-file',), dict(
47 type=str, dest='Global.furl_file', default=NoConfigDefault,
47 type=str, dest='Global.furl_file', default=NoConfigDefault,
48 help='The full location of the file containing the FURL of the '
48 help='The full location of the file containing the FURL of the '
49 'controller. If this is not given, the FURL file must be in the '
49 'controller. If this is not given, the FURL file must be in the '
50 'security directory of the cluster directory. This location is '
50 'security directory of the cluster directory. This location is '
51 'resolved using the --profile and --app-dir options.',
51 'resolved using the --profile and --app-dir options.',
52 metavar='Global.furl_file')
52 metavar='Global.furl_file')
53 ),
53 ),
54 # MPI
54 # MPI
55 (('--mpi',), dict(
55 (('--mpi',), dict(
56 type=str, dest='MPI.use', default=NoConfigDefault,
56 type=str, dest='MPI.use', default=NoConfigDefault,
57 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
57 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
58 metavar='MPI.use')
58 metavar='MPI.use')
59 ),
59 ),
60 # Global config
60 # Global config
61 (('--log-to-file',), dict(
61 (('--log-to-file',), dict(
62 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
62 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
63 help='Log to a file in the log directory (default is stdout)')
63 help='Log to a file in the log directory (default is stdout)')
64 )
64 )
65 )
65 )
66
66
67
67
68 class IPEngineAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
68 class IPEngineAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
69
69
70 arguments = cl_args
70 arguments = cl_args
71
71
72
72
73 mpi4py_init = """from mpi4py import MPI as mpi
73 mpi4py_init = """from mpi4py import MPI as mpi
74 mpi.size = mpi.COMM_WORLD.Get_size()
74 mpi.size = mpi.COMM_WORLD.Get_size()
75 mpi.rank = mpi.COMM_WORLD.Get_rank()
75 mpi.rank = mpi.COMM_WORLD.Get_rank()
76 """
76 """
77
77
78 pytrilinos_init = """from PyTrilinos import Epetra
78 pytrilinos_init = """from PyTrilinos import Epetra
79 class SimpleStruct:
79 class SimpleStruct:
80 pass
80 pass
81 mpi = SimpleStruct()
81 mpi = SimpleStruct()
82 mpi.rank = 0
82 mpi.rank = 0
83 mpi.size = 0
83 mpi.size = 0
84 """
84 """
85
85
86
86
87 default_config_file_name = 'ipengine_config.py'
87 default_config_file_name = 'ipengine_config.py'
88
88
89
89
90 class IPEngineApp(ApplicationWithClusterDir):
90 class IPEngineApp(ApplicationWithClusterDir):
91
91
92 name = 'ipengine'
92 name = 'ipengine'
93 description = 'Start the IPython engine for parallel computing.'
93 description = 'Start the IPython engine for parallel computing.'
94 config_file_name = default_config_file_name
94 config_file_name = default_config_file_name
95
95
96 def create_default_config(self):
96 def create_default_config(self):
97 super(IPEngineApp, self).create_default_config()
97 super(IPEngineApp, self).create_default_config()
98
98
99 # Global config attributes
99 # Global config attributes
100 self.default_config.Global.log_to_file = False
100 self.default_config.Global.log_to_file = False
101 self.default_config.Global.exec_lines = []
101 self.default_config.Global.exec_lines = []
102 # The log and security dir names must match that of the controller
102 # The log and security dir names must match that of the controller
103 self.default_config.Global.log_dir_name = 'log'
103 self.default_config.Global.log_dir_name = 'log'
104 self.default_config.Global.security_dir_name = 'security'
104 self.default_config.Global.security_dir_name = 'security'
105 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
105 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
106
106
107 # Configuration related to the controller
107 # Configuration related to the controller
108 # This must match the filename (path not included) that the controller
108 # This must match the filename (path not included) that the controller
109 # used for the FURL file.
109 # used for the FURL file.
110 self.default_config.Global.furl_file_name = 'ipcontroller-engine.furl'
110 self.default_config.Global.furl_file_name = 'ipcontroller-engine.furl'
111 # If given, this is the actual location of the controller's FURL file.
111 # If given, this is the actual location of the controller's FURL file.
112 # If not, this is computed using the profile, app_dir and furl_file_name
112 # If not, this is computed using the profile, app_dir and furl_file_name
113 self.default_config.Global.furl_file = ''
113 self.default_config.Global.furl_file = ''
114
114
115 # MPI related config attributes
115 # MPI related config attributes
116 self.default_config.MPI.use = ''
116 self.default_config.MPI.use = ''
117 self.default_config.MPI.mpi4py = mpi4py_init
117 self.default_config.MPI.mpi4py = mpi4py_init
118 self.default_config.MPI.pytrilinos = pytrilinos_init
118 self.default_config.MPI.pytrilinos = pytrilinos_init
119
119
120 def create_command_line_config(self):
120 def create_command_line_config(self):
121 """Create and return a command line config loader."""
121 """Create and return a command line config loader."""
122 return IPEngineAppCLConfigLoader(
122 return IPEngineAppCLConfigLoader(
123 description=self.description,
123 description=self.description,
124 version=release.version
124 version=release.version
125 )
125 )
126
126
127 def post_load_command_line_config(self):
127 def post_load_command_line_config(self):
128 pass
128 pass
129
129
130 def pre_construct(self):
130 def pre_construct(self):
131 config = self.master_config
131 config = self.master_config
132 sdir = self.cluster_dir_obj.security_dir
132 sdir = self.cluster_dir_obj.security_dir
133 self.security_dir = config.Global.security_dir = sdir
133 self.security_dir = config.Global.security_dir = sdir
134 ldir = self.cluster_dir_obj.log_dir
134 ldir = self.cluster_dir_obj.log_dir
135 self.log_dir = config.Global.log_dir = ldir
135 self.log_dir = config.Global.log_dir = ldir
136 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
136 self.log.info("Log directory set to: %s" % self.log_dir)
137 self.log.info("Log directory set to: %s" % self.log_dir)
137 self.log.info("Security directory set to: %s" % self.security_dir)
138 self.log.info("Security directory set to: %s" % self.security_dir)
138
139
139 self.find_cont_furl_file()
140 self.find_cont_furl_file()
140
141
141 def find_cont_furl_file(self):
142 def find_cont_furl_file(self):
143 """Set the furl file.
144
145 Here we don't try to actually see if it exists for is valid as that
146 is hadled by the connection logic.
147 """
142 config = self.master_config
148 config = self.master_config
143 # Find the actual controller FURL file
149 # Find the actual controller FURL file
144 if os.path.isfile(config.Global.furl_file):
150 if not config.Global.furl_file:
145 return
146 else:
147 # We should know what the app dir is
148 try_this = os.path.join(
151 try_this = os.path.join(
149 config.Global.cluster_dir,
152 config.Global.cluster_dir,
150 config.Global.security_dir,
153 config.Global.security_dir,
151 config.Global.furl_file_name
154 config.Global.furl_file_name
152 )
155 )
153 if os.path.isfile(try_this):
154 config.Global.furl_file = try_this
156 config.Global.furl_file = try_this
155 return
156 else:
157 self.log.critical("Could not find a valid controller FURL file.")
158 self.abort()
159
157
160 def construct(self):
158 def construct(self):
161 # I am a little hesitant to put these into InteractiveShell itself.
159 # I am a little hesitant to put these into InteractiveShell itself.
162 # But that might be the place for them
160 # But that might be the place for them
163 sys.path.insert(0, '')
161 sys.path.insert(0, '')
164
162
165 self.start_mpi()
163 self.start_mpi()
166 self.start_logging()
164 self.start_logging()
167
165
168 # Create the underlying shell class and EngineService
166 # Create the underlying shell class and EngineService
169 shell_class = import_item(self.master_config.Global.shell_class)
167 shell_class = import_item(self.master_config.Global.shell_class)
170 self.engine_service = EngineService(shell_class, mpi=mpi)
168 self.engine_service = EngineService(shell_class, mpi=mpi)
171
169
172 self.exec_lines()
170 self.exec_lines()
173
171
174 # Create the service hierarchy
172 # Create the service hierarchy
175 self.main_service = service.MultiService()
173 self.main_service = service.MultiService()
176 self.engine_service.setServiceParent(self.main_service)
174 self.engine_service.setServiceParent(self.main_service)
177 self.tub_service = Tub()
175 self.tub_service = Tub()
178 self.tub_service.setServiceParent(self.main_service)
176 self.tub_service.setServiceParent(self.main_service)
179 # This needs to be called before the connection is initiated
177 # This needs to be called before the connection is initiated
180 self.main_service.startService()
178 self.main_service.startService()
181
179
182 # This initiates the connection to the controller and calls
180 # This initiates the connection to the controller and calls
183 # register_engine to tell the controller we are ready to do work
181 # register_engine to tell the controller we are ready to do work
184 self.engine_connector = EngineConnector(self.tub_service)
182 self.engine_connector = EngineConnector(self.tub_service)
185
183
186 log.msg("Using furl file: %s" % self.master_config.Global.furl_file)
184 log.msg("Using furl file: %s" % self.master_config.Global.furl_file)
187
185
188 reactor.callWhenRunning(self.call_connect)
186 reactor.callWhenRunning(self.call_connect)
189
187
190 def call_connect(self):
188 def call_connect(self):
191 d = self.engine_connector.connect_to_controller(
189 d = self.engine_connector.connect_to_controller(
192 self.engine_service,
190 self.engine_service,
193 self.master_config.Global.furl_file
191 self.master_config.Global.furl_file
194 )
192 )
195
193
196 def handle_error(f):
194 def handle_error(f):
197 # If this print statement is replaced by a log.err(f) I get
195 log.msg('Error connecting to controller. This usually means that '
198 # an unhandled error, which makes no sense. I shouldn't have
196 'i) the controller was not started, ii) a firewall was blocking '
199 # to use a print statement here. My only thought is that
197 'the engine from connecting to the controller or iii) the engine '
200 # at the beginning of the process the logging is still starting up
198 ' was not pointed at the right FURL file:')
201 print "Error connecting to controller:", f.getErrorMessage()
199 log.msg(f.getErrorMessage())
202 reactor.callLater(0.1, reactor.stop)
200 reactor.callLater(0.1, reactor.stop)
203
201
204 d.addErrback(handle_error)
202 d.addErrback(handle_error)
205
203
206 def start_mpi(self):
204 def start_mpi(self):
207 global mpi
205 global mpi
208 mpikey = self.master_config.MPI.use
206 mpikey = self.master_config.MPI.use
209 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
207 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
210 if mpi_import_statement is not None:
208 if mpi_import_statement is not None:
211 try:
209 try:
212 self.log.info("Initializing MPI:")
210 self.log.info("Initializing MPI:")
213 self.log.info(mpi_import_statement)
211 self.log.info(mpi_import_statement)
214 exec mpi_import_statement in globals()
212 exec mpi_import_statement in globals()
215 except:
213 except:
216 mpi = None
214 mpi = None
217 else:
215 else:
218 mpi = None
216 mpi = None
219
217
220 def start_logging(self):
218 def start_logging(self):
221 if self.master_config.Global.log_to_file:
219 if self.master_config.Global.log_to_file:
222 log_filename = self.name + '-' + str(os.getpid()) + '.log'
220 log_filename = self.name + '-' + str(os.getpid()) + '.log'
223 logfile = os.path.join(self.log_dir, log_filename)
221 logfile = os.path.join(self.log_dir, log_filename)
224 open_log_file = open(logfile, 'w')
222 open_log_file = open(logfile, 'w')
225 else:
223 else:
226 open_log_file = sys.stdout
224 open_log_file = sys.stdout
227 log.startLogging(open_log_file)
225 log.startLogging(open_log_file)
228
226
229 def exec_lines(self):
227 def exec_lines(self):
230 for line in self.master_config.Global.exec_lines:
228 for line in self.master_config.Global.exec_lines:
231 try:
229 try:
232 log.msg("Executing statement: '%s'" % line)
230 log.msg("Executing statement: '%s'" % line)
233 self.engine_service.execute(line)
231 self.engine_service.execute(line)
234 except:
232 except:
235 log.msg("Error executing statement: %s" % line)
233 log.msg("Error executing statement: %s" % line)
236
234
237 def start_app(self):
235 def start_app(self):
238 # Start the controller service and set things running
236 # Start the controller service and set things running
239 reactor.run()
237 reactor.run()
240
238
241
239
242 def launch_new_instance():
240 def launch_new_instance():
243 """Create and run the IPython controller"""
241 """Create and run the IPython controller"""
244 app = IPEngineApp()
242 app = IPEngineApp()
245 app.start()
243 app.start()
244
245
246 if __name__ == '__main__':
247 launch_new_instance()
248
@@ -1,22 +1,18 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """ipcluster script"""
4 #-----------------------------------------------------------------------------
5
5 # Copyright (C) 2008-2009 The IPython Development Team
6 __docformat__ = "restructuredtext en"
7
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
10 #
6 #
11 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
14
10
15 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
16 # Imports
12 # Imports
17 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
18
15
19 if __name__ == '__main__':
16 from IPython.kernel.ipclusterapp import launch_new_instance
20 from IPython.kernel.scripts import ipcluster
21 ipcluster.main()
22
17
18 launch_new_instance()
@@ -1,813 +1,811 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 import tempfile
22 pjoin = os.path.join
22 pjoin = os.path.join
23
23
24 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import log
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.external import Itpl
31 from IPython.external import Itpl
32 from IPython.utils.genutils import (
32 from IPython.utils.genutils import (
33 get_ipython_dir,
33 get_ipython_dir,
34 get_log_dir,
34 get_log_dir,
35 get_security_dir,
35 get_security_dir,
36 num_cpus
36 num_cpus
37 )
37 )
38 from IPython.kernel.fcutil import have_crypto
38 from IPython.kernel.fcutil import have_crypto
39
39
40 # Create various ipython directories if they don't exist.
40 # Create various ipython directories if they don't exist.
41 # This must be done before IPython.kernel.config is imported.
41 # This must be done before IPython.kernel.config is imported.
42 from IPython.core.oldusersetup import user_setup
42 from IPython.core.oldusersetup import user_setup
43 if os.name == 'posix':
43 if os.name == 'posix':
44 rc_suffix = ''
44 rc_suffix = ''
45 else:
45 else:
46 rc_suffix = '.ini'
46 rc_suffix = '.ini'
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 get_log_dir()
48 get_log_dir()
49 get_security_dir()
49 get_security_dir()
50
50
51 from IPython.kernel.config import config_manager as kernel_config_manager
51 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
52 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.util import printer
53
56
54
57 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
58 # General process handling code
56 # General process handling code
59 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
60
58
61
59
62 class ProcessStateError(Exception):
60 class ProcessStateError(Exception):
63 pass
61 pass
64
62
65 class UnknownStatus(Exception):
63 class UnknownStatus(Exception):
66 pass
64 pass
67
65
68 class LauncherProcessProtocol(ProcessProtocol):
66 class LauncherProcessProtocol(ProcessProtocol):
69 """
67 """
70 A ProcessProtocol to go with the ProcessLauncher.
68 A ProcessProtocol to go with the ProcessLauncher.
71 """
69 """
72 def __init__(self, process_launcher):
70 def __init__(self, process_launcher):
73 self.process_launcher = process_launcher
71 self.process_launcher = process_launcher
74
72
75 def connectionMade(self):
73 def connectionMade(self):
76 self.process_launcher.fire_start_deferred(self.transport.pid)
74 self.process_launcher.fire_start_deferred(self.transport.pid)
77
75
78 def processEnded(self, status):
76 def processEnded(self, status):
79 value = status.value
77 value = status.value
80 if isinstance(value, ProcessDone):
78 if isinstance(value, ProcessDone):
81 self.process_launcher.fire_stop_deferred(0)
79 self.process_launcher.fire_stop_deferred(0)
82 elif isinstance(value, ProcessTerminated):
80 elif isinstance(value, ProcessTerminated):
83 self.process_launcher.fire_stop_deferred(
81 self.process_launcher.fire_stop_deferred(
84 {'exit_code':value.exitCode,
82 {'exit_code':value.exitCode,
85 'signal':value.signal,
83 'signal':value.signal,
86 'status':value.status
84 'status':value.status
87 }
85 }
88 )
86 )
89 else:
87 else:
90 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
88 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
91
89
92 def outReceived(self, data):
90 def outReceived(self, data):
93 log.msg(data)
91 log.msg(data)
94
92
95 def errReceived(self, data):
93 def errReceived(self, data):
96 log.err(data)
94 log.err(data)
97
95
98 class ProcessLauncher(object):
96 class ProcessLauncher(object):
99 """
97 """
100 Start and stop an external process in an asynchronous manner.
98 Start and stop an external process in an asynchronous manner.
101
99
102 Currently this uses deferreds to notify other parties of process state
100 Currently this uses deferreds to notify other parties of process state
103 changes. This is an awkward design and should be moved to using
101 changes. This is an awkward design and should be moved to using
104 a formal NotificationCenter.
102 a formal NotificationCenter.
105 """
103 """
106 def __init__(self, cmd_and_args):
104 def __init__(self, cmd_and_args):
107 self.cmd = cmd_and_args[0]
105 self.cmd = cmd_and_args[0]
108 self.args = cmd_and_args
106 self.args = cmd_and_args
109 self._reset()
107 self._reset()
110
108
111 def _reset(self):
109 def _reset(self):
112 self.process_protocol = None
110 self.process_protocol = None
113 self.pid = None
111 self.pid = None
114 self.start_deferred = None
112 self.start_deferred = None
115 self.stop_deferreds = []
113 self.stop_deferreds = []
116 self.state = 'before' # before, running, or after
114 self.state = 'before' # before, running, or after
117
115
118 @property
116 @property
119 def running(self):
117 def running(self):
120 if self.state == 'running':
118 if self.state == 'running':
121 return True
119 return True
122 else:
120 else:
123 return False
121 return False
124
122
125 def fire_start_deferred(self, pid):
123 def fire_start_deferred(self, pid):
126 self.pid = pid
124 self.pid = pid
127 self.state = 'running'
125 self.state = 'running'
128 log.msg('Process %r has started with pid=%i' % (self.args, pid))
126 log.msg('Process %r has started with pid=%i' % (self.args, pid))
129 self.start_deferred.callback(pid)
127 self.start_deferred.callback(pid)
130
128
131 def start(self):
129 def start(self):
132 if self.state == 'before':
130 if self.state == 'before':
133 self.process_protocol = LauncherProcessProtocol(self)
131 self.process_protocol = LauncherProcessProtocol(self)
134 self.start_deferred = defer.Deferred()
132 self.start_deferred = defer.Deferred()
135 self.process_transport = reactor.spawnProcess(
133 self.process_transport = reactor.spawnProcess(
136 self.process_protocol,
134 self.process_protocol,
137 self.cmd,
135 self.cmd,
138 self.args,
136 self.args,
139 env=os.environ
137 env=os.environ
140 )
138 )
141 return self.start_deferred
139 return self.start_deferred
142 else:
140 else:
143 s = 'the process has already been started and has state: %r' % \
141 s = 'The process has already been started and has state: %r' % \
144 self.state
142 self.state
145 return defer.fail(ProcessStateError(s))
143 return defer.fail(ProcessStateError(s))
146
144
147 def get_stop_deferred(self):
145 def get_stop_deferred(self):
148 if self.state == 'running' or self.state == 'before':
146 if self.state == 'running' or self.state == 'before':
149 d = defer.Deferred()
147 d = defer.Deferred()
150 self.stop_deferreds.append(d)
148 self.stop_deferreds.append(d)
151 return d
149 return d
152 else:
150 else:
153 s = 'this process is already complete'
151 s = 'this process is already complete'
154 return defer.fail(ProcessStateError(s))
152 return defer.fail(ProcessStateError(s))
155
153
156 def fire_stop_deferred(self, exit_code):
154 def fire_stop_deferred(self, exit_code):
157 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
155 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
158 self.state = 'after'
156 self.state = 'after'
159 for d in self.stop_deferreds:
157 for d in self.stop_deferreds:
160 d.callback(exit_code)
158 d.callback(exit_code)
161
159
162 def signal(self, sig):
160 def signal(self, sig):
163 """
161 """
164 Send a signal to the process.
162 Send a signal to the process.
165
163
166 The argument sig can be ('KILL','INT', etc.) or any signal number.
164 The argument sig can be ('KILL','INT', etc.) or any signal number.
167 """
165 """
168 if self.state == 'running':
166 if self.state == 'running':
169 self.process_transport.signalProcess(sig)
167 self.process_transport.signalProcess(sig)
170
168
171 # def __del__(self):
169 # def __del__(self):
172 # self.signal('KILL')
170 # self.signal('KILL')
173
171
174 def interrupt_then_kill(self, delay=1.0):
172 def interrupt_then_kill(self, delay=1.0):
175 self.signal('INT')
173 self.signal('INT')
176 reactor.callLater(delay, self.signal, 'KILL')
174 reactor.callLater(delay, self.signal, 'KILL')
177
175
178
176
179 #-----------------------------------------------------------------------------
177 #-----------------------------------------------------------------------------
180 # Code for launching controller and engines
178 # Code for launching controller and engines
181 #-----------------------------------------------------------------------------
179 #-----------------------------------------------------------------------------
182
180
183
181
184 class ControllerLauncher(ProcessLauncher):
182 class ControllerLauncher(ProcessLauncher):
185
183
186 def __init__(self, extra_args=None):
184 def __init__(self, extra_args=None):
187 if sys.platform == 'win32':
185 if sys.platform == 'win32':
188 # This logic is needed because the ipcontroller script doesn't
186 # This logic is needed because the ipcontroller script doesn't
189 # always get installed in the same way or in the same location.
187 # always get installed in the same way or in the same location.
190 from IPython.kernel.scripts import ipcontroller
188 from IPython.kernel.scripts import ipcontroller
191 script_location = ipcontroller.__file__.replace('.pyc', '.py')
189 script_location = ipcontroller.__file__.replace('.pyc', '.py')
192 # The -u option here turns on unbuffered output, which is required
190 # The -u option here turns on unbuffered output, which is required
193 # on Win32 to prevent wierd conflict and problems with Twisted.
191 # on Win32 to prevent wierd conflict and problems with Twisted.
194 # Also, use sys.executable to make sure we are picking up the
192 # Also, use sys.executable to make sure we are picking up the
195 # right python exe.
193 # right python exe.
196 args = [sys.executable, '-u', script_location]
194 args = [sys.executable, '-u', script_location]
197 else:
195 else:
198 args = ['ipcontroller']
196 args = ['ipcontroller']
199 self.extra_args = extra_args
197 self.extra_args = extra_args
200 if extra_args is not None:
198 if extra_args is not None:
201 args.extend(extra_args)
199 args.extend(extra_args)
202
200
203 ProcessLauncher.__init__(self, args)
201 ProcessLauncher.__init__(self, args)
204
202
205
203
206 class EngineLauncher(ProcessLauncher):
204 class EngineLauncher(ProcessLauncher):
207
205
208 def __init__(self, extra_args=None):
206 def __init__(self, extra_args=None):
209 if sys.platform == 'win32':
207 if sys.platform == 'win32':
210 # This logic is needed because the ipcontroller script doesn't
208 # This logic is needed because the ipcontroller script doesn't
211 # always get installed in the same way or in the same location.
209 # always get installed in the same way or in the same location.
212 from IPython.kernel.scripts import ipengine
210 from IPython.kernel.scripts import ipengine
213 script_location = ipengine.__file__.replace('.pyc', '.py')
211 script_location = ipengine.__file__.replace('.pyc', '.py')
214 # The -u option here turns on unbuffered output, which is required
212 # The -u option here turns on unbuffered output, which is required
215 # on Win32 to prevent wierd conflict and problems with Twisted.
213 # on Win32 to prevent wierd conflict and problems with Twisted.
216 # Also, use sys.executable to make sure we are picking up the
214 # Also, use sys.executable to make sure we are picking up the
217 # right python exe.
215 # right python exe.
218 args = [sys.executable, '-u', script_location]
216 args = [sys.executable, '-u', script_location]
219 else:
217 else:
220 args = ['ipengine']
218 args = ['ipengine']
221 self.extra_args = extra_args
219 self.extra_args = extra_args
222 if extra_args is not None:
220 if extra_args is not None:
223 args.extend(extra_args)
221 args.extend(extra_args)
224
222
225 ProcessLauncher.__init__(self, args)
223 ProcessLauncher.__init__(self, args)
226
224
227
225
228 class LocalEngineSet(object):
226 class LocalEngineSet(object):
229
227
230 def __init__(self, extra_args=None):
228 def __init__(self, extra_args=None):
231 self.extra_args = extra_args
229 self.extra_args = extra_args
232 self.launchers = []
230 self.launchers = []
233
231
234 def start(self, n):
232 def start(self, n):
235 dlist = []
233 dlist = []
236 for i in range(n):
234 for i in range(n):
237 el = EngineLauncher(extra_args=self.extra_args)
235 el = EngineLauncher(extra_args=self.extra_args)
238 d = el.start()
236 d = el.start()
239 self.launchers.append(el)
237 self.launchers.append(el)
240 dlist.append(d)
238 dlist.append(d)
241 dfinal = gatherBoth(dlist, consumeErrors=True)
239 dfinal = gatherBoth(dlist, consumeErrors=True)
242 dfinal.addCallback(self._handle_start)
240 dfinal.addCallback(self._handle_start)
243 return dfinal
241 return dfinal
244
242
245 def _handle_start(self, r):
243 def _handle_start(self, r):
246 log.msg('Engines started with pids: %r' % r)
244 log.msg('Engines started with pids: %r' % r)
247 return r
245 return r
248
246
249 def _handle_stop(self, r):
247 def _handle_stop(self, r):
250 log.msg('Engines received signal: %r' % r)
248 log.msg('Engines received signal: %r' % r)
251 return r
249 return r
252
250
253 def signal(self, sig):
251 def signal(self, sig):
254 dlist = []
252 dlist = []
255 for el in self.launchers:
253 for el in self.launchers:
256 d = el.get_stop_deferred()
254 d = el.get_stop_deferred()
257 dlist.append(d)
255 dlist.append(d)
258 el.signal(sig)
256 el.signal(sig)
259 dfinal = gatherBoth(dlist, consumeErrors=True)
257 dfinal = gatherBoth(dlist, consumeErrors=True)
260 dfinal.addCallback(self._handle_stop)
258 dfinal.addCallback(self._handle_stop)
261 return dfinal
259 return dfinal
262
260
263 def interrupt_then_kill(self, delay=1.0):
261 def interrupt_then_kill(self, delay=1.0):
264 dlist = []
262 dlist = []
265 for el in self.launchers:
263 for el in self.launchers:
266 d = el.get_stop_deferred()
264 d = el.get_stop_deferred()
267 dlist.append(d)
265 dlist.append(d)
268 el.interrupt_then_kill(delay)
266 el.interrupt_then_kill(delay)
269 dfinal = gatherBoth(dlist, consumeErrors=True)
267 dfinal = gatherBoth(dlist, consumeErrors=True)
270 dfinal.addCallback(self._handle_stop)
268 dfinal.addCallback(self._handle_stop)
271 return dfinal
269 return dfinal
272
270
273
271
274 class BatchEngineSet(object):
272 class BatchEngineSet(object):
275
273
276 # Subclasses must fill these in. See PBSEngineSet
274 # Subclasses must fill these in. See PBSEngineSet
277 submit_command = ''
275 submit_command = ''
278 delete_command = ''
276 delete_command = ''
279 job_id_regexp = ''
277 job_id_regexp = ''
280
278
281 def __init__(self, template_file, **kwargs):
279 def __init__(self, template_file, **kwargs):
282 self.template_file = template_file
280 self.template_file = template_file
283 self.context = {}
281 self.context = {}
284 self.context.update(kwargs)
282 self.context.update(kwargs)
285 self.batch_file = self.template_file+'-run'
283 self.batch_file = self.template_file+'-run'
286
284
287 def parse_job_id(self, output):
285 def parse_job_id(self, output):
288 m = re.match(self.job_id_regexp, output)
286 m = re.match(self.job_id_regexp, output)
289 if m is not None:
287 if m is not None:
290 job_id = m.group()
288 job_id = m.group()
291 else:
289 else:
292 raise Exception("job id couldn't be determined: %s" % output)
290 raise Exception("job id couldn't be determined: %s" % output)
293 self.job_id = job_id
291 self.job_id = job_id
294 log.msg('Job started with job id: %r' % job_id)
292 log.msg('Job started with job id: %r' % job_id)
295 return job_id
293 return job_id
296
294
297 def write_batch_script(self, n):
295 def write_batch_script(self, n):
298 self.context['n'] = n
296 self.context['n'] = n
299 template = open(self.template_file, 'r').read()
297 template = open(self.template_file, 'r').read()
300 log.msg('Using template for batch script: %s' % self.template_file)
298 log.msg('Using template for batch script: %s' % self.template_file)
301 script_as_string = Itpl.itplns(template, self.context)
299 script_as_string = Itpl.itplns(template, self.context)
302 log.msg('Writing instantiated batch script: %s' % self.batch_file)
300 log.msg('Writing instantiated batch script: %s' % self.batch_file)
303 f = open(self.batch_file,'w')
301 f = open(self.batch_file,'w')
304 f.write(script_as_string)
302 f.write(script_as_string)
305 f.close()
303 f.close()
306
304
307 def handle_error(self, f):
305 def handle_error(self, f):
308 f.printTraceback()
306 f.printTraceback()
309 f.raiseException()
307 f.raiseException()
310
308
311 def start(self, n):
309 def start(self, n):
312 self.write_batch_script(n)
310 self.write_batch_script(n)
313 d = getProcessOutput(self.submit_command,
311 d = getProcessOutput(self.submit_command,
314 [self.batch_file],env=os.environ)
312 [self.batch_file],env=os.environ)
315 d.addCallback(self.parse_job_id)
313 d.addCallback(self.parse_job_id)
316 d.addErrback(self.handle_error)
314 d.addErrback(self.handle_error)
317 return d
315 return d
318
316
319 def kill(self):
317 def kill(self):
320 d = getProcessOutput(self.delete_command,
318 d = getProcessOutput(self.delete_command,
321 [self.job_id],env=os.environ)
319 [self.job_id],env=os.environ)
322 return d
320 return d
323
321
324 class PBSEngineSet(BatchEngineSet):
322 class PBSEngineSet(BatchEngineSet):
325
323
326 submit_command = 'qsub'
324 submit_command = 'qsub'
327 delete_command = 'qdel'
325 delete_command = 'qdel'
328 job_id_regexp = '\d+'
326 job_id_regexp = '\d+'
329
327
330 def __init__(self, template_file, **kwargs):
328 def __init__(self, template_file, **kwargs):
331 BatchEngineSet.__init__(self, template_file, **kwargs)
329 BatchEngineSet.__init__(self, template_file, **kwargs)
332
330
333
331
334 sshx_template="""#!/bin/sh
332 sshx_template="""#!/bin/sh
335 "$@" &> /dev/null &
333 "$@" &> /dev/null &
336 echo $!
334 echo $!
337 """
335 """
338
336
339 engine_killer_template="""#!/bin/sh
337 engine_killer_template="""#!/bin/sh
340 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
338 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
341 """
339 """
342
340
343 class SSHEngineSet(object):
341 class SSHEngineSet(object):
344 sshx_template=sshx_template
342 sshx_template=sshx_template
345 engine_killer_template=engine_killer_template
343 engine_killer_template=engine_killer_template
346
344
347 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
345 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
348 """Start a controller on localhost and engines using ssh.
346 """Start a controller on localhost and engines using ssh.
349
347
350 The engine_hosts argument is a dict with hostnames as keys and
348 The engine_hosts argument is a dict with hostnames as keys and
351 the number of engine (int) as values. sshx is the name of a local
349 the number of engine (int) as values. sshx is the name of a local
352 file that will be used to run remote commands. This file is used
350 file that will be used to run remote commands. This file is used
353 to setup the environment properly.
351 to setup the environment properly.
354 """
352 """
355
353
356 self.temp_dir = tempfile.gettempdir()
354 self.temp_dir = tempfile.gettempdir()
357 if sshx is not None:
355 if sshx is not None:
358 self.sshx = sshx
356 self.sshx = sshx
359 else:
357 else:
360 # Write the sshx.sh file locally from our template.
358 # Write the sshx.sh file locally from our template.
361 self.sshx = os.path.join(
359 self.sshx = os.path.join(
362 self.temp_dir,
360 self.temp_dir,
363 '%s-main-sshx.sh' % os.environ['USER']
361 '%s-main-sshx.sh' % os.environ['USER']
364 )
362 )
365 f = open(self.sshx, 'w')
363 f = open(self.sshx, 'w')
366 f.writelines(self.sshx_template)
364 f.writelines(self.sshx_template)
367 f.close()
365 f.close()
368 self.engine_command = ipengine
366 self.engine_command = ipengine
369 self.engine_hosts = engine_hosts
367 self.engine_hosts = engine_hosts
370 # Write the engine killer script file locally from our template.
368 # Write the engine killer script file locally from our template.
371 self.engine_killer = os.path.join(
369 self.engine_killer = os.path.join(
372 self.temp_dir,
370 self.temp_dir,
373 '%s-local-engine_killer.sh' % os.environ['USER']
371 '%s-local-engine_killer.sh' % os.environ['USER']
374 )
372 )
375 f = open(self.engine_killer, 'w')
373 f = open(self.engine_killer, 'w')
376 f.writelines(self.engine_killer_template)
374 f.writelines(self.engine_killer_template)
377 f.close()
375 f.close()
378
376
379 def start(self, send_furl=False):
377 def start(self, send_furl=False):
380 dlist = []
378 dlist = []
381 for host in self.engine_hosts.keys():
379 for host in self.engine_hosts.keys():
382 count = self.engine_hosts[host]
380 count = self.engine_hosts[host]
383 d = self._start(host, count, send_furl)
381 d = self._start(host, count, send_furl)
384 dlist.append(d)
382 dlist.append(d)
385 return gatherBoth(dlist, consumeErrors=True)
383 return gatherBoth(dlist, consumeErrors=True)
386
384
387 def _start(self, hostname, count=1, send_furl=False):
385 def _start(self, hostname, count=1, send_furl=False):
388 if send_furl:
386 if send_furl:
389 d = self._scp_furl(hostname)
387 d = self._scp_furl(hostname)
390 else:
388 else:
391 d = defer.succeed(None)
389 d = defer.succeed(None)
392 d.addCallback(lambda r: self._scp_sshx(hostname))
390 d.addCallback(lambda r: self._scp_sshx(hostname))
393 d.addCallback(lambda r: self._ssh_engine(hostname, count))
391 d.addCallback(lambda r: self._ssh_engine(hostname, count))
394 return d
392 return d
395
393
396 def _scp_furl(self, hostname):
394 def _scp_furl(self, hostname):
397 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
395 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
398 cmd_list = scp_cmd.split()
396 cmd_list = scp_cmd.split()
399 cmd_list[1] = os.path.expanduser(cmd_list[1])
397 cmd_list[1] = os.path.expanduser(cmd_list[1])
400 log.msg('Copying furl file: %s' % scp_cmd)
398 log.msg('Copying furl file: %s' % scp_cmd)
401 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
399 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
402 return d
400 return d
403
401
404 def _scp_sshx(self, hostname):
402 def _scp_sshx(self, hostname):
405 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
403 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
406 self.sshx, hostname,
404 self.sshx, hostname,
407 self.temp_dir, os.environ['USER']
405 self.temp_dir, os.environ['USER']
408 )
406 )
409 print
407 print
410 log.msg("Copying sshx: %s" % scp_cmd)
408 log.msg("Copying sshx: %s" % scp_cmd)
411 sshx_scp = scp_cmd.split()
409 sshx_scp = scp_cmd.split()
412 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
410 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
413 return d
411 return d
414
412
415 def _ssh_engine(self, hostname, count):
413 def _ssh_engine(self, hostname, count):
416 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
414 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
417 hostname, self.temp_dir,
415 hostname, self.temp_dir,
418 os.environ['USER'], self.engine_command
416 os.environ['USER'], self.engine_command
419 )
417 )
420 cmds = exec_engine.split()
418 cmds = exec_engine.split()
421 dlist = []
419 dlist = []
422 log.msg("about to start engines...")
420 log.msg("about to start engines...")
423 for i in range(count):
421 for i in range(count):
424 log.msg('Starting engines: %s' % exec_engine)
422 log.msg('Starting engines: %s' % exec_engine)
425 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
423 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
426 dlist.append(d)
424 dlist.append(d)
427 return gatherBoth(dlist, consumeErrors=True)
425 return gatherBoth(dlist, consumeErrors=True)
428
426
429 def kill(self):
427 def kill(self):
430 dlist = []
428 dlist = []
431 for host in self.engine_hosts.keys():
429 for host in self.engine_hosts.keys():
432 d = self._killall(host)
430 d = self._killall(host)
433 dlist.append(d)
431 dlist.append(d)
434 return gatherBoth(dlist, consumeErrors=True)
432 return gatherBoth(dlist, consumeErrors=True)
435
433
436 def _killall(self, hostname):
434 def _killall(self, hostname):
437 d = self._scp_engine_killer(hostname)
435 d = self._scp_engine_killer(hostname)
438 d.addCallback(lambda r: self._ssh_kill(hostname))
436 d.addCallback(lambda r: self._ssh_kill(hostname))
439 # d.addErrback(self._exec_err)
437 # d.addErrback(self._exec_err)
440 return d
438 return d
441
439
442 def _scp_engine_killer(self, hostname):
440 def _scp_engine_killer(self, hostname):
443 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
441 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
444 self.engine_killer,
442 self.engine_killer,
445 hostname,
443 hostname,
446 self.temp_dir,
444 self.temp_dir,
447 os.environ['USER']
445 os.environ['USER']
448 )
446 )
449 cmds = scp_cmd.split()
447 cmds = scp_cmd.split()
450 log.msg('Copying engine_killer: %s' % scp_cmd)
448 log.msg('Copying engine_killer: %s' % scp_cmd)
451 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
449 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
452 return d
450 return d
453
451
454 def _ssh_kill(self, hostname):
452 def _ssh_kill(self, hostname):
455 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
453 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
456 hostname,
454 hostname,
457 self.temp_dir,
455 self.temp_dir,
458 os.environ['USER']
456 os.environ['USER']
459 )
457 )
460 log.msg('Killing engine: %s' % kill_cmd)
458 log.msg('Killing engine: %s' % kill_cmd)
461 kill_cmd = kill_cmd.split()
459 kill_cmd = kill_cmd.split()
462 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
460 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
463 return d
461 return d
464
462
465 def _exec_err(self, r):
463 def _exec_err(self, r):
466 log.msg(r)
464 log.msg(r)
467
465
468 #-----------------------------------------------------------------------------
466 #-----------------------------------------------------------------------------
469 # Main functions for the different types of clusters
467 # Main functions for the different types of clusters
470 #-----------------------------------------------------------------------------
468 #-----------------------------------------------------------------------------
471
469
472 # TODO:
470 # TODO:
473 # The logic in these codes should be moved into classes like LocalCluster
471 # The logic in these codes should be moved into classes like LocalCluster
474 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
472 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
475 # The main functions should then just parse the command line arguments, create
473 # The main functions should then just parse the command line arguments, create
476 # the appropriate class and call a 'start' method.
474 # the appropriate class and call a 'start' method.
477
475
478
476
479 def check_security(args, cont_args):
477 def check_security(args, cont_args):
480 """Check to see if we should run with SSL support."""
478 """Check to see if we should run with SSL support."""
481 if (not args.x or not args.y) and not have_crypto:
479 if (not args.x or not args.y) and not have_crypto:
482 log.err("""
480 log.err("""
483 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
481 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
484 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
482 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
485 reactor.stop()
483 reactor.stop()
486 return False
484 return False
487 if args.x:
485 if args.x:
488 cont_args.append('-x')
486 cont_args.append('-x')
489 if args.y:
487 if args.y:
490 cont_args.append('-y')
488 cont_args.append('-y')
491 return True
489 return True
492
490
493
491
494 def check_reuse(args, cont_args):
492 def check_reuse(args, cont_args):
495 """Check to see if we should try to resuse FURL files."""
493 """Check to see if we should try to resuse FURL files."""
496 if args.r:
494 if args.r:
497 cont_args.append('-r')
495 cont_args.append('-r')
498 if args.client_port == 0 or args.engine_port == 0:
496 if args.client_port == 0 or args.engine_port == 0:
499 log.err("""
497 log.err("""
500 To reuse FURL files, you must also set the client and engine ports using
498 To reuse FURL files, you must also set the client and engine ports using
501 the --client-port and --engine-port options.""")
499 the --client-port and --engine-port options.""")
502 reactor.stop()
500 reactor.stop()
503 return False
501 return False
504 cont_args.append('--client-port=%i' % args.client_port)
502 cont_args.append('--client-port=%i' % args.client_port)
505 cont_args.append('--engine-port=%i' % args.engine_port)
503 cont_args.append('--engine-port=%i' % args.engine_port)
506 return True
504 return True
507
505
508
506
509 def _err_and_stop(f):
507 def _err_and_stop(f):
510 """Errback to log a failure and halt the reactor on a fatal error."""
508 """Errback to log a failure and halt the reactor on a fatal error."""
511 log.err(f)
509 log.err(f)
512 reactor.stop()
510 reactor.stop()
513
511
514
512
515 def _delay_start(cont_pid, start_engines, furl_file, reuse):
513 def _delay_start(cont_pid, start_engines, furl_file, reuse):
516 """Wait for controller to create FURL files and the start the engines."""
514 """Wait for controller to create FURL files and the start the engines."""
517 if not reuse:
515 if not reuse:
518 if os.path.isfile(furl_file):
516 if os.path.isfile(furl_file):
519 os.unlink(furl_file)
517 os.unlink(furl_file)
520 log.msg('Waiting for controller to finish starting...')
518 log.msg('Waiting for controller to finish starting...')
521 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
519 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
522 d.addCallback(lambda _: log.msg('Controller started'))
520 d.addCallback(lambda _: log.msg('Controller started'))
523 d.addCallback(lambda _: start_engines(cont_pid))
521 d.addCallback(lambda _: start_engines(cont_pid))
524 return d
522 return d
525
523
526
524
527 def main_local(args):
525 def main_local(args):
528 cont_args = []
526 cont_args = []
529 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
527 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
530
528
531 # Check security settings before proceeding
529 # Check security settings before proceeding
532 if not check_security(args, cont_args):
530 if not check_security(args, cont_args):
533 return
531 return
534
532
535 # See if we are reusing FURL files
533 # See if we are reusing FURL files
536 if not check_reuse(args, cont_args):
534 if not check_reuse(args, cont_args):
537 return
535 return
538
536
539 cl = ControllerLauncher(extra_args=cont_args)
537 cl = ControllerLauncher(extra_args=cont_args)
540 dstart = cl.start()
538 dstart = cl.start()
541 def start_engines(cont_pid):
539 def start_engines(cont_pid):
542 engine_args = []
540 engine_args = []
543 engine_args.append('--logfile=%s' % \
541 engine_args.append('--logfile=%s' % \
544 pjoin(args.logdir,'ipengine%s-' % cont_pid))
542 pjoin(args.logdir,'ipengine%s-' % cont_pid))
545 eset = LocalEngineSet(extra_args=engine_args)
543 eset = LocalEngineSet(extra_args=engine_args)
546 def shutdown(signum, frame):
544 def shutdown(signum, frame):
547 log.msg('Stopping local cluster')
545 log.msg('Stopping local cluster')
548 # We are still playing with the times here, but these seem
546 # We are still playing with the times here, but these seem
549 # to be reliable in allowing everything to exit cleanly.
547 # to be reliable in allowing everything to exit cleanly.
550 eset.interrupt_then_kill(0.5)
548 eset.interrupt_then_kill(0.5)
551 cl.interrupt_then_kill(0.5)
549 cl.interrupt_then_kill(0.5)
552 reactor.callLater(1.0, reactor.stop)
550 reactor.callLater(1.0, reactor.stop)
553 signal.signal(signal.SIGINT,shutdown)
551 signal.signal(signal.SIGINT,shutdown)
554 d = eset.start(args.n)
552 d = eset.start(args.n)
555 return d
553 return d
556 config = kernel_config_manager.get_config_obj()
554 config = kernel_config_manager.get_config_obj()
557 furl_file = config['controller']['engine_furl_file']
555 furl_file = config['controller']['engine_furl_file']
558 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
556 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
559 dstart.addErrback(_err_and_stop)
557 dstart.addErrback(_err_and_stop)
560
558
561
559
562 def main_mpi(args):
560 def main_mpi(args):
563 cont_args = []
561 cont_args = []
564 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
562 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
565
563
566 # Check security settings before proceeding
564 # Check security settings before proceeding
567 if not check_security(args, cont_args):
565 if not check_security(args, cont_args):
568 return
566 return
569
567
570 # See if we are reusing FURL files
568 # See if we are reusing FURL files
571 if not check_reuse(args, cont_args):
569 if not check_reuse(args, cont_args):
572 return
570 return
573
571
574 cl = ControllerLauncher(extra_args=cont_args)
572 cl = ControllerLauncher(extra_args=cont_args)
575 dstart = cl.start()
573 dstart = cl.start()
576 def start_engines(cont_pid):
574 def start_engines(cont_pid):
577 raw_args = [args.cmd]
575 raw_args = [args.cmd]
578 raw_args.extend(['-n',str(args.n)])
576 raw_args.extend(['-n',str(args.n)])
579 raw_args.append('ipengine')
577 raw_args.append('ipengine')
580 raw_args.append('-l')
578 raw_args.append('-l')
581 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
579 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
582 if args.mpi:
580 if args.mpi:
583 raw_args.append('--mpi=%s' % args.mpi)
581 raw_args.append('--mpi=%s' % args.mpi)
584 eset = ProcessLauncher(raw_args)
582 eset = ProcessLauncher(raw_args)
585 def shutdown(signum, frame):
583 def shutdown(signum, frame):
586 log.msg('Stopping local cluster')
584 log.msg('Stopping local cluster')
587 # We are still playing with the times here, but these seem
585 # We are still playing with the times here, but these seem
588 # to be reliable in allowing everything to exit cleanly.
586 # to be reliable in allowing everything to exit cleanly.
589 eset.interrupt_then_kill(1.0)
587 eset.interrupt_then_kill(1.0)
590 cl.interrupt_then_kill(1.0)
588 cl.interrupt_then_kill(1.0)
591 reactor.callLater(2.0, reactor.stop)
589 reactor.callLater(2.0, reactor.stop)
592 signal.signal(signal.SIGINT,shutdown)
590 signal.signal(signal.SIGINT,shutdown)
593 d = eset.start()
591 d = eset.start()
594 return d
592 return d
595 config = kernel_config_manager.get_config_obj()
593 config = kernel_config_manager.get_config_obj()
596 furl_file = config['controller']['engine_furl_file']
594 furl_file = config['controller']['engine_furl_file']
597 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
595 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
598 dstart.addErrback(_err_and_stop)
596 dstart.addErrback(_err_and_stop)
599
597
600
598
601 def main_pbs(args):
599 def main_pbs(args):
602 cont_args = []
600 cont_args = []
603 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
601 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
604
602
605 # Check security settings before proceeding
603 # Check security settings before proceeding
606 if not check_security(args, cont_args):
604 if not check_security(args, cont_args):
607 return
605 return
608
606
609 # See if we are reusing FURL files
607 # See if we are reusing FURL files
610 if not check_reuse(args, cont_args):
608 if not check_reuse(args, cont_args):
611 return
609 return
612
610
613 cl = ControllerLauncher(extra_args=cont_args)
611 cl = ControllerLauncher(extra_args=cont_args)
614 dstart = cl.start()
612 dstart = cl.start()
615 def start_engines(r):
613 def start_engines(r):
616 pbs_set = PBSEngineSet(args.pbsscript)
614 pbs_set = PBSEngineSet(args.pbsscript)
617 def shutdown(signum, frame):
615 def shutdown(signum, frame):
618 log.msg('Stopping pbs cluster')
616 log.msg('Stopping pbs cluster')
619 d = pbs_set.kill()
617 d = pbs_set.kill()
620 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
618 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
621 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
619 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
622 signal.signal(signal.SIGINT,shutdown)
620 signal.signal(signal.SIGINT,shutdown)
623 d = pbs_set.start(args.n)
621 d = pbs_set.start(args.n)
624 return d
622 return d
625 config = kernel_config_manager.get_config_obj()
623 config = kernel_config_manager.get_config_obj()
626 furl_file = config['controller']['engine_furl_file']
624 furl_file = config['controller']['engine_furl_file']
627 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
625 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
628 dstart.addErrback(_err_and_stop)
626 dstart.addErrback(_err_and_stop)
629
627
630
628
631 def main_ssh(args):
629 def main_ssh(args):
632 """Start a controller on localhost and engines using ssh.
630 """Start a controller on localhost and engines using ssh.
633
631
634 Your clusterfile should look like::
632 Your clusterfile should look like::
635
633
636 send_furl = False # True, if you want
634 send_furl = False # True, if you want
637 engines = {
635 engines = {
638 'engine_host1' : engine_count,
636 'engine_host1' : engine_count,
639 'engine_host2' : engine_count2
637 'engine_host2' : engine_count2
640 }
638 }
641 """
639 """
642 clusterfile = {}
640 clusterfile = {}
643 execfile(args.clusterfile, clusterfile)
641 execfile(args.clusterfile, clusterfile)
644 if not clusterfile.has_key('send_furl'):
642 if not clusterfile.has_key('send_furl'):
645 clusterfile['send_furl'] = False
643 clusterfile['send_furl'] = False
646
644
647 cont_args = []
645 cont_args = []
648 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
646 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
649
647
650 # Check security settings before proceeding
648 # Check security settings before proceeding
651 if not check_security(args, cont_args):
649 if not check_security(args, cont_args):
652 return
650 return
653
651
654 # See if we are reusing FURL files
652 # See if we are reusing FURL files
655 if not check_reuse(args, cont_args):
653 if not check_reuse(args, cont_args):
656 return
654 return
657
655
658 cl = ControllerLauncher(extra_args=cont_args)
656 cl = ControllerLauncher(extra_args=cont_args)
659 dstart = cl.start()
657 dstart = cl.start()
660 def start_engines(cont_pid):
658 def start_engines(cont_pid):
661 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
659 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
662 def shutdown(signum, frame):
660 def shutdown(signum, frame):
663 d = ssh_set.kill()
661 d = ssh_set.kill()
664 cl.interrupt_then_kill(1.0)
662 cl.interrupt_then_kill(1.0)
665 reactor.callLater(2.0, reactor.stop)
663 reactor.callLater(2.0, reactor.stop)
666 signal.signal(signal.SIGINT,shutdown)
664 signal.signal(signal.SIGINT,shutdown)
667 d = ssh_set.start(clusterfile['send_furl'])
665 d = ssh_set.start(clusterfile['send_furl'])
668 return d
666 return d
669 config = kernel_config_manager.get_config_obj()
667 config = kernel_config_manager.get_config_obj()
670 furl_file = config['controller']['engine_furl_file']
668 furl_file = config['controller']['engine_furl_file']
671 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
669 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
672 dstart.addErrback(_err_and_stop)
670 dstart.addErrback(_err_and_stop)
673
671
674
672
675 def get_args():
673 def get_args():
676 base_parser = argparse.ArgumentParser(add_help=False)
674 base_parser = argparse.ArgumentParser(add_help=False)
677 base_parser.add_argument(
675 base_parser.add_argument(
678 '-r',
676 '-r',
679 action='store_true',
677 action='store_true',
680 dest='r',
678 dest='r',
681 help='try to reuse FURL files. Use with --client-port and --engine-port'
679 help='try to reuse FURL files. Use with --client-port and --engine-port'
682 )
680 )
683 base_parser.add_argument(
681 base_parser.add_argument(
684 '--client-port',
682 '--client-port',
685 type=int,
683 type=int,
686 dest='client_port',
684 dest='client_port',
687 help='the port the controller will listen on for client connections',
685 help='the port the controller will listen on for client connections',
688 default=0
686 default=0
689 )
687 )
690 base_parser.add_argument(
688 base_parser.add_argument(
691 '--engine-port',
689 '--engine-port',
692 type=int,
690 type=int,
693 dest='engine_port',
691 dest='engine_port',
694 help='the port the controller will listen on for engine connections',
692 help='the port the controller will listen on for engine connections',
695 default=0
693 default=0
696 )
694 )
697 base_parser.add_argument(
695 base_parser.add_argument(
698 '-x',
696 '-x',
699 action='store_true',
697 action='store_true',
700 dest='x',
698 dest='x',
701 help='turn off client security'
699 help='turn off client security'
702 )
700 )
703 base_parser.add_argument(
701 base_parser.add_argument(
704 '-y',
702 '-y',
705 action='store_true',
703 action='store_true',
706 dest='y',
704 dest='y',
707 help='turn off engine security'
705 help='turn off engine security'
708 )
706 )
709 base_parser.add_argument(
707 base_parser.add_argument(
710 "--logdir",
708 "--logdir",
711 type=str,
709 type=str,
712 dest="logdir",
710 dest="logdir",
713 help="directory to put log files (default=$IPYTHONDIR/log)",
711 help="directory to put log files (default=$IPYTHONDIR/log)",
714 default=pjoin(get_ipython_dir(),'log')
712 default=pjoin(get_ipython_dir(),'log')
715 )
713 )
716 base_parser.add_argument(
714 base_parser.add_argument(
717 "-n",
715 "-n",
718 "--num",
716 "--num",
719 type=int,
717 type=int,
720 dest="n",
718 dest="n",
721 default=2,
719 default=2,
722 help="the number of engines to start"
720 help="the number of engines to start"
723 )
721 )
724
722
725 parser = argparse.ArgumentParser(
723 parser = argparse.ArgumentParser(
726 description='IPython cluster startup. This starts a controller and\
724 description='IPython cluster startup. This starts a controller and\
727 engines using various approaches. Use the IPYTHONDIR environment\
725 engines using various approaches. Use the IPYTHONDIR environment\
728 variable to change your IPython directory from the default of\
726 variable to change your IPython directory from the default of\
729 .ipython or _ipython. The log and security subdirectories of your\
727 .ipython or _ipython. The log and security subdirectories of your\
730 IPython directory will be used by this script for log files and\
728 IPython directory will be used by this script for log files and\
731 security files.'
729 security files.'
732 )
730 )
733 subparsers = parser.add_subparsers(
731 subparsers = parser.add_subparsers(
734 help='available cluster types. For help, do "ipcluster TYPE --help"')
732 help='available cluster types. For help, do "ipcluster TYPE --help"')
735
733
736 parser_local = subparsers.add_parser(
734 parser_local = subparsers.add_parser(
737 'local',
735 'local',
738 help='run a local cluster',
736 help='run a local cluster',
739 parents=[base_parser]
737 parents=[base_parser]
740 )
738 )
741 parser_local.set_defaults(func=main_local)
739 parser_local.set_defaults(func=main_local)
742
740
743 parser_mpirun = subparsers.add_parser(
741 parser_mpirun = subparsers.add_parser(
744 'mpirun',
742 'mpirun',
745 help='run a cluster using mpirun (mpiexec also works)',
743 help='run a cluster using mpirun (mpiexec also works)',
746 parents=[base_parser]
744 parents=[base_parser]
747 )
745 )
748 parser_mpirun.add_argument(
746 parser_mpirun.add_argument(
749 "--mpi",
747 "--mpi",
750 type=str,
748 type=str,
751 dest="mpi", # Don't put a default here to allow no MPI support
749 dest="mpi", # Don't put a default here to allow no MPI support
752 help="how to call MPI_Init (default=mpi4py)"
750 help="how to call MPI_Init (default=mpi4py)"
753 )
751 )
754 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
752 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
755
753
756 parser_mpiexec = subparsers.add_parser(
754 parser_mpiexec = subparsers.add_parser(
757 'mpiexec',
755 'mpiexec',
758 help='run a cluster using mpiexec (mpirun also works)',
756 help='run a cluster using mpiexec (mpirun also works)',
759 parents=[base_parser]
757 parents=[base_parser]
760 )
758 )
761 parser_mpiexec.add_argument(
759 parser_mpiexec.add_argument(
762 "--mpi",
760 "--mpi",
763 type=str,
761 type=str,
764 dest="mpi", # Don't put a default here to allow no MPI support
762 dest="mpi", # Don't put a default here to allow no MPI support
765 help="how to call MPI_Init (default=mpi4py)"
763 help="how to call MPI_Init (default=mpi4py)"
766 )
764 )
767 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
765 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
768
766
769 parser_pbs = subparsers.add_parser(
767 parser_pbs = subparsers.add_parser(
770 'pbs',
768 'pbs',
771 help='run a pbs cluster',
769 help='run a pbs cluster',
772 parents=[base_parser]
770 parents=[base_parser]
773 )
771 )
774 parser_pbs.add_argument(
772 parser_pbs.add_argument(
775 '--pbs-script',
773 '--pbs-script',
776 type=str,
774 type=str,
777 dest='pbsscript',
775 dest='pbsscript',
778 help='PBS script template',
776 help='PBS script template',
779 default='pbs.template'
777 default='pbs.template'
780 )
778 )
781 parser_pbs.set_defaults(func=main_pbs)
779 parser_pbs.set_defaults(func=main_pbs)
782
780
783 parser_ssh = subparsers.add_parser(
781 parser_ssh = subparsers.add_parser(
784 'ssh',
782 'ssh',
785 help='run a cluster using ssh, should have ssh-keys setup',
783 help='run a cluster using ssh, should have ssh-keys setup',
786 parents=[base_parser]
784 parents=[base_parser]
787 )
785 )
788 parser_ssh.add_argument(
786 parser_ssh.add_argument(
789 '--clusterfile',
787 '--clusterfile',
790 type=str,
788 type=str,
791 dest='clusterfile',
789 dest='clusterfile',
792 help='python file describing the cluster',
790 help='python file describing the cluster',
793 default='clusterfile.py',
791 default='clusterfile.py',
794 )
792 )
795 parser_ssh.add_argument(
793 parser_ssh.add_argument(
796 '--sshx',
794 '--sshx',
797 type=str,
795 type=str,
798 dest='sshx',
796 dest='sshx',
799 help='sshx launcher helper'
797 help='sshx launcher helper'
800 )
798 )
801 parser_ssh.set_defaults(func=main_ssh)
799 parser_ssh.set_defaults(func=main_ssh)
802
800
803 args = parser.parse_args()
801 args = parser.parse_args()
804 return args
802 return args
805
803
806 def main():
804 def main():
807 args = get_args()
805 args = get_args()
808 reactor.callWhenRunning(args.func, args)
806 reactor.callWhenRunning(args.func, args)
809 log.startLogging(sys.stdout)
807 log.startLogging(sys.stdout)
810 reactor.run()
808 reactor.run()
811
809
812 if __name__ == '__main__':
810 if __name__ == '__main__':
813 main()
811 main()
@@ -1,249 +1,256 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Things directly related to all of twisted."""
4 """Things directly related to all of twisted."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import os, sys
19 import os, sys
20 import threading, Queue, atexit
20 import threading, Queue, atexit
21
21
22 import twisted
22 import twisted
23 from twisted.internet import defer, reactor
23 from twisted.internet import defer, reactor
24 from twisted.python import log, failure
24 from twisted.python import log, failure
25
25
26 from IPython.kernel.error import FileTimeoutError
26 from IPython.kernel.error import FileTimeoutError
27
27
28 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
29 # Classes related to twisted and threads
29 # Classes related to twisted and threads
30 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
31
31
32
32
33 class ReactorInThread(threading.Thread):
33 class ReactorInThread(threading.Thread):
34 """Run the twisted reactor in a different thread.
34 """Run the twisted reactor in a different thread.
35
35
36 For the process to be able to exit cleanly, do the following:
36 For the process to be able to exit cleanly, do the following:
37
37
38 rit = ReactorInThread()
38 rit = ReactorInThread()
39 rit.setDaemon(True)
39 rit.setDaemon(True)
40 rit.start()
40 rit.start()
41
41
42 """
42 """
43
43
44 def run(self):
44 def run(self):
45 reactor.run(installSignalHandlers=0)
45 reactor.run(installSignalHandlers=0)
46 # self.join()
46 # self.join()
47
47
48 def stop(self):
48 def stop(self):
49 # I don't think this does anything useful.
49 # I don't think this does anything useful.
50 blockingCallFromThread(reactor.stop)
50 blockingCallFromThread(reactor.stop)
51 self.join()
51 self.join()
52
52
53 if(twisted.version.major >= 8):
53 if(twisted.version.major >= 8):
54 import twisted.internet.threads
54 import twisted.internet.threads
55 def blockingCallFromThread(f, *a, **kw):
55 def blockingCallFromThread(f, *a, **kw):
56 """
56 """
57 Run a function in the reactor from a thread, and wait for the result
57 Run a function in the reactor from a thread, and wait for the result
58 synchronously, i.e. until the callback chain returned by the function get a
58 synchronously, i.e. until the callback chain returned by the function get a
59 result.
59 result.
60
60
61 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
61 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
62 passing twisted.internet.reactor for the first argument.
62 passing twisted.internet.reactor for the first argument.
63
63
64 @param f: the callable to run in the reactor thread
64 @param f: the callable to run in the reactor thread
65 @type f: any callable.
65 @type f: any callable.
66 @param a: the arguments to pass to C{f}.
66 @param a: the arguments to pass to C{f}.
67 @param kw: the keyword arguments to pass to C{f}.
67 @param kw: the keyword arguments to pass to C{f}.
68
68
69 @return: the result of the callback chain.
69 @return: the result of the callback chain.
70 @raise: any error raised during the callback chain.
70 @raise: any error raised during the callback chain.
71 """
71 """
72 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
72 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
73
73
74 else:
74 else:
75 def blockingCallFromThread(f, *a, **kw):
75 def blockingCallFromThread(f, *a, **kw):
76 """
76 """
77 Run a function in the reactor from a thread, and wait for the result
77 Run a function in the reactor from a thread, and wait for the result
78 synchronously, i.e. until the callback chain returned by the function get a
78 synchronously, i.e. until the callback chain returned by the function get a
79 result.
79 result.
80
80
81 @param f: the callable to run in the reactor thread
81 @param f: the callable to run in the reactor thread
82 @type f: any callable.
82 @type f: any callable.
83 @param a: the arguments to pass to C{f}.
83 @param a: the arguments to pass to C{f}.
84 @param kw: the keyword arguments to pass to C{f}.
84 @param kw: the keyword arguments to pass to C{f}.
85
85
86 @return: the result of the callback chain.
86 @return: the result of the callback chain.
87 @raise: any error raised during the callback chain.
87 @raise: any error raised during the callback chain.
88 """
88 """
89 from twisted.internet import reactor
89 from twisted.internet import reactor
90 queue = Queue.Queue()
90 queue = Queue.Queue()
91 def _callFromThread():
91 def _callFromThread():
92 result = defer.maybeDeferred(f, *a, **kw)
92 result = defer.maybeDeferred(f, *a, **kw)
93 result.addBoth(queue.put)
93 result.addBoth(queue.put)
94
94
95 reactor.callFromThread(_callFromThread)
95 reactor.callFromThread(_callFromThread)
96 result = queue.get()
96 result = queue.get()
97 if isinstance(result, failure.Failure):
97 if isinstance(result, failure.Failure):
98 # This makes it easier for the debugger to get access to the instance
98 # This makes it easier for the debugger to get access to the instance
99 try:
99 try:
100 result.raiseException()
100 result.raiseException()
101 except Exception, e:
101 except Exception, e:
102 raise e
102 raise e
103 return result
103 return result
104
104
105
105
106
106
107 #-------------------------------------------------------------------------------
107 #-------------------------------------------------------------------------------
108 # Things for managing deferreds
108 # Things for managing deferreds
109 #-------------------------------------------------------------------------------
109 #-------------------------------------------------------------------------------
110
110
111
111
112 def parseResults(results):
112 def parseResults(results):
113 """Pull out results/Failures from a DeferredList."""
113 """Pull out results/Failures from a DeferredList."""
114 return [x[1] for x in results]
114 return [x[1] for x in results]
115
115
116 def gatherBoth(dlist, fireOnOneCallback=0,
116 def gatherBoth(dlist, fireOnOneCallback=0,
117 fireOnOneErrback=0,
117 fireOnOneErrback=0,
118 consumeErrors=0,
118 consumeErrors=0,
119 logErrors=0):
119 logErrors=0):
120 """This is like gatherBoth, but sets consumeErrors=1."""
120 """This is like gatherBoth, but sets consumeErrors=1."""
121 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
121 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
122 consumeErrors, logErrors)
122 consumeErrors, logErrors)
123 if not fireOnOneCallback:
123 if not fireOnOneCallback:
124 d.addCallback(parseResults)
124 d.addCallback(parseResults)
125 return d
125 return d
126
126
127 SUCCESS = True
127 SUCCESS = True
128 FAILURE = False
128 FAILURE = False
129
129
130 class DeferredList(defer.Deferred):
130 class DeferredList(defer.Deferred):
131 """I combine a group of deferreds into one callback.
131 """I combine a group of deferreds into one callback.
132
132
133 I track a list of L{Deferred}s for their callbacks, and make a single
133 I track a list of L{Deferred}s for their callbacks, and make a single
134 callback when they have all completed, a list of (success, result)
134 callback when they have all completed, a list of (success, result)
135 tuples, 'success' being a boolean.
135 tuples, 'success' being a boolean.
136
136
137 Note that you can still use a L{Deferred} after putting it in a
137 Note that you can still use a L{Deferred} after putting it in a
138 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
138 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
139 messages by adding errbacks to the Deferreds *after* putting them in the
139 messages by adding errbacks to the Deferreds *after* putting them in the
140 DeferredList, as a DeferredList won't swallow the errors. (Although a more
140 DeferredList, as a DeferredList won't swallow the errors. (Although a more
141 convenient way to do this is simply to set the consumeErrors flag)
141 convenient way to do this is simply to set the consumeErrors flag)
142
142
143 Note: This is a modified version of the twisted.internet.defer.DeferredList
143 Note: This is a modified version of the twisted.internet.defer.DeferredList
144 """
144 """
145
145
146 fireOnOneCallback = 0
146 fireOnOneCallback = 0
147 fireOnOneErrback = 0
147 fireOnOneErrback = 0
148
148
149 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
149 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
150 consumeErrors=0, logErrors=0):
150 consumeErrors=0, logErrors=0):
151 """Initialize a DeferredList.
151 """Initialize a DeferredList.
152
152
153 @type deferredList: C{list} of L{Deferred}s
153 @type deferredList: C{list} of L{Deferred}s
154 @param deferredList: The list of deferreds to track.
154 @param deferredList: The list of deferreds to track.
155 @param fireOnOneCallback: (keyword param) a flag indicating that
155 @param fireOnOneCallback: (keyword param) a flag indicating that
156 only one callback needs to be fired for me to call
156 only one callback needs to be fired for me to call
157 my callback
157 my callback
158 @param fireOnOneErrback: (keyword param) a flag indicating that
158 @param fireOnOneErrback: (keyword param) a flag indicating that
159 only one errback needs to be fired for me to call
159 only one errback needs to be fired for me to call
160 my errback
160 my errback
161 @param consumeErrors: (keyword param) a flag indicating that any errors
161 @param consumeErrors: (keyword param) a flag indicating that any errors
162 raised in the original deferreds should be
162 raised in the original deferreds should be
163 consumed by this DeferredList. This is useful to
163 consumed by this DeferredList. This is useful to
164 prevent spurious warnings being logged.
164 prevent spurious warnings being logged.
165 """
165 """
166 self.resultList = [None] * len(deferredList)
166 self.resultList = [None] * len(deferredList)
167 defer.Deferred.__init__(self)
167 defer.Deferred.__init__(self)
168 if len(deferredList) == 0 and not fireOnOneCallback:
168 if len(deferredList) == 0 and not fireOnOneCallback:
169 self.callback(self.resultList)
169 self.callback(self.resultList)
170
170
171 # These flags need to be set *before* attaching callbacks to the
171 # These flags need to be set *before* attaching callbacks to the
172 # deferreds, because the callbacks use these flags, and will run
172 # deferreds, because the callbacks use these flags, and will run
173 # synchronously if any of the deferreds are already fired.
173 # synchronously if any of the deferreds are already fired.
174 self.fireOnOneCallback = fireOnOneCallback
174 self.fireOnOneCallback = fireOnOneCallback
175 self.fireOnOneErrback = fireOnOneErrback
175 self.fireOnOneErrback = fireOnOneErrback
176 self.consumeErrors = consumeErrors
176 self.consumeErrors = consumeErrors
177 self.logErrors = logErrors
177 self.logErrors = logErrors
178 self.finishedCount = 0
178 self.finishedCount = 0
179
179
180 index = 0
180 index = 0
181 for deferred in deferredList:
181 for deferred in deferredList:
182 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
182 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
183 callbackArgs=(index,SUCCESS),
183 callbackArgs=(index,SUCCESS),
184 errbackArgs=(index,FAILURE))
184 errbackArgs=(index,FAILURE))
185 index = index + 1
185 index = index + 1
186
186
187 def _cbDeferred(self, result, index, succeeded):
187 def _cbDeferred(self, result, index, succeeded):
188 """(internal) Callback for when one of my deferreds fires.
188 """(internal) Callback for when one of my deferreds fires.
189 """
189 """
190 self.resultList[index] = (succeeded, result)
190 self.resultList[index] = (succeeded, result)
191
191
192 self.finishedCount += 1
192 self.finishedCount += 1
193 if not self.called:
193 if not self.called:
194 if succeeded == SUCCESS and self.fireOnOneCallback:
194 if succeeded == SUCCESS and self.fireOnOneCallback:
195 self.callback((result, index))
195 self.callback((result, index))
196 elif succeeded == FAILURE and self.fireOnOneErrback:
196 elif succeeded == FAILURE and self.fireOnOneErrback:
197 # We have modified this to fire the errback chain with the actual
197 # We have modified this to fire the errback chain with the actual
198 # Failure instance the originally occured rather than twisted's
198 # Failure instance the originally occured rather than twisted's
199 # FirstError which wraps the failure
199 # FirstError which wraps the failure
200 self.errback(result)
200 self.errback(result)
201 elif self.finishedCount == len(self.resultList):
201 elif self.finishedCount == len(self.resultList):
202 self.callback(self.resultList)
202 self.callback(self.resultList)
203
203
204 if succeeded == FAILURE and self.logErrors:
204 if succeeded == FAILURE and self.logErrors:
205 log.err(result)
205 log.err(result)
206 if succeeded == FAILURE and self.consumeErrors:
206 if succeeded == FAILURE and self.consumeErrors:
207 result = None
207 result = None
208
208
209 return result
209 return result
210
210
211
211
212 def wait_for_file(filename, delay=0.1, max_tries=10):
212 def wait_for_file(filename, delay=0.1, max_tries=10):
213 """Wait (poll) for a file to be created.
213 """Wait (poll) for a file to be created.
214
214
215 This method returns a Deferred that will fire when a file exists. It
215 This method returns a Deferred that will fire when a file exists. It
216 works by polling os.path.isfile in time intervals specified by the
216 works by polling os.path.isfile in time intervals specified by the
217 delay argument. If `max_tries` is reached, it will errback with a
217 delay argument. If `max_tries` is reached, it will errback with a
218 `FileTimeoutError`.
218 `FileTimeoutError`.
219
219
220 Parameters
220 Parameters
221 ----------
221 ----------
222 filename : str
222 filename : str
223 The name of the file to wait for.
223 The name of the file to wait for.
224 delay : float
224 delay : float
225 The time to wait between polls.
225 The time to wait between polls.
226 max_tries : int
226 max_tries : int
227 The max number of attempts before raising `FileTimeoutError`
227 The max number of attempts before raising `FileTimeoutError`
228
228
229 Returns
229 Returns
230 -------
230 -------
231 d : Deferred
231 d : Deferred
232 A Deferred instance that will fire when the file exists.
232 A Deferred instance that will fire when the file exists.
233 """
233 """
234
234
235 d = defer.Deferred()
235 d = defer.Deferred()
236
236
237 def _test_for_file(filename, attempt=0):
237 def _test_for_file(filename, attempt=0):
238 if attempt >= max_tries:
238 if attempt >= max_tries:
239 d.errback(FileTimeoutError(
239 d.errback(FileTimeoutError(
240 'timeout waiting for file to be created: %s' % filename
240 'timeout waiting for file to be created: %s' % filename
241 ))
241 ))
242 else:
242 else:
243 if os.path.isfile(filename):
243 if os.path.isfile(filename):
244 d.callback(True)
244 d.callback(True)
245 else:
245 else:
246 reactor.callLater(delay, _test_for_file, filename, attempt+1)
246 reactor.callLater(delay, _test_for_file, filename, attempt+1)
247
247
248 _test_for_file(filename)
248 _test_for_file(filename)
249 return d
249 return d
250
251
252 def sleep_deferred(seconds):
253 """Sleep without blocking the event loop."""
254 d = defer.Deferred()
255 reactor.callLater(seconds, d.callback, seconds)
256 return d
@@ -1,125 +1,143 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3 """
3 """The IPython Core Notification Center.
4 The IPython Core Notification Center.
4
5
5 See docs/source/development/notification_blueprint.txt for an overview of the
6 See docs/source/development/notification_blueprint.txt for an overview of the
6 notification module.
7 notification module.
7 """
8
8
9 __docformat__ = "restructuredtext en"
9 Authors:
10
11 * Barry Wark
12 * Brian Granger
13 """
10
14
11 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
16 # Copyright (C) 2008-2009 The IPython Development Team
13 #
17 #
14 # Distributed under the terms of the BSD License. The full license is in
18 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
19 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
17
21
18 # Tell nose to skip the testing of this module
22 #-----------------------------------------------------------------------------
19 __test__ = {}
23 # Code
24 #-----------------------------------------------------------------------------
25
26
27 class NotificationError(Exception):
28 pass
29
20
30
21 class NotificationCenter(object):
31 class NotificationCenter(object):
22 """Synchronous notification center
32 """Synchronous notification center.
23
33
24 Examples
34 Examples
25 --------
35 --------
26 >>> import IPython.kernel.core.notification as notification
36 Here is a simple example of how to use this::
27 >>> def callback(theType, theSender, args={}):
28 ... print theType,theSender,args
29 ...
30 >>> notification.sharedCenter.add_observer(callback, 'NOTIFICATION_TYPE', None)
31 >>> notification.sharedCenter.post_notification('NOTIFICATION_TYPE', object()) # doctest:+ELLIPSIS
32 NOTIFICATION_TYPE ...
33
37
38 import IPython.kernel.core.notification as notification
39 def callback(ntype, theSender, args={}):
40 print ntype,theSender,args
41
42 notification.sharedCenter.add_observer(callback, 'NOTIFICATION_TYPE', None)
43 notification.sharedCenter.post_notification('NOTIFICATION_TYPE', object()) # doctest:+ELLIPSIS
44 NOTIFICATION_TYPE ...
34 """
45 """
35 def __init__(self):
46 def __init__(self):
36 super(NotificationCenter, self).__init__()
47 super(NotificationCenter, self).__init__()
37 self._init_observers()
48 self._init_observers()
38
49
39
40 def _init_observers(self):
50 def _init_observers(self):
41 """Initialize observer storage"""
51 """Initialize observer storage"""
42
52
43 self.registered_types = set() #set of types that are observed
53 self.registered_types = set() #set of types that are observed
44 self.registered_senders = set() #set of senders that are observed
54 self.registered_senders = set() #set of senders that are observed
45 self.observers = {} #map (type,sender) => callback (callable)
55 self.observers = {} #map (type,sender) => callback (callable)
46
56
57 def post_notification(self, ntype, sender, *args, **kwargs):
58 """Post notification to all registered observers.
47
59
48 def post_notification(self, theType, sender, **kwargs):
60 The registered callback will be called as::
49 """Post notification (type,sender,**kwargs) to all registered
50 observers.
51
61
52 Implementation notes:
62 callback(ntype, sender, *args, **kwargs)
53
63
64 Parameters
65 ----------
66 ntype : hashable
67 The notification type.
68 sender : hashable
69 The object sending the notification.
70 *args : tuple
71 The positional arguments to be passed to the callback.
72 **kwargs : dict
73 The keyword argument to be passed to the callback.
74
75 Notes
76 -----
54 * If no registered observers, performance is O(1).
77 * If no registered observers, performance is O(1).
55 * Notificaiton order is undefined.
78 * Notificaiton order is undefined.
56 * Notifications are posted synchronously.
79 * Notifications are posted synchronously.
57 """
80 """
58
81
59 if(theType==None or sender==None):
82 if(ntype==None or sender==None):
60 raise Exception("NotificationCenter.post_notification requires \
83 raise NotificationError(
61 type and sender.")
84 "Notification type and sender are required.")
62
85
63 # If there are no registered observers for the type/sender pair
86 # If there are no registered observers for the type/sender pair
64 if((theType not in self.registered_types and
87 if((ntype not in self.registered_types and
65 None not in self.registered_types) or
88 None not in self.registered_types) or
66 (sender not in self.registered_senders and
89 (sender not in self.registered_senders and
67 None not in self.registered_senders)):
90 None not in self.registered_senders)):
68 return
91 return
69
92
70 for o in self._observers_for_notification(theType, sender):
93 for o in self._observers_for_notification(ntype, sender):
71 o(theType, sender, args=kwargs)
94 o(ntype, sender, *args, **kwargs)
72
95
73
96 def _observers_for_notification(self, ntype, sender):
74 def _observers_for_notification(self, theType, sender):
75 """Find all registered observers that should recieve notification"""
97 """Find all registered observers that should recieve notification"""
76
98
77 keys = (
99 keys = (
78 (theType,sender),
100 (ntype,sender),
79 (theType, None),
101 (ntype, None),
80 (None, sender),
102 (None, sender),
81 (None,None)
103 (None,None)
82 )
104 )
83
105
84
85 obs = set()
106 obs = set()
86 for k in keys:
107 for k in keys:
87 obs.update(self.observers.get(k, set()))
108 obs.update(self.observers.get(k, set()))
88
109
89 return obs
110 return obs
90
111
91
112 def add_observer(self, callback, ntype, sender):
92 def add_observer(self, callback, theType, sender):
93 """Add an observer callback to this notification center.
113 """Add an observer callback to this notification center.
94
114
95 The given callback will be called upon posting of notifications of
115 The given callback will be called upon posting of notifications of
96 the given type/sender and will receive any additional kwargs passed
116 the given type/sender and will receive any additional arguments passed
97 to post_notification.
117 to post_notification.
98
118
99 Parameters
119 Parameters
100 ----------
120 ----------
101 observerCallback : callable
121 callback : callable
102 Callable. Must take at least two arguments::
122 The callable that will be called by :meth:`post_notification`
103 observerCallback(type, sender, args={})
123 as ``callback(ntype, sender, *args, **kwargs)
104
124 ntype : hashable
105 theType : hashable
106 The notification type. If None, all notifications from sender
125 The notification type. If None, all notifications from sender
107 will be posted.
126 will be posted.
108
109 sender : hashable
127 sender : hashable
110 The notification sender. If None, all notifications of theType
128 The notification sender. If None, all notifications of ntype
111 will be posted.
129 will be posted.
112 """
130 """
113 assert(callback != None)
131 assert(callback != None)
114 self.registered_types.add(theType)
132 self.registered_types.add(ntype)
115 self.registered_senders.add(sender)
133 self.registered_senders.add(sender)
116 self.observers.setdefault((theType,sender), set()).add(callback)
134 self.observers.setdefault((ntype,sender), set()).add(callback)
117
135
118 def remove_all_observers(self):
136 def remove_all_observers(self):
119 """Removes all observers from this notification center"""
137 """Removes all observers from this notification center"""
120
138
121 self._init_observers()
139 self._init_observers()
122
140
123
141
124
142
125 sharedCenter = NotificationCenter()
143 shared_center = NotificationCenter()
@@ -1,161 +1,154 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """This file contains unittests for the notification.py module."""
3 """This file contains unittests for the notification.py module."""
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2008-2009 The IPython Development Team
6 # Copyright (C) 2008-2009 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is
8 # Distributed under the terms of the BSD License. The full license is
9 # in the file COPYING, distributed as part of this software.
9 # in the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 # Tell nose to skip this module
16 import unittest
17 __test__ = {}
18
17
19 from twisted.trial import unittest
18 from IPython.utils.notification import (
20 import IPython.kernel.core.notification as notification
19 NotificationCenter,
20 NotificationError,
21 shared_center
22 )
21
23
22 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
23 # Support Classes
25 # Support Classes
24 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
25
27
28
26 class Observer(object):
29 class Observer(object):
27 """docstring for Observer"""
30
28 def __init__(self, expectedType, expectedSender,
31 def __init__(self, expected_ntype, expected_sender,
29 center=notification.sharedCenter, **kwargs):
32 center=shared_center, *args, **kwargs):
30 super(Observer, self).__init__()
33 super(Observer, self).__init__()
31 self.expectedType = expectedType
34 self.expected_ntype = expected_ntype
32 self.expectedSender = expectedSender
35 self.expected_sender = expected_sender
33 self.expectedKwArgs = kwargs
36 self.expected_args = args
37 self.expected_kwargs = kwargs
34 self.recieved = False
38 self.recieved = False
35 center.add_observer(self.callback,
39 center.add_observer(self.callback,
36 self.expectedType,
40 self.expected_ntype,
37 self.expectedSender)
41 self.expected_sender)
38
42
39 def callback(self, theType, sender, args={}):
43 def callback(self, ntype, sender, *args, **kwargs):
40 """callback"""
44 assert(ntype == self.expected_ntype or
41
45 self.expected_ntype == None)
42 assert(theType == self.expectedType or
46 assert(sender == self.expected_sender or
43 self.expectedType == None)
47 self.expected_sender == None)
44 assert(sender == self.expectedSender or
48 assert(args == self.expected_args)
45 self.expectedSender == None)
49 assert(kwargs == self.expected_kwargs)
46 assert(args == self.expectedKwArgs)
47 self.recieved = True
50 self.recieved = True
48
51
49 def verify(self):
52 def verify(self):
50 """verify"""
51
52 assert(self.recieved)
53 assert(self.recieved)
53
54
54 def reset(self):
55 def reset(self):
55 """reset"""
56
57 self.recieved = False
56 self.recieved = False
58
57
59
58
60 class Notifier(object):
59 class Notifier(object):
61 """docstring for Notifier"""
60
62 def __init__(self, theType, **kwargs):
61 def __init__(self, ntype, **kwargs):
63 super(Notifier, self).__init__()
62 super(Notifier, self).__init__()
64 self.theType = theType
63 self.ntype = ntype
65 self.kwargs = kwargs
64 self.kwargs = kwargs
66
65
67 def post(self, center=notification.sharedCenter):
66 def post(self, center=shared_center):
68 """fire"""
69
67
70 center.post_notification(self.theType, self,
68 center.post_notification(self.ntype, self,
71 **self.kwargs)
69 **self.kwargs)
72
70
71
73 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
74 # Tests
73 # Tests
75 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
76
75
76
77 class NotificationTests(unittest.TestCase):
77 class NotificationTests(unittest.TestCase):
78 """docstring for NotificationTests"""
79
78
80 def tearDown(self):
79 def tearDown(self):
81 notification.sharedCenter.remove_all_observers()
80 shared_center.remove_all_observers()
82
81
83 def test_notification_delivered(self):
82 def test_notification_delivered(self):
84 """Test that notifications are delivered"""
83 """Test that notifications are delivered"""
85 expectedType = 'EXPECTED_TYPE'
86 sender = Notifier(expectedType)
87 observer = Observer(expectedType, sender)
88
84
89 sender.post()
85 expected_ntype = 'EXPECTED_TYPE'
86 sender = Notifier(expected_ntype)
87 observer = Observer(expected_ntype, sender)
90
88
89 sender.post()
91 observer.verify()
90 observer.verify()
92
91
93 def test_type_specificity(self):
92 def test_type_specificity(self):
94 """Test that observers are registered by type"""
93 """Test that observers are registered by type"""
95
94
96 expectedType = 1
95 expected_ntype = 1
97 unexpectedType = "UNEXPECTED_TYPE"
96 unexpected_ntype = "UNEXPECTED_TYPE"
98 sender = Notifier(expectedType)
97 sender = Notifier(expected_ntype)
99 unexpectedSender = Notifier(unexpectedType)
98 unexpected_sender = Notifier(unexpected_ntype)
100 observer = Observer(expectedType, sender)
99 observer = Observer(expected_ntype, sender)
101
100
102 sender.post()
101 sender.post()
103 unexpectedSender.post()
102 unexpected_sender.post()
104
105 observer.verify()
103 observer.verify()
106
104
107 def test_sender_specificity(self):
105 def test_sender_specificity(self):
108 """Test that observers are registered by sender"""
106 """Test that observers are registered by sender"""
109
107
110 expectedType = "EXPECTED_TYPE"
108 expected_ntype = "EXPECTED_TYPE"
111 sender1 = Notifier(expectedType)
109 sender1 = Notifier(expected_ntype)
112 sender2 = Notifier(expectedType)
110 sender2 = Notifier(expected_ntype)
113 observer = Observer(expectedType, sender1)
111 observer = Observer(expected_ntype, sender1)
114
112
115 sender1.post()
113 sender1.post()
116 sender2.post()
114 sender2.post()
117
115
118 observer.verify()
116 observer.verify()
119
117
120 def test_remove_all_observers(self):
118 def test_remove_all_observers(self):
121 """White-box test for remove_all_observers"""
119 """White-box test for remove_all_observers"""
122
120
123 for i in xrange(10):
121 for i in xrange(10):
124 Observer('TYPE', None, center=notification.sharedCenter)
122 Observer('TYPE', None, center=shared_center)
125
123
126 self.assert_(len(notification.sharedCenter.observers[('TYPE',None)]) >= 10,
124 self.assert_(len(shared_center.observers[('TYPE',None)]) >= 10,
127 "observers registered")
125 "observers registered")
128
126
129 notification.sharedCenter.remove_all_observers()
127 shared_center.remove_all_observers()
130
128 self.assert_(len(shared_center.observers) == 0, "observers removed")
131 self.assert_(len(notification.sharedCenter.observers) == 0, "observers removed")
132
129
133 def test_any_sender(self):
130 def test_any_sender(self):
134 """test_any_sender"""
131 expected_ntype = "EXPECTED_TYPE"
135
132 sender1 = Notifier(expected_ntype)
136 expectedType = "EXPECTED_TYPE"
133 sender2 = Notifier(expected_ntype)
137 sender1 = Notifier(expectedType)
134 observer = Observer(expected_ntype, None)
138 sender2 = Notifier(expectedType)
139 observer = Observer(expectedType, None)
140
141
135
142 sender1.post()
136 sender1.post()
143 observer.verify()
137 observer.verify()
144
138
145 observer.reset()
139 observer.reset()
146 sender2.post()
140 sender2.post()
147 observer.verify()
141 observer.verify()
148
142
149 def test_post_performance(self):
143 def test_post_performance(self):
150 """Test that post_notification, even with many registered irrelevant
144 """Test that post_notification, even with many registered irrelevant
151 observers is fast"""
145 observers is fast"""
152
146
153 for i in xrange(10):
147 for i in xrange(10):
154 Observer("UNRELATED_TYPE", None)
148 Observer("UNRELATED_TYPE", None)
155
149
156 o = Observer('EXPECTED_TYPE', None)
150 o = Observer('EXPECTED_TYPE', None)
157
151 shared_center.post_notification('EXPECTED_TYPE', self)
158 notification.sharedCenter.post_notification('EXPECTED_TYPE', self)
159
160 o.verify()
152 o.verify()
161
153
154
@@ -1,210 +1,210 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
2 # -*- coding: utf-8 -*-
3 """Setup script for IPython.
3 """Setup script for IPython.
4
4
5 Under Posix environments it works like a typical setup.py script.
5 Under Posix environments it works like a typical setup.py script.
6 Under Windows, the command sdist is not supported, since IPython
6 Under Windows, the command sdist is not supported, since IPython
7 requires utilities which are not available under Windows."""
7 requires utilities which are not available under Windows."""
8
8
9 #-------------------------------------------------------------------------------
9 #-------------------------------------------------------------------------------
10 # Copyright (C) 2008 The IPython Development Team
10 # Copyright (C) 2008 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15
15
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19
19
20 # Stdlib imports
20 # Stdlib imports
21 import os
21 import os
22 import sys
22 import sys
23
23
24 from glob import glob
24 from glob import glob
25
25
26 # BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
26 # BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
27 # update it when the contents of directories change.
27 # update it when the contents of directories change.
28 if os.path.exists('MANIFEST'): os.remove('MANIFEST')
28 if os.path.exists('MANIFEST'): os.remove('MANIFEST')
29
29
30 from distutils.core import setup
30 from distutils.core import setup
31
31
32 from IPython.utils.genutils import target_update
32 from IPython.utils.genutils import target_update
33
33
34 from setupbase import (
34 from setupbase import (
35 setup_args,
35 setup_args,
36 find_packages,
36 find_packages,
37 find_package_data,
37 find_package_data,
38 find_scripts,
38 find_scripts,
39 find_data_files,
39 find_data_files,
40 check_for_dependencies
40 check_for_dependencies
41 )
41 )
42
42
43 isfile = os.path.isfile
43 isfile = os.path.isfile
44 pjoin = os.path.join
44 pjoin = os.path.join
45
45
46 #-------------------------------------------------------------------------------
46 #-------------------------------------------------------------------------------
47 # Handle OS specific things
47 # Handle OS specific things
48 #-------------------------------------------------------------------------------
48 #-------------------------------------------------------------------------------
49
49
50 if os.name == 'posix':
50 if os.name == 'posix':
51 os_name = 'posix'
51 os_name = 'posix'
52 elif os.name in ['nt','dos']:
52 elif os.name in ['nt','dos']:
53 os_name = 'windows'
53 os_name = 'windows'
54 else:
54 else:
55 print 'Unsupported operating system:',os.name
55 print 'Unsupported operating system:',os.name
56 sys.exit(1)
56 sys.exit(1)
57
57
58 # Under Windows, 'sdist' has not been supported. Now that the docs build with
58 # Under Windows, 'sdist' has not been supported. Now that the docs build with
59 # Sphinx it might work, but let's not turn it on until someone confirms that it
59 # Sphinx it might work, but let's not turn it on until someone confirms that it
60 # actually works.
60 # actually works.
61 if os_name == 'windows' and 'sdist' in sys.argv:
61 if os_name == 'windows' and 'sdist' in sys.argv:
62 print 'The sdist command is not available under Windows. Exiting.'
62 print 'The sdist command is not available under Windows. Exiting.'
63 sys.exit(1)
63 sys.exit(1)
64
64
65 #-------------------------------------------------------------------------------
65 #-------------------------------------------------------------------------------
66 # Things related to the IPython documentation
66 # Things related to the IPython documentation
67 #-------------------------------------------------------------------------------
67 #-------------------------------------------------------------------------------
68
68
69 # update the manuals when building a source dist
69 # update the manuals when building a source dist
70 if len(sys.argv) >= 2 and sys.argv[1] in ('sdist','bdist_rpm'):
70 if len(sys.argv) >= 2 and sys.argv[1] in ('sdist','bdist_rpm'):
71 import textwrap
71 import textwrap
72
72
73 # List of things to be updated. Each entry is a triplet of args for
73 # List of things to be updated. Each entry is a triplet of args for
74 # target_update()
74 # target_update()
75 to_update = [
75 to_update = [
76 # FIXME - Disabled for now: we need to redo an automatic way
76 # FIXME - Disabled for now: we need to redo an automatic way
77 # of generating the magic info inside the rst.
77 # of generating the magic info inside the rst.
78 #('docs/magic.tex',
78 #('docs/magic.tex',
79 #['IPython/Magic.py'],
79 #['IPython/Magic.py'],
80 #"cd doc && ./update_magic.sh" ),
80 #"cd doc && ./update_magic.sh" ),
81
81
82 ('docs/man/ipcluster.1.gz',
82 ('docs/man/ipcluster.1.gz',
83 ['docs/man/ipcluster.1'],
83 ['docs/man/ipcluster.1'],
84 'cd docs/man && gzip -9c ipcluster.1 > ipcluster.1.gz'),
84 'cd docs/man && gzip -9c ipcluster.1 > ipcluster.1.gz'),
85
85
86 ('docs/man/ipcontroller.1.gz',
86 ('docs/man/ipcontroller.1.gz',
87 ['docs/man/ipcontroller.1'],
87 ['docs/man/ipcontroller.1'],
88 'cd docs/man && gzip -9c ipcontroller.1 > ipcontroller.1.gz'),
88 'cd docs/man && gzip -9c ipcontroller.1 > ipcontroller.1.gz'),
89
89
90 ('docs/man/ipengine.1.gz',
90 ('docs/man/ipengine.1.gz',
91 ['docs/man/ipengine.1'],
91 ['docs/man/ipengine.1'],
92 'cd docs/man && gzip -9c ipengine.1 > ipengine.1.gz'),
92 'cd docs/man && gzip -9c ipengine.1 > ipengine.1.gz'),
93
93
94 ('docs/man/ipython.1.gz',
94 ('docs/man/ipython.1.gz',
95 ['docs/man/ipython.1'],
95 ['docs/man/ipython.1'],
96 'cd docs/man && gzip -9c ipython.1 > ipython.1.gz'),
96 'cd docs/man && gzip -9c ipython.1 > ipython.1.gz'),
97
97
98 ('docs/man/ipython-wx.1.gz',
98 ('docs/man/ipython-wx.1.gz',
99 ['docs/man/ipython-wx.1'],
99 ['docs/man/ipython-wx.1'],
100 'cd docs/man && gzip -9c ipython-wx.1 > ipython-wx.1.gz'),
100 'cd docs/man && gzip -9c ipython-wx.1 > ipython-wx.1.gz'),
101
101
102 ('docs/man/ipythonx.1.gz',
102 ('docs/man/ipythonx.1.gz',
103 ['docs/man/ipythonx.1'],
103 ['docs/man/ipythonx.1'],
104 'cd docs/man && gzip -9c ipythonx.1 > ipythonx.1.gz'),
104 'cd docs/man && gzip -9c ipythonx.1 > ipythonx.1.gz'),
105
105
106 ('docs/man/irunner.1.gz',
106 ('docs/man/irunner.1.gz',
107 ['docs/man/irunner.1'],
107 ['docs/man/irunner.1'],
108 'cd docs/man && gzip -9c irunner.1 > irunner.1.gz'),
108 'cd docs/man && gzip -9c irunner.1 > irunner.1.gz'),
109
109
110 ('docs/man/pycolor.1.gz',
110 ('docs/man/pycolor.1.gz',
111 ['docs/man/pycolor.1'],
111 ['docs/man/pycolor.1'],
112 'cd docs/man && gzip -9c pycolor.1 > pycolor.1.gz'),
112 'cd docs/man && gzip -9c pycolor.1 > pycolor.1.gz'),
113 ]
113 ]
114
114
115 # Only build the docs if sphinx is present
115 # Only build the docs if sphinx is present
116 try:
116 try:
117 import sphinx
117 import sphinx
118 except ImportError:
118 except ImportError:
119 pass
119 pass
120 else:
120 else:
121 # The Makefile calls the do_sphinx scripts to build html and pdf, so
121 # The Makefile calls the do_sphinx scripts to build html and pdf, so
122 # just one target is enough to cover all manual generation
122 # just one target is enough to cover all manual generation
123
123
124 # First, compute all the dependencies that can force us to rebuild the
124 # First, compute all the dependencies that can force us to rebuild the
125 # docs. Start with the main release file that contains metadata
125 # docs. Start with the main release file that contains metadata
126 docdeps = ['IPython/core/release.py']
126 docdeps = ['IPython/core/release.py']
127 # Inculde all the reST sources
127 # Inculde all the reST sources
128 pjoin = os.path.join
128 pjoin = os.path.join
129 for dirpath,dirnames,filenames in os.walk('docs/source'):
129 for dirpath,dirnames,filenames in os.walk('docs/source'):
130 if dirpath in ['_static','_templates']:
130 if dirpath in ['_static','_templates']:
131 continue
131 continue
132 docdeps += [ pjoin(dirpath,f) for f in filenames
132 docdeps += [ pjoin(dirpath,f) for f in filenames
133 if f.endswith('.txt') ]
133 if f.endswith('.txt') ]
134 # and the examples
134 # and the examples
135 for dirpath,dirnames,filenames in os.walk('docs/example'):
135 for dirpath,dirnames,filenames in os.walk('docs/example'):
136 docdeps += [ pjoin(dirpath,f) for f in filenames
136 docdeps += [ pjoin(dirpath,f) for f in filenames
137 if not f.endswith('~') ]
137 if not f.endswith('~') ]
138 # then, make them all dependencies for the main PDF (the html will get
138 # then, make them all dependencies for the main PDF (the html will get
139 # auto-generated as well).
139 # auto-generated as well).
140 to_update.append(
140 to_update.append(
141 ('docs/dist/ipython.pdf',
141 ('docs/dist/ipython.pdf',
142 docdeps,
142 docdeps,
143 "cd docs && make dist")
143 "cd docs && make dist")
144 )
144 )
145
145
146 [ target_update(*t) for t in to_update ]
146 [ target_update(*t) for t in to_update ]
147
147
148
148
149 #---------------------------------------------------------------------------
149 #---------------------------------------------------------------------------
150 # Find all the packages, package data, scripts and data_files
150 # Find all the packages, package data, scripts and data_files
151 #---------------------------------------------------------------------------
151 #---------------------------------------------------------------------------
152
152
153 packages = find_packages()
153 packages = find_packages()
154 package_data = find_package_data()
154 package_data = find_package_data()
155 scripts = find_scripts()
155 scripts = find_scripts()
156 data_files = find_data_files()
156 data_files = find_data_files()
157
157
158 #---------------------------------------------------------------------------
158 #---------------------------------------------------------------------------
159 # Handle dependencies and setuptools specific things
159 # Handle dependencies and setuptools specific things
160 #---------------------------------------------------------------------------
160 #---------------------------------------------------------------------------
161
161
162 # This dict is used for passing extra arguments that are setuptools
162 # This dict is used for passing extra arguments that are setuptools
163 # specific to setup
163 # specific to setup
164 setuptools_extra_args = {}
164 setuptools_extra_args = {}
165
165
166 if 'setuptools' in sys.modules:
166 if 'setuptools' in sys.modules:
167 setuptools_extra_args['zip_safe'] = False
167 setuptools_extra_args['zip_safe'] = False
168 setuptools_extra_args['entry_points'] = {
168 setuptools_extra_args['entry_points'] = {
169 'console_scripts': [
169 'console_scripts': [
170 'ipython = IPython.core.ipapp:launch_new_instance',
170 'ipython = IPython.core.ipapp:launch_new_instance',
171 'pycolor = IPython.utils.PyColorize:main',
171 'pycolor = IPython.utils.PyColorize:main',
172 'ipcontroller = IPython.kernel.ipcontrollerapp:launch_new_instance',
172 'ipcontroller = IPython.kernel.ipcontrollerapp:launch_new_instance',
173 'ipengine = IPython.kernel.ipengineapp:launch_new_instance',
173 'ipengine = IPython.kernel.ipengineapp:launch_new_instance',
174 'ipcluster = IPython.kernel.scripts.ipcluster:main',
174 'ipcluster = IPython.kernel.ipclusterapp:launch_new_instance',
175 'ipythonx = IPython.frontend.wx.ipythonx:main',
175 'ipythonx = IPython.frontend.wx.ipythonx:main',
176 'iptest = IPython.testing.iptest:main',
176 'iptest = IPython.testing.iptest:main',
177 'irunner = IPython.lib.irunner:main'
177 'irunner = IPython.lib.irunner:main'
178 ]
178 ]
179 }
179 }
180 setup_args['extras_require'] = dict(
180 setup_args['extras_require'] = dict(
181 kernel = [
181 kernel = [
182 'zope.interface>=3.4.1',
182 'zope.interface>=3.4.1',
183 'Twisted>=8.0.1',
183 'Twisted>=8.0.1',
184 'foolscap>=0.2.6'
184 'foolscap>=0.2.6'
185 ],
185 ],
186 doc='Sphinx>=0.3',
186 doc='Sphinx>=0.3',
187 test='nose>=0.10.1',
187 test='nose>=0.10.1',
188 security='pyOpenSSL>=0.6'
188 security='pyOpenSSL>=0.6'
189 )
189 )
190 # Allow setuptools to handle the scripts
190 # Allow setuptools to handle the scripts
191 scripts = []
191 scripts = []
192 else:
192 else:
193 # If we are running without setuptools, call this function which will
193 # If we are running without setuptools, call this function which will
194 # check for dependencies an inform the user what is needed. This is
194 # check for dependencies an inform the user what is needed. This is
195 # just to make life easy for users.
195 # just to make life easy for users.
196 check_for_dependencies()
196 check_for_dependencies()
197
197
198
198
199 #---------------------------------------------------------------------------
199 #---------------------------------------------------------------------------
200 # Do the actual setup now
200 # Do the actual setup now
201 #---------------------------------------------------------------------------
201 #---------------------------------------------------------------------------
202
202
203 setup_args['packages'] = packages
203 setup_args['packages'] = packages
204 setup_args['package_data'] = package_data
204 setup_args['package_data'] = package_data
205 setup_args['scripts'] = scripts
205 setup_args['scripts'] = scripts
206 setup_args['data_files'] = data_files
206 setup_args['data_files'] = data_files
207 setup_args.update(setuptools_extra_args)
207 setup_args.update(setuptools_extra_args)
208
208
209 if __name__ == '__main__':
209 if __name__ == '__main__':
210 setup(**setup_args)
210 setup(**setup_args)
General Comments 0
You need to be logged in to leave comments. Login now