##// END OF EJS Templates
restore auto_create behavior
MinRK -
Show More
@@ -1,525 +1,539 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import logging
21 import logging
22 import re
22 import re
23 import shutil
23 import shutil
24 import sys
24 import sys
25
25
26 from subprocess import Popen, PIPE
26 from subprocess import Popen, PIPE
27
27
28 from IPython.config.loader import PyFileConfigLoader, Config
28 from IPython.config.loader import PyFileConfigLoader, Config
29 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
30 from IPython.config.application import Application
30 from IPython.config.application import Application
31 from IPython.core.crashhandler import CrashHandler
31 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.newapplication import BaseIPythonApplication
32 from IPython.core.newapplication import BaseIPythonApplication
33 from IPython.core import release
33 from IPython.core import release
34 from IPython.utils.path import (
34 from IPython.utils.path import (
35 get_ipython_package_dir,
35 get_ipython_package_dir,
36 get_ipython_dir,
36 get_ipython_dir,
37 expand_path
37 expand_path
38 )
38 )
39 from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict
39 from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module errors
42 # Module errors
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45 class ClusterDirError(Exception):
45 class ClusterDirError(Exception):
46 pass
46 pass
47
47
48
48
49 class PIDFileError(Exception):
49 class PIDFileError(Exception):
50 pass
50 pass
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Class for managing cluster directories
54 # Class for managing cluster directories
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 class ClusterDir(Configurable):
57 class ClusterDir(Configurable):
58 """An object to manage the cluster directory and its resources.
58 """An object to manage the cluster directory and its resources.
59
59
60 The cluster directory is used by :command:`ipengine`,
60 The cluster directory is used by :command:`ipengine`,
61 :command:`ipcontroller` and :command:`ipclsuter` to manage the
61 :command:`ipcontroller` and :command:`ipclsuter` to manage the
62 configuration, logging and security of these applications.
62 configuration, logging and security of these applications.
63
63
64 This object knows how to find, create and manage these directories. This
64 This object knows how to find, create and manage these directories. This
65 should be used by any code that want's to handle cluster directories.
65 should be used by any code that want's to handle cluster directories.
66 """
66 """
67
67
68 security_dir_name = Unicode('security')
68 security_dir_name = Unicode('security')
69 log_dir_name = Unicode('log')
69 log_dir_name = Unicode('log')
70 pid_dir_name = Unicode('pid')
70 pid_dir_name = Unicode('pid')
71 security_dir = Unicode(u'')
71 security_dir = Unicode(u'')
72 log_dir = Unicode(u'')
72 log_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
74
74
75 auto_create = Bool(False,
76 help="""Whether to automatically create the ClusterDirectory if it does
77 not exist""")
78 overwrite = Bool(False,
79 help="""Whether to overwrite existing config files""")
75 location = Unicode(u'', config=True,
80 location = Unicode(u'', config=True,
76 help="""Set the cluster dir. This overrides the logic used by the
81 help="""Set the cluster dir. This overrides the logic used by the
77 `profile` option.""",
82 `profile` option.""",
78 )
83 )
79 profile = Unicode(u'default',
84 profile = Unicode(u'default', config=True,
80 help="""The string name of the profile to be used. This determines the name
85 help="""The string name of the profile to be used. This determines the name
81 of the cluster dir as: cluster_<profile>. The default profile is named
86 of the cluster dir as: cluster_<profile>. The default profile is named
82 'default'. The cluster directory is resolve this way if the
87 'default'. The cluster directory is resolve this way if the
83 `cluster_dir` option is not used.""", config=True
88 `cluster_dir` option is not used."""
84 )
89 )
85 auto_create = Bool(False,
86 help="""Whether to automatically create the ClusterDirectory if it does
87 not exist""")
88 overwrite = Bool(False,
89 help="""Whether to overwrite existing config files""")
90
90
91 _location_isset = Bool(False) # flag for detecting multiply set location
91 _location_isset = Bool(False) # flag for detecting multiply set location
92 _new_dir = Bool(False) # flag for whether a new dir was created
92 _new_dir = Bool(False) # flag for whether a new dir was created
93
93
94 def __init__(self, **kwargs):
94 def __init__(self, **kwargs):
95 # make sure auto_create,overwrite are set *before* location
96 for name in ('auto_create', 'overwrite'):
97 v = kwargs.pop(name, None)
98 if v is not None:
99 setattr(self, name, v)
95 super(ClusterDir, self).__init__(**kwargs)
100 super(ClusterDir, self).__init__(**kwargs)
96 if not self.location:
101 if not self.location:
97 self._profile_changed('profile', 'default', self.profile)
102 self._profile_changed('profile', 'default', self.profile)
98
103
99 def _location_changed(self, name, old, new):
104 def _location_changed(self, name, old, new):
100 if self._location_isset:
105 if self._location_isset:
101 raise RuntimeError("Cannot set ClusterDir more than once.")
106 raise RuntimeError("Cannot set ClusterDir more than once.")
102 self._location_isset = True
107 self._location_isset = True
103 if not os.path.isdir(new):
108 if not os.path.isdir(new):
104 if self.auto_create:
109 if self.auto_create:# or self.config.ClusterDir.auto_create:
105 os.makedirs(new)
110 os.makedirs(new)
106 self._new_dir = True
111 self._new_dir = True
107 else:
112 else:
108 raise ClusterDirError('Directory not found: %s' % new)
113 raise ClusterDirError('Directory not found: %s' % new)
109
114
110 # ensure config files exist:
115 # ensure config files exist:
111 self.copy_all_config_files(overwrite=self.overwrite)
116 self.copy_all_config_files(overwrite=self.overwrite)
112 self.security_dir = os.path.join(new, self.security_dir_name)
117 self.security_dir = os.path.join(new, self.security_dir_name)
113 self.log_dir = os.path.join(new, self.log_dir_name)
118 self.log_dir = os.path.join(new, self.log_dir_name)
114 self.pid_dir = os.path.join(new, self.pid_dir_name)
119 self.pid_dir = os.path.join(new, self.pid_dir_name)
115 self.check_dirs()
120 self.check_dirs()
116
121
117 def _profile_changed(self, name, old, new):
122 def _profile_changed(self, name, old, new):
118 if self._location_isset:
123 if self._location_isset:
119 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
120 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
121
126
122 def _log_dir_changed(self, name, old, new):
127 def _log_dir_changed(self, name, old, new):
123 self.check_log_dir()
128 self.check_log_dir()
124
129
125 def check_log_dir(self):
130 def check_log_dir(self):
126 if not os.path.isdir(self.log_dir):
131 if not os.path.isdir(self.log_dir):
127 os.mkdir(self.log_dir)
132 os.mkdir(self.log_dir)
128
133
129 def _security_dir_changed(self, name, old, new):
134 def _security_dir_changed(self, name, old, new):
130 self.check_security_dir()
135 self.check_security_dir()
131
136
132 def check_security_dir(self):
137 def check_security_dir(self):
133 if not os.path.isdir(self.security_dir):
138 if not os.path.isdir(self.security_dir):
134 os.mkdir(self.security_dir, 0700)
139 os.mkdir(self.security_dir, 0700)
135 os.chmod(self.security_dir, 0700)
140 os.chmod(self.security_dir, 0700)
136
141
137 def _pid_dir_changed(self, name, old, new):
142 def _pid_dir_changed(self, name, old, new):
138 self.check_pid_dir()
143 self.check_pid_dir()
139
144
140 def check_pid_dir(self):
145 def check_pid_dir(self):
141 if not os.path.isdir(self.pid_dir):
146 if not os.path.isdir(self.pid_dir):
142 os.mkdir(self.pid_dir, 0700)
147 os.mkdir(self.pid_dir, 0700)
143 os.chmod(self.pid_dir, 0700)
148 os.chmod(self.pid_dir, 0700)
144
149
145 def check_dirs(self):
150 def check_dirs(self):
146 self.check_security_dir()
151 self.check_security_dir()
147 self.check_log_dir()
152 self.check_log_dir()
148 self.check_pid_dir()
153 self.check_pid_dir()
149
154
150 def copy_config_file(self, config_file, path=None, overwrite=False):
155 def copy_config_file(self, config_file, path=None, overwrite=False):
151 """Copy a default config file into the active cluster directory.
156 """Copy a default config file into the active cluster directory.
152
157
153 Default configuration files are kept in :mod:`IPython.config.default`.
158 Default configuration files are kept in :mod:`IPython.config.default`.
154 This function moves these from that location to the working cluster
159 This function moves these from that location to the working cluster
155 directory.
160 directory.
156 """
161 """
157 if path is None:
162 if path is None:
158 import IPython.config.default
163 import IPython.config.default
159 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
160 path = os.path.sep.join(path)
165 path = os.path.sep.join(path)
161 src = os.path.join(path, config_file)
166 src = os.path.join(path, config_file)
162 dst = os.path.join(self.location, config_file)
167 dst = os.path.join(self.location, config_file)
163 if not os.path.isfile(dst) or overwrite:
168 if not os.path.isfile(dst) or overwrite:
164 shutil.copy(src, dst)
169 shutil.copy(src, dst)
165
170
166 def copy_all_config_files(self, path=None, overwrite=False):
171 def copy_all_config_files(self, path=None, overwrite=False):
167 """Copy all config files into the active cluster directory."""
172 """Copy all config files into the active cluster directory."""
168 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
169 u'ipcluster_config.py']:
174 u'ipcluster_config.py']:
170 self.copy_config_file(f, path=path, overwrite=overwrite)
175 self.copy_config_file(f, path=path, overwrite=overwrite)
171
176
172 @classmethod
177 @classmethod
173 def create_cluster_dir(csl, cluster_dir):
178 def create_cluster_dir(csl, cluster_dir):
174 """Create a new cluster directory given a full path.
179 """Create a new cluster directory given a full path.
175
180
176 Parameters
181 Parameters
177 ----------
182 ----------
178 cluster_dir : str
183 cluster_dir : str
179 The full path to the cluster directory. If it does exist, it will
184 The full path to the cluster directory. If it does exist, it will
180 be used. If not, it will be created.
185 be used. If not, it will be created.
181 """
186 """
182 return ClusterDir(location=cluster_dir)
187 return ClusterDir(location=cluster_dir)
183
188
184 @classmethod
189 @classmethod
185 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
186 """Create a cluster dir by profile name and path.
191 """Create a cluster dir by profile name and path.
187
192
188 Parameters
193 Parameters
189 ----------
194 ----------
190 path : str
195 path : str
191 The path (directory) to put the cluster directory in.
196 The path (directory) to put the cluster directory in.
192 profile : str
197 profile : str
193 The name of the profile. The name of the cluster directory will
198 The name of the profile. The name of the cluster directory will
194 be "cluster_<profile>".
199 be "cluster_<profile>".
195 """
200 """
196 if not os.path.isdir(path):
201 if not os.path.isdir(path):
197 raise ClusterDirError('Directory not found: %s' % path)
202 raise ClusterDirError('Directory not found: %s' % path)
198 cluster_dir = os.path.join(path, u'cluster_' + profile)
203 cluster_dir = os.path.join(path, u'cluster_' + profile)
199 return ClusterDir(location=cluster_dir)
204 return ClusterDir(location=cluster_dir)
200
205
201 @classmethod
206 @classmethod
202 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
203 """Find an existing cluster dir by profile name, return its ClusterDir.
208 """Find an existing cluster dir by profile name, return its ClusterDir.
204
209
205 This searches through a sequence of paths for a cluster dir. If it
210 This searches through a sequence of paths for a cluster dir. If it
206 is not found, a :class:`ClusterDirError` exception will be raised.
211 is not found, a :class:`ClusterDirError` exception will be raised.
207
212
208 The search path algorithm is:
213 The search path algorithm is:
209 1. ``os.getcwd()``
214 1. ``os.getcwd()``
210 2. ``ipython_dir``
215 2. ``ipython_dir``
211 3. The directories found in the ":" separated
216 3. The directories found in the ":" separated
212 :env:`IPCLUSTER_DIR_PATH` environment variable.
217 :env:`IPCLUSTER_DIR_PATH` environment variable.
213
218
214 Parameters
219 Parameters
215 ----------
220 ----------
216 ipython_dir : unicode or str
221 ipython_dir : unicode or str
217 The IPython directory to use.
222 The IPython directory to use.
218 profile : unicode or str
223 profile : unicode or str
219 The name of the profile. The name of the cluster directory
224 The name of the profile. The name of the cluster directory
220 will be "cluster_<profile>".
225 will be "cluster_<profile>".
221 """
226 """
222 dirname = u'cluster_' + profile
227 dirname = u'cluster_' + profile
223 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
224 if cluster_dir_paths:
229 if cluster_dir_paths:
225 cluster_dir_paths = cluster_dir_paths.split(':')
230 cluster_dir_paths = cluster_dir_paths.split(':')
226 else:
231 else:
227 cluster_dir_paths = []
232 cluster_dir_paths = []
228 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
229 for p in paths:
234 for p in paths:
230 cluster_dir = os.path.join(p, dirname)
235 cluster_dir = os.path.join(p, dirname)
231 if os.path.isdir(cluster_dir):
236 if os.path.isdir(cluster_dir):
232 return ClusterDir(location=cluster_dir)
237 return ClusterDir(location=cluster_dir)
233 else:
238 else:
234 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
235
240
236 @classmethod
241 @classmethod
237 def find_cluster_dir(cls, cluster_dir):
242 def find_cluster_dir(cls, cluster_dir):
238 """Find/create a cluster dir and return its ClusterDir.
243 """Find/create a cluster dir and return its ClusterDir.
239
244
240 This will create the cluster directory if it doesn't exist.
245 This will create the cluster directory if it doesn't exist.
241
246
242 Parameters
247 Parameters
243 ----------
248 ----------
244 cluster_dir : unicode or str
249 cluster_dir : unicode or str
245 The path of the cluster directory. This is expanded using
250 The path of the cluster directory. This is expanded using
246 :func:`IPython.utils.genutils.expand_path`.
251 :func:`IPython.utils.genutils.expand_path`.
247 """
252 """
248 cluster_dir = expand_path(cluster_dir)
253 cluster_dir = expand_path(cluster_dir)
249 if not os.path.isdir(cluster_dir):
254 if not os.path.isdir(cluster_dir):
250 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
251 return ClusterDir(location=cluster_dir)
256 return ClusterDir(location=cluster_dir)
252
257
253
258
254 #-----------------------------------------------------------------------------
259 #-----------------------------------------------------------------------------
255 # Crash handler for this application
260 # Crash handler for this application
256 #-----------------------------------------------------------------------------
261 #-----------------------------------------------------------------------------
257
262
258
263
259 _message_template = """\
264 _message_template = """\
260 Oops, $self.app_name crashed. We do our best to make it stable, but...
265 Oops, $self.app_name crashed. We do our best to make it stable, but...
261
266
262 A crash report was automatically generated with the following information:
267 A crash report was automatically generated with the following information:
263 - A verbatim copy of the crash traceback.
268 - A verbatim copy of the crash traceback.
264 - Data on your current $self.app_name configuration.
269 - Data on your current $self.app_name configuration.
265
270
266 It was left in the file named:
271 It was left in the file named:
267 \t'$self.crash_report_fname'
272 \t'$self.crash_report_fname'
268 If you can email this file to the developers, the information in it will help
273 If you can email this file to the developers, the information in it will help
269 them in understanding and correcting the problem.
274 them in understanding and correcting the problem.
270
275
271 You can mail it to: $self.contact_name at $self.contact_email
276 You can mail it to: $self.contact_name at $self.contact_email
272 with the subject '$self.app_name Crash Report'.
277 with the subject '$self.app_name Crash Report'.
273
278
274 If you want to do it now, the following command will work (under Unix):
279 If you want to do it now, the following command will work (under Unix):
275 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
280 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
276
281
277 To ensure accurate tracking of this issue, please file a report about it at:
282 To ensure accurate tracking of this issue, please file a report about it at:
278 $self.bug_tracker
283 $self.bug_tracker
279 """
284 """
280
285
281 class ClusterDirCrashHandler(CrashHandler):
286 class ClusterDirCrashHandler(CrashHandler):
282 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
287 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
283
288
284 message_template = _message_template
289 message_template = _message_template
285
290
286 def __init__(self, app):
291 def __init__(self, app):
287 contact_name = release.authors['Min'][0]
292 contact_name = release.authors['Min'][0]
288 contact_email = release.authors['Min'][1]
293 contact_email = release.authors['Min'][1]
289 bug_tracker = 'http://github.com/ipython/ipython/issues'
294 bug_tracker = 'http://github.com/ipython/ipython/issues'
290 super(ClusterDirCrashHandler,self).__init__(
295 super(ClusterDirCrashHandler,self).__init__(
291 app, contact_name, contact_email, bug_tracker
296 app, contact_name, contact_email, bug_tracker
292 )
297 )
293
298
294
299
295 #-----------------------------------------------------------------------------
300 #-----------------------------------------------------------------------------
296 # Main application
301 # Main application
297 #-----------------------------------------------------------------------------
302 #-----------------------------------------------------------------------------
298 base_aliases = {
303 base_aliases = {
299 'profile' : "ClusterDir.profile",
304 'profile' : "ClusterDir.profile",
300 'cluster_dir' : 'ClusterDir.location',
305 'cluster_dir' : 'ClusterDir.location',
301 'auto_create' : 'ClusterDirApplication.auto_create',
306 'auto_create' : 'ClusterDirApplication.auto_create',
302 'log_level' : 'ClusterApplication.log_level',
307 'log_level' : 'ClusterApplication.log_level',
303 'work_dir' : 'ClusterApplication.work_dir',
308 'work_dir' : 'ClusterApplication.work_dir',
304 'log_to_file' : 'ClusterApplication.log_to_file',
309 'log_to_file' : 'ClusterApplication.log_to_file',
305 'clean_logs' : 'ClusterApplication.clean_logs',
310 'clean_logs' : 'ClusterApplication.clean_logs',
306 'log_url' : 'ClusterApplication.log_url',
311 'log_url' : 'ClusterApplication.log_url',
307 }
312 }
308
313
309 base_flags = {
314 base_flags = {
310 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
311 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
312 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
313 }
318 }
314 for k,v in base_flags.iteritems():
319 for k,v in base_flags.iteritems():
315 base_flags[k] = (Config(v[0]),v[1])
320 base_flags[k] = (Config(v[0]),v[1])
316
321
317 class ClusterApplication(BaseIPythonApplication):
322 class ClusterApplication(BaseIPythonApplication):
318 """An application that puts everything into a cluster directory.
323 """An application that puts everything into a cluster directory.
319
324
320 Instead of looking for things in the ipython_dir, this type of application
325 Instead of looking for things in the ipython_dir, this type of application
321 will use its own private directory called the "cluster directory"
326 will use its own private directory called the "cluster directory"
322 for things like config files, log files, etc.
327 for things like config files, log files, etc.
323
328
324 The cluster directory is resolved as follows:
329 The cluster directory is resolved as follows:
325
330
326 * If the ``--cluster-dir`` option is given, it is used.
331 * If the ``--cluster-dir`` option is given, it is used.
327 * If ``--cluster-dir`` is not given, the application directory is
332 * If ``--cluster-dir`` is not given, the application directory is
328 resolve using the profile name as ``cluster_<profile>``. The search
333 resolve using the profile name as ``cluster_<profile>``. The search
329 path for this directory is then i) cwd if it is found there
334 path for this directory is then i) cwd if it is found there
330 and ii) in ipython_dir otherwise.
335 and ii) in ipython_dir otherwise.
331
336
332 The config file for the application is to be put in the cluster
337 The config file for the application is to be put in the cluster
333 dir and named the value of the ``config_file_name`` class attribute.
338 dir and named the value of the ``config_file_name`` class attribute.
334 """
339 """
335
340
336 crash_handler_class = ClusterDirCrashHandler
341 crash_handler_class = ClusterDirCrashHandler
337 auto_create_cluster_dir = Bool(True, config=True,
342 auto_create_cluster_dir = Bool(True, config=True,
338 help="whether to create the cluster_dir if it doesn't exist")
343 help="whether to create the cluster_dir if it doesn't exist")
339 cluster_dir = Instance(ClusterDir)
344 cluster_dir = Instance(ClusterDir)
340 classes = [ClusterDir]
345 classes = [ClusterDir]
341
346
342 def _log_level_default(self):
347 def _log_level_default(self):
343 # temporarily override default_log_level to INFO
348 # temporarily override default_log_level to INFO
344 return logging.INFO
349 return logging.INFO
345
350
346 work_dir = Unicode(os.getcwdu(), config=True,
351 work_dir = Unicode(os.getcwdu(), config=True,
347 help='Set the working dir for the process.'
352 help='Set the working dir for the process.'
348 )
353 )
349 def _work_dir_changed(self, name, old, new):
354 def _work_dir_changed(self, name, old, new):
350 self.work_dir = unicode(expand_path(new))
355 self.work_dir = unicode(expand_path(new))
351
356
352 log_to_file = Bool(config=True,
357 log_to_file = Bool(config=True,
353 help="whether to log to a file")
358 help="whether to log to a file")
354
359
355 clean_logs = Bool(False, shortname='--clean-logs', config=True,
360 clean_logs = Bool(False, shortname='--clean-logs', config=True,
356 help="whether to cleanup old logfiles before starting")
361 help="whether to cleanup old logfiles before starting")
357
362
358 log_url = CStr('', shortname='--log-url', config=True,
363 log_url = CStr('', shortname='--log-url', config=True,
359 help="The ZMQ URL of the iplooger to aggregate logging.")
364 help="The ZMQ URL of the iplooger to aggregate logging.")
360
365
361 config_file = Unicode(u'', config=True,
366 config_file = Unicode(u'', config=True,
362 help="""Path to ipcontroller configuration file. The default is to use
367 help="""Path to ipcontroller configuration file. The default is to use
363 <appname>_config.py, as found by cluster-dir."""
368 <appname>_config.py, as found by cluster-dir."""
364 )
369 )
365
370
366 loop = Instance('zmq.eventloop.ioloop.IOLoop')
371 loop = Instance('zmq.eventloop.ioloop.IOLoop')
367 def _loop_default(self):
372 def _loop_default(self):
368 from zmq.eventloop.ioloop import IOLoop
373 from zmq.eventloop.ioloop import IOLoop
369 return IOLoop.instance()
374 return IOLoop.instance()
370
375
371 aliases = Dict(base_aliases)
376 aliases = Dict(base_aliases)
372 flags = Dict(base_flags)
377 flags = Dict(base_flags)
373
378
374 def init_clusterdir(self):
379 def init_clusterdir(self):
375 """This resolves the cluster directory.
380 """This resolves the cluster directory.
376
381
377 This tries to find the cluster directory and if successful, it will
382 This tries to find the cluster directory and if successful, it will
378 have done:
383 have done:
379 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
384 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
380 the application.
385 the application.
381 * Sets ``self.cluster_dir`` attribute of the application and config
386 * Sets ``self.cluster_dir`` attribute of the application and config
382 objects.
387 objects.
383
388
384 The algorithm used for this is as follows:
389 The algorithm used for this is as follows:
385 1. Try ``Global.cluster_dir``.
390 1. Try ``Global.cluster_dir``.
386 2. Try using ``Global.profile``.
391 2. Try using ``Global.profile``.
387 3. If both of these fail and ``self.auto_create_cluster_dir`` is
392 3. If both of these fail and ``self.auto_create_cluster_dir`` is
388 ``True``, then create the new cluster dir in the IPython directory.
393 ``True``, then create the new cluster dir in the IPython directory.
389 4. If all fails, then raise :class:`ClusterDirError`.
394 4. If all fails, then raise :class:`ClusterDirError`.
390 """
395 """
391 self.cluster_dir = ClusterDir(config=self.config, auto_create=self.auto_create_cluster_dir)
396 try:
397 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
398 except ClusterDirError as e:
399 self.log.fatal("Error initializing cluster dir: %s"%e)
400 self.log.fatal("A cluster dir must be created before running this command.")
401 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
402 "information about creating and listing cluster dirs."
403 )
404 self.exit(1)
405
392 if self.cluster_dir._new_dir:
406 if self.cluster_dir._new_dir:
393 self.log.info('Creating new cluster dir: %s' % \
407 self.log.info('Creating new cluster dir: %s' % \
394 self.cluster_dir.location)
408 self.cluster_dir.location)
395 else:
409 else:
396 self.log.info('Using existing cluster dir: %s' % \
410 self.log.info('Using existing cluster dir: %s' % \
397 self.cluster_dir.location)
411 self.cluster_dir.location)
398
412
399 def initialize(self, argv=None):
413 def initialize(self, argv=None):
400 """initialize the app"""
414 """initialize the app"""
415 self.init_crash_handler()
401 self.parse_command_line(argv)
416 self.parse_command_line(argv)
402 cl_config = self.config
417 cl_config = self.config
403 self.init_clusterdir()
418 self.init_clusterdir()
404 if self.config_file:
419 if self.config_file:
405 self.load_config_file(self.config_file)
420 self.load_config_file(self.config_file)
406 else:
421 else:
407 self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
422 self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
408 # command-line should *override* config file, but command-line is necessary
423 # command-line should *override* config file, but command-line is necessary
409 # to determine clusterdir, etc.
424 # to determine clusterdir, etc.
410 self.update_config(cl_config)
425 self.update_config(cl_config)
411 self.reinit_logging()
426 self.reinit_logging()
412
427
413 self.to_work_dir()
428 self.to_work_dir()
414
429
415 def to_work_dir(self):
430 def to_work_dir(self):
416 wd = self.work_dir
431 wd = self.work_dir
417 if unicode(wd) != os.getcwdu():
432 if unicode(wd) != os.getcwdu():
418 os.chdir(wd)
433 os.chdir(wd)
419 self.log.info("Changing to working dir: %s" % wd)
434 self.log.info("Changing to working dir: %s" % wd)
420
435
421 def load_config_file(self, filename, path=None):
436 def load_config_file(self, filename, path=None):
422 """Load a .py based config file by filename and path."""
437 """Load a .py based config file by filename and path."""
423 # use config.application.Application.load_config
438 # use config.application.Application.load_config
424 # instead of inflexible
439 # instead of inflexible core.newapplication.BaseIPythonApplication.load_config
425 # core.newapplication.BaseIPythonApplication.load_config
426 return Application.load_config_file(self, filename, path=path)
440 return Application.load_config_file(self, filename, path=path)
427 #
441 #
428 # def load_default_config_file(self):
442 # def load_default_config_file(self):
429 # """Load a .py based config file by filename and path."""
443 # """Load a .py based config file by filename and path."""
430 # return BaseIPythonApplication.load_config_file(self)
444 # return BaseIPythonApplication.load_config_file(self)
431
445
432 # disable URL-logging
446 # disable URL-logging
433 def reinit_logging(self):
447 def reinit_logging(self):
434 # Remove old log files
448 # Remove old log files
435 log_dir = self.cluster_dir.log_dir
449 log_dir = self.cluster_dir.log_dir
436 if self.clean_logs:
450 if self.clean_logs:
437 for f in os.listdir(log_dir):
451 for f in os.listdir(log_dir):
438 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
452 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
439 os.remove(os.path.join(log_dir, f))
453 os.remove(os.path.join(log_dir, f))
440 if self.log_to_file:
454 if self.log_to_file:
441 # Start logging to the new log file
455 # Start logging to the new log file
442 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
456 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
443 logfile = os.path.join(log_dir, log_filename)
457 logfile = os.path.join(log_dir, log_filename)
444 open_log_file = open(logfile, 'w')
458 open_log_file = open(logfile, 'w')
445 else:
459 else:
446 open_log_file = None
460 open_log_file = None
447 if open_log_file is not None:
461 if open_log_file is not None:
448 self.log.removeHandler(self._log_handler)
462 self.log.removeHandler(self._log_handler)
449 self._log_handler = logging.StreamHandler(open_log_file)
463 self._log_handler = logging.StreamHandler(open_log_file)
450 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
464 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
451 self._log_handler.setFormatter(self._log_formatter)
465 self._log_handler.setFormatter(self._log_formatter)
452 self.log.addHandler(self._log_handler)
466 self.log.addHandler(self._log_handler)
453
467
454 def write_pid_file(self, overwrite=False):
468 def write_pid_file(self, overwrite=False):
455 """Create a .pid file in the pid_dir with my pid.
469 """Create a .pid file in the pid_dir with my pid.
456
470
457 This must be called after pre_construct, which sets `self.pid_dir`.
471 This must be called after pre_construct, which sets `self.pid_dir`.
458 This raises :exc:`PIDFileError` if the pid file exists already.
472 This raises :exc:`PIDFileError` if the pid file exists already.
459 """
473 """
460 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
474 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
461 if os.path.isfile(pid_file):
475 if os.path.isfile(pid_file):
462 pid = self.get_pid_from_file()
476 pid = self.get_pid_from_file()
463 if not overwrite:
477 if not overwrite:
464 raise PIDFileError(
478 raise PIDFileError(
465 'The pid file [%s] already exists. \nThis could mean that this '
479 'The pid file [%s] already exists. \nThis could mean that this '
466 'server is already running with [pid=%s].' % (pid_file, pid)
480 'server is already running with [pid=%s].' % (pid_file, pid)
467 )
481 )
468 with open(pid_file, 'w') as f:
482 with open(pid_file, 'w') as f:
469 self.log.info("Creating pid file: %s" % pid_file)
483 self.log.info("Creating pid file: %s" % pid_file)
470 f.write(repr(os.getpid())+'\n')
484 f.write(repr(os.getpid())+'\n')
471
485
472 def remove_pid_file(self):
486 def remove_pid_file(self):
473 """Remove the pid file.
487 """Remove the pid file.
474
488
475 This should be called at shutdown by registering a callback with
489 This should be called at shutdown by registering a callback with
476 :func:`reactor.addSystemEventTrigger`. This needs to return
490 :func:`reactor.addSystemEventTrigger`. This needs to return
477 ``None``.
491 ``None``.
478 """
492 """
479 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
493 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
480 if os.path.isfile(pid_file):
494 if os.path.isfile(pid_file):
481 try:
495 try:
482 self.log.info("Removing pid file: %s" % pid_file)
496 self.log.info("Removing pid file: %s" % pid_file)
483 os.remove(pid_file)
497 os.remove(pid_file)
484 except:
498 except:
485 self.log.warn("Error removing the pid file: %s" % pid_file)
499 self.log.warn("Error removing the pid file: %s" % pid_file)
486
500
487 def get_pid_from_file(self):
501 def get_pid_from_file(self):
488 """Get the pid from the pid file.
502 """Get the pid from the pid file.
489
503
490 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
504 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
491 """
505 """
492 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
506 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
493 if os.path.isfile(pid_file):
507 if os.path.isfile(pid_file):
494 with open(pid_file, 'r') as f:
508 with open(pid_file, 'r') as f:
495 pid = int(f.read().strip())
509 pid = int(f.read().strip())
496 return pid
510 return pid
497 else:
511 else:
498 raise PIDFileError('pid file not found: %s' % pid_file)
512 raise PIDFileError('pid file not found: %s' % pid_file)
499
513
500 def check_pid(self, pid):
514 def check_pid(self, pid):
501 if os.name == 'nt':
515 if os.name == 'nt':
502 try:
516 try:
503 import ctypes
517 import ctypes
504 # returns 0 if no such process (of ours) exists
518 # returns 0 if no such process (of ours) exists
505 # positive int otherwise
519 # positive int otherwise
506 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
520 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
507 except Exception:
521 except Exception:
508 self.log.warn(
522 self.log.warn(
509 "Could not determine whether pid %i is running via `OpenProcess`. "
523 "Could not determine whether pid %i is running via `OpenProcess`. "
510 " Making the likely assumption that it is."%pid
524 " Making the likely assumption that it is."%pid
511 )
525 )
512 return True
526 return True
513 return bool(p)
527 return bool(p)
514 else:
528 else:
515 try:
529 try:
516 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
530 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
517 output,_ = p.communicate()
531 output,_ = p.communicate()
518 except OSError:
532 except OSError:
519 self.log.warn(
533 self.log.warn(
520 "Could not determine whether pid %i is running via `ps x`. "
534 "Could not determine whether pid %i is running via `ps x`. "
521 " Making the likely assumption that it is."%pid
535 " Making the likely assumption that it is."%pid
522 )
536 )
523 return True
537 return True
524 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
538 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
525 return pid in pids
539 return pid in pids
@@ -1,542 +1,537 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 from subprocess import check_call, CalledProcessError, PIPE
24 from subprocess import check_call, CalledProcessError, PIPE
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.config.application import Application, boolean_flag
28 from IPython.config.application import Application, boolean_flag
29 from IPython.config.loader import Config
29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication
30 from IPython.core.newapplication import BaseIPythonApplication
31 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
32 from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
32 from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
33
33
34 from IPython.parallel.apps.clusterdir import (
34 from IPython.parallel.apps.clusterdir import (
35 ClusterApplication, ClusterDirError, ClusterDir,
35 ClusterApplication, ClusterDirError, ClusterDir,
36 PIDFileError,
36 PIDFileError,
37 base_flags, base_aliases
37 base_flags, base_aliases
38 )
38 )
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module level variables
42 # Module level variables
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 default_config_file_name = u'ipcluster_config.py'
46 default_config_file_name = u'ipcluster_config.py'
47
47
48
48
49 _description = """\
49 _description = """\
50 Start an IPython cluster for parallel computing.\n\n
50 Start an IPython cluster for parallel computing.\n\n
51
51
52 An IPython cluster consists of 1 controller and 1 or more engines.
52 An IPython cluster consists of 1 controller and 1 or more engines.
53 This command automates the startup of these processes using a wide
53 This command automates the startup of these processes using a wide
54 range of startup methods (SSH, local processes, PBS, mpiexec,
54 range of startup methods (SSH, local processes, PBS, mpiexec,
55 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 Windows HPC Server 2008). To start a cluster with 4 engines on your
56 local host simply do 'ipcluster start n=4'. For more complex usage
56 local host simply do 'ipcluster start n=4'. For more complex usage
57 you will typically do 'ipcluster create profile=mycluster', then edit
57 you will typically do 'ipcluster create profile=mycluster', then edit
58 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
58 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
59 """
59 """
60
60
61
61
62 # Exit codes for ipcluster
62 # Exit codes for ipcluster
63
63
64 # This will be the exit code if the ipcluster appears to be running because
64 # This will be the exit code if the ipcluster appears to be running because
65 # a .pid file exists
65 # a .pid file exists
66 ALREADY_STARTED = 10
66 ALREADY_STARTED = 10
67
67
68
68
69 # This will be the exit code if ipcluster stop is run, but there is not .pid
69 # This will be the exit code if ipcluster stop is run, but there is not .pid
70 # file to be found.
70 # file to be found.
71 ALREADY_STOPPED = 11
71 ALREADY_STOPPED = 11
72
72
73 # This will be the exit code if ipcluster engines is run, but there is not .pid
73 # This will be the exit code if ipcluster engines is run, but there is not .pid
74 # file to be found.
74 # file to be found.
75 NO_CLUSTER = 12
75 NO_CLUSTER = 12
76
76
77
77
78 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
79 # Main application
79 # Main application
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81 start_help = """
81 start_help = """
82 Start an ipython cluster by its profile name or cluster
82 Start an ipython cluster by its profile name or cluster
83 directory. Cluster directories contain configuration, log and
83 directory. Cluster directories contain configuration, log and
84 security related files and are named using the convention
84 security related files and are named using the convention
85 'cluster_<profile>' and should be creating using the 'start'
85 'cluster_<profile>' and should be creating using the 'start'
86 subcommand of 'ipcluster'. If your cluster directory is in
86 subcommand of 'ipcluster'. If your cluster directory is in
87 the cwd or the ipython directory, you can simply refer to it
87 the cwd or the ipython directory, you can simply refer to it
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 otherwise use the 'cluster_dir' option.
89 otherwise use the 'cluster_dir' option.
90 """
90 """
91 stop_help = """
91 stop_help = """
92 Stop a running ipython cluster by its profile name or cluster
92 Stop a running ipython cluster by its profile name or cluster
93 directory. Cluster directories are named using the convention
93 directory. Cluster directories are named using the convention
94 'cluster_<profile>'. If your cluster directory is in
94 'cluster_<profile>'. If your cluster directory is in
95 the cwd or the ipython directory, you can simply refer to it
95 the cwd or the ipython directory, you can simply refer to it
96 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
96 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
97 use the 'cluster_dir' option.
97 use the 'cluster_dir' option.
98 """
98 """
99 engines_help = """
99 engines_help = """
100 Start one or more engines to connect to an existing Cluster
100 Start one or more engines to connect to an existing Cluster
101 by profile name or cluster directory.
101 by profile name or cluster directory.
102 Cluster directories contain configuration, log and
102 Cluster directories contain configuration, log and
103 security related files and are named using the convention
103 security related files and are named using the convention
104 'cluster_<profile>' and should be creating using the 'start'
104 'cluster_<profile>' and should be creating using the 'start'
105 subcommand of 'ipcluster'. If your cluster directory is in
105 subcommand of 'ipcluster'. If your cluster directory is in
106 the cwd or the ipython directory, you can simply refer to it
106 the cwd or the ipython directory, you can simply refer to it
107 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
107 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
108 otherwise use the 'cluster_dir' option.
108 otherwise use the 'cluster_dir' option.
109 """
109 """
110 create_help = """
110 create_help = """
111 Create an ipython cluster directory by its profile name or
111 Create an ipython cluster directory by its profile name or
112 cluster directory path. Cluster directories contain
112 cluster directory path. Cluster directories contain
113 configuration, log and security related files and are named
113 configuration, log and security related files and are named
114 using the convention 'cluster_<profile>'. By default they are
114 using the convention 'cluster_<profile>'. By default they are
115 located in your ipython directory. Once created, you will
115 located in your ipython directory. Once created, you will
116 probably need to edit the configuration files in the cluster
116 probably need to edit the configuration files in the cluster
117 directory to configure your cluster. Most users will create a
117 directory to configure your cluster. Most users will create a
118 cluster directory by profile name,
118 cluster directory by profile name,
119 `ipcluster create profile=mycluster`, which will put the directory
119 `ipcluster create profile=mycluster`, which will put the directory
120 in `<ipython_dir>/cluster_mycluster`.
120 in `<ipython_dir>/cluster_mycluster`.
121 """
121 """
122 list_help = """List all available clusters, by cluster directory, that can
122 list_help = """List all available clusters, by cluster directory, that can
123 be found in the current working directly or in the ipython
123 be found in the current working directly or in the ipython
124 directory. Cluster directories are named using the convention
124 directory. Cluster directories are named using the convention
125 'cluster_<profile>'.
125 'cluster_<profile>'.
126 """
126 """
127
127
128
128
129 class IPClusterList(BaseIPythonApplication):
129 class IPClusterList(BaseIPythonApplication):
130 name = u'ipcluster-list'
130 name = u'ipcluster-list'
131 description = list_help
131 description = list_help
132
132
133 # empty aliases
133 # empty aliases
134 aliases=Dict()
134 aliases=Dict()
135 flags = Dict(base_flags)
135 flags = Dict(base_flags)
136
136
137 def _log_level_default(self):
137 def _log_level_default(self):
138 return 20
138 return 20
139
139
140 def list_cluster_dirs(self):
140 def list_cluster_dirs(self):
141 # Find the search paths
141 # Find the search paths
142 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
142 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
143 if cluster_dir_paths:
143 if cluster_dir_paths:
144 cluster_dir_paths = cluster_dir_paths.split(':')
144 cluster_dir_paths = cluster_dir_paths.split(':')
145 else:
145 else:
146 cluster_dir_paths = []
146 cluster_dir_paths = []
147
147
148 ipython_dir = self.ipython_dir
148 ipython_dir = self.ipython_dir
149
149
150 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
150 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
151 paths = list(set(paths))
151 paths = list(set(paths))
152
152
153 self.log.info('Searching for cluster dirs in paths: %r' % paths)
153 self.log.info('Searching for cluster dirs in paths: %r' % paths)
154 for path in paths:
154 for path in paths:
155 files = os.listdir(path)
155 files = os.listdir(path)
156 for f in files:
156 for f in files:
157 full_path = os.path.join(path, f)
157 full_path = os.path.join(path, f)
158 if os.path.isdir(full_path) and f.startswith('cluster_'):
158 if os.path.isdir(full_path) and f.startswith('cluster_'):
159 profile = full_path.split('_')[-1]
159 profile = full_path.split('_')[-1]
160 start_cmd = 'ipcluster start profile=%s n=4' % profile
160 start_cmd = 'ipcluster start profile=%s n=4' % profile
161 print start_cmd + " ==> " + full_path
161 print start_cmd + " ==> " + full_path
162
162
163 def start(self):
163 def start(self):
164 self.list_cluster_dirs()
164 self.list_cluster_dirs()
165
165
166 create_flags = {}
166 create_flags = {}
167 create_flags.update(base_flags)
167 create_flags.update(base_flags)
168 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
168 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
169 "reset config files to defaults", "leave existing config files"))
169 "reset config files to defaults", "leave existing config files"))
170
170
171 class IPClusterCreate(ClusterApplication):
171 class IPClusterCreate(ClusterApplication):
172 name = u'ipcluster'
172 name = u'ipcluster'
173 description = create_help
173 description = create_help
174 auto_create_cluster_dir = Bool(True,
174 auto_create_cluster_dir = Bool(True,
175 help="whether to create the cluster_dir if it doesn't exist")
175 help="whether to create the cluster_dir if it doesn't exist")
176 default_config_file_name = default_config_file_name
176 default_config_file_name = default_config_file_name
177
177
178 reset = Bool(False, config=True,
178 reset = Bool(False, config=True,
179 help="Whether to reset config files as part of 'create'."
179 help="Whether to reset config files as part of 'create'."
180 )
180 )
181
181
182 flags = Dict(create_flags)
182 flags = Dict(create_flags)
183
183
184 aliases = Dict(dict(profile='ClusterDir.profile'))
184 aliases = Dict(dict(profile='ClusterDir.profile'))
185
185
186 classes = [ClusterDir]
186 classes = [ClusterDir]
187
187
188 def init_clusterdir(self):
188 def init_clusterdir(self):
189 super(IPClusterCreate, self).init_clusterdir()
189 super(IPClusterCreate, self).init_clusterdir()
190 self.log.info('Copying default config files to cluster directory '
190 self.log.info('Copying default config files to cluster directory '
191 '[overwrite=%r]' % (self.reset,))
191 '[overwrite=%r]' % (self.reset,))
192 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
192 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
193
193
194 def initialize(self, argv=None):
194 def initialize(self, argv=None):
195 self.parse_command_line(argv)
195 self.parse_command_line(argv)
196 self.init_clusterdir()
196 self.init_clusterdir()
197
197
198 stop_aliases = dict(
198 stop_aliases = dict(
199 signal='IPClusterStop.signal',
199 signal='IPClusterStop.signal',
200 profile='ClusterDir.profile',
200 profile='ClusterDir.profile',
201 cluster_dir='ClusterDir.location',
201 cluster_dir='ClusterDir.location',
202 )
202 )
203
203
204 class IPClusterStop(ClusterApplication):
204 class IPClusterStop(ClusterApplication):
205 name = u'ipcluster'
205 name = u'ipcluster'
206 description = stop_help
206 description = stop_help
207 auto_create_cluster_dir = Bool(False,
207 auto_create_cluster_dir = Bool(False)
208 help="whether to create the cluster_dir if it doesn't exist")
209 default_config_file_name = default_config_file_name
208 default_config_file_name = default_config_file_name
210
209
211 signal = Int(signal.SIGINT, config=True,
210 signal = Int(signal.SIGINT, config=True,
212 help="signal to use for stopping processes.")
211 help="signal to use for stopping processes.")
213
212
214 aliases = Dict(stop_aliases)
213 aliases = Dict(stop_aliases)
215
214
215 def init_clusterdir(self):
216 try:
217 super(IPClusterStop, self).init_clusterdir()
218 except ClusterDirError as e:
219 self.log.fatal("Failed ClusterDir init: %s"%e)
220 self.exit(1)
221
216 def start(self):
222 def start(self):
217 """Start the app for the stop subcommand."""
223 """Start the app for the stop subcommand."""
218 try:
224 try:
219 pid = self.get_pid_from_file()
225 pid = self.get_pid_from_file()
220 except PIDFileError:
226 except PIDFileError:
221 self.log.critical(
227 self.log.critical(
222 'Could not read pid file, cluster is probably not running.'
228 'Could not read pid file, cluster is probably not running.'
223 )
229 )
224 # Here I exit with a unusual exit status that other processes
230 # Here I exit with a unusual exit status that other processes
225 # can watch for to learn how I existed.
231 # can watch for to learn how I existed.
226 self.remove_pid_file()
232 self.remove_pid_file()
227 self.exit(ALREADY_STOPPED)
233 self.exit(ALREADY_STOPPED)
228
234
229 if not self.check_pid(pid):
235 if not self.check_pid(pid):
230 self.log.critical(
236 self.log.critical(
231 'Cluster [pid=%r] is not running.' % pid
237 'Cluster [pid=%r] is not running.' % pid
232 )
238 )
233 self.remove_pid_file()
239 self.remove_pid_file()
234 # Here I exit with a unusual exit status that other processes
240 # Here I exit with a unusual exit status that other processes
235 # can watch for to learn how I existed.
241 # can watch for to learn how I existed.
236 self.exit(ALREADY_STOPPED)
242 self.exit(ALREADY_STOPPED)
237
243
238 elif os.name=='posix':
244 elif os.name=='posix':
239 sig = self.signal
245 sig = self.signal
240 self.log.info(
246 self.log.info(
241 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
247 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
242 )
248 )
243 try:
249 try:
244 os.kill(pid, sig)
250 os.kill(pid, sig)
245 except OSError:
251 except OSError:
246 self.log.error("Stopping cluster failed, assuming already dead.",
252 self.log.error("Stopping cluster failed, assuming already dead.",
247 exc_info=True)
253 exc_info=True)
248 self.remove_pid_file()
254 self.remove_pid_file()
249 elif os.name=='nt':
255 elif os.name=='nt':
250 try:
256 try:
251 # kill the whole tree
257 # kill the whole tree
252 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
258 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
253 except (CalledProcessError, OSError):
259 except (CalledProcessError, OSError):
254 self.log.error("Stopping cluster failed, assuming already dead.",
260 self.log.error("Stopping cluster failed, assuming already dead.",
255 exc_info=True)
261 exc_info=True)
256 self.remove_pid_file()
262 self.remove_pid_file()
257
263
258 engine_aliases = {}
264 engine_aliases = {}
259 engine_aliases.update(base_aliases)
265 engine_aliases.update(base_aliases)
260 engine_aliases.update(dict(
266 engine_aliases.update(dict(
261 n='IPClusterEngines.n',
267 n='IPClusterEngines.n',
262 elauncher = 'IPClusterEngines.engine_launcher_class',
268 elauncher = 'IPClusterEngines.engine_launcher_class',
263 ))
269 ))
264 class IPClusterEngines(ClusterApplication):
270 class IPClusterEngines(ClusterApplication):
265
271
266 name = u'ipcluster'
272 name = u'ipcluster'
267 description = engines_help
273 description = engines_help
268 usage = None
274 usage = None
269 default_config_file_name = default_config_file_name
275 default_config_file_name = default_config_file_name
270 default_log_level = logging.INFO
276 default_log_level = logging.INFO
271 auto_create_cluster_dir = Bool(False)
277 auto_create_cluster_dir = Bool(False)
272 classes = List()
278 classes = List()
273 def _classes_default(self):
279 def _classes_default(self):
274 from IPython.parallel.apps import launcher
280 from IPython.parallel.apps import launcher
275 launchers = launcher.all_launchers
281 launchers = launcher.all_launchers
276 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
282 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
277 return [ClusterDir]+eslaunchers
283 return [ClusterDir]+eslaunchers
278
284
279 n = Int(2, config=True,
285 n = Int(2, config=True,
280 help="The number of engines to start.")
286 help="The number of engines to start.")
281
287
282 engine_launcher_class = Str('LocalEngineSetLauncher',
288 engine_launcher_class = Str('LocalEngineSetLauncher',
283 config=True,
289 config=True,
284 help="The class for launching a set of Engines."
290 help="The class for launching a set of Engines."
285 )
291 )
286 daemonize = Bool(False, config=True,
292 daemonize = Bool(False, config=True,
287 help='Daemonize the ipcluster program. This implies --log-to-file')
293 help='Daemonize the ipcluster program. This implies --log-to-file')
288
294
289 def _daemonize_changed(self, name, old, new):
295 def _daemonize_changed(self, name, old, new):
290 if new:
296 if new:
291 self.log_to_file = True
297 self.log_to_file = True
292
298
293 aliases = Dict(engine_aliases)
299 aliases = Dict(engine_aliases)
294 # flags = Dict(flags)
300 # flags = Dict(flags)
295 _stopping = False
301 _stopping = False
296
302
297 def initialize(self, argv=None):
303 def initialize(self, argv=None):
298 super(IPClusterEngines, self).initialize(argv)
304 super(IPClusterEngines, self).initialize(argv)
299 self.init_signal()
305 self.init_signal()
300 self.init_launchers()
306 self.init_launchers()
301
307
302 def init_launchers(self):
308 def init_launchers(self):
303 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
309 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
304 self.engine_launcher.on_stop(lambda r: self.loop.stop())
310 self.engine_launcher.on_stop(lambda r: self.loop.stop())
305
311
306 def init_signal(self):
312 def init_signal(self):
307 # Setup signals
313 # Setup signals
308 signal.signal(signal.SIGINT, self.sigint_handler)
314 signal.signal(signal.SIGINT, self.sigint_handler)
309
315
310 def build_launcher(self, clsname):
316 def build_launcher(self, clsname):
311 """import and instantiate a Launcher based on importstring"""
317 """import and instantiate a Launcher based on importstring"""
312 if '.' not in clsname:
318 if '.' not in clsname:
313 # not a module, presume it's the raw name in apps.launcher
319 # not a module, presume it's the raw name in apps.launcher
314 clsname = 'IPython.parallel.apps.launcher.'+clsname
320 clsname = 'IPython.parallel.apps.launcher.'+clsname
315 # print repr(clsname)
321 # print repr(clsname)
316 klass = import_item(clsname)
322 klass = import_item(clsname)
317
323
318 launcher = klass(
324 launcher = klass(
319 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
325 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
320 )
326 )
321 return launcher
327 return launcher
322
328
323 def start_engines(self):
329 def start_engines(self):
324 self.log.info("Starting %i engines"%self.n)
330 self.log.info("Starting %i engines"%self.n)
325 self.engine_launcher.start(
331 self.engine_launcher.start(
326 self.n,
332 self.n,
327 cluster_dir=self.cluster_dir.location
333 cluster_dir=self.cluster_dir.location
328 )
334 )
329
335
330 def stop_engines(self):
336 def stop_engines(self):
331 self.log.info("Stopping Engines...")
337 self.log.info("Stopping Engines...")
332 if self.engine_launcher.running:
338 if self.engine_launcher.running:
333 d = self.engine_launcher.stop()
339 d = self.engine_launcher.stop()
334 return d
340 return d
335 else:
341 else:
336 return None
342 return None
337
343
338 def stop_launchers(self, r=None):
344 def stop_launchers(self, r=None):
339 if not self._stopping:
345 if not self._stopping:
340 self._stopping = True
346 self._stopping = True
341 self.log.error("IPython cluster: stopping")
347 self.log.error("IPython cluster: stopping")
342 self.stop_engines()
348 self.stop_engines()
343 # Wait a few seconds to let things shut down.
349 # Wait a few seconds to let things shut down.
344 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
350 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
345 dc.start()
351 dc.start()
346
352
347 def sigint_handler(self, signum, frame):
353 def sigint_handler(self, signum, frame):
348 self.log.debug("SIGINT received, stopping launchers...")
354 self.log.debug("SIGINT received, stopping launchers...")
349 self.stop_launchers()
355 self.stop_launchers()
350
356
351 def start_logging(self):
357 def start_logging(self):
352 # Remove old log files of the controller and engine
358 # Remove old log files of the controller and engine
353 if self.clean_logs:
359 if self.clean_logs:
354 log_dir = self.cluster_dir.log_dir
360 log_dir = self.cluster_dir.log_dir
355 for f in os.listdir(log_dir):
361 for f in os.listdir(log_dir):
356 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
362 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
357 os.remove(os.path.join(log_dir, f))
363 os.remove(os.path.join(log_dir, f))
358 # This will remove old log files for ipcluster itself
364 # This will remove old log files for ipcluster itself
359 # super(IPClusterApp, self).start_logging()
365 # super(IPClusterApp, self).start_logging()
360
366
361 def start(self):
367 def start(self):
362 """Start the app for the engines subcommand."""
368 """Start the app for the engines subcommand."""
363 self.log.info("IPython cluster: started")
369 self.log.info("IPython cluster: started")
364 # First see if the cluster is already running
370 # First see if the cluster is already running
365
371
366 # Now log and daemonize
372 # Now log and daemonize
367 self.log.info(
373 self.log.info(
368 'Starting engines with [daemon=%r]' % self.daemonize
374 'Starting engines with [daemon=%r]' % self.daemonize
369 )
375 )
370 # TODO: Get daemonize working on Windows or as a Windows Server.
376 # TODO: Get daemonize working on Windows or as a Windows Server.
371 if self.daemonize:
377 if self.daemonize:
372 if os.name=='posix':
378 if os.name=='posix':
373 from twisted.scripts._twistd_unix import daemonize
379 from twisted.scripts._twistd_unix import daemonize
374 daemonize()
380 daemonize()
375
381
376 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
382 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
377 dc.start()
383 dc.start()
378 # Now write the new pid file AFTER our new forked pid is active.
384 # Now write the new pid file AFTER our new forked pid is active.
379 # self.write_pid_file()
385 # self.write_pid_file()
380 try:
386 try:
381 self.loop.start()
387 self.loop.start()
382 except KeyboardInterrupt:
388 except KeyboardInterrupt:
383 pass
389 pass
384 except zmq.ZMQError as e:
390 except zmq.ZMQError as e:
385 if e.errno == errno.EINTR:
391 if e.errno == errno.EINTR:
386 pass
392 pass
387 else:
393 else:
388 raise
394 raise
389
395
390 start_aliases = {}
396 start_aliases = {}
391 start_aliases.update(engine_aliases)
397 start_aliases.update(engine_aliases)
392 start_aliases.update(dict(
398 start_aliases.update(dict(
393 delay='IPClusterStart.delay',
399 delay='IPClusterStart.delay',
394 clean_logs='IPClusterStart.clean_logs',
400 clean_logs='IPClusterStart.clean_logs',
395 ))
401 ))
396
402
397 class IPClusterStart(IPClusterEngines):
403 class IPClusterStart(IPClusterEngines):
398
404
399 name = u'ipcluster'
405 name = u'ipcluster'
400 description = start_help
406 description = start_help
401 usage = None
407 usage = None
402 default_config_file_name = default_config_file_name
408 default_config_file_name = default_config_file_name
403 default_log_level = logging.INFO
409 default_log_level = logging.INFO
404 auto_create_cluster_dir = Bool(True, config=True,
410 auto_create_cluster_dir = Bool(True, config=True,
405 help="whether to create the cluster_dir if it doesn't exist")
411 help="whether to create the cluster_dir if it doesn't exist")
406 classes = List()
412 classes = List()
407 def _classes_default(self,):
413 def _classes_default(self,):
408 from IPython.parallel.apps import launcher
414 from IPython.parallel.apps import launcher
409 return [ClusterDir]+launcher.all_launchers
415 return [ClusterDir]+launcher.all_launchers
410
416
411 clean_logs = Bool(True, config=True,
417 clean_logs = Bool(True, config=True,
412 help="whether to cleanup old logs before starting")
418 help="whether to cleanup old logs before starting")
413
419
414 delay = CFloat(1., config=True,
420 delay = CFloat(1., config=True,
415 help="delay (in s) between starting the controller and the engines")
421 help="delay (in s) between starting the controller and the engines")
416
422
417 controller_launcher_class = Str('LocalControllerLauncher',
423 controller_launcher_class = Str('LocalControllerLauncher',
418 config=True,
424 config=True,
419 help="The class for launching a Controller."
425 help="The class for launching a Controller."
420 )
426 )
421 reset = Bool(False, config=True,
427 reset = Bool(False, config=True,
422 help="Whether to reset config files as part of '--create'."
428 help="Whether to reset config files as part of '--create'."
423 )
429 )
424
430
425 # flags = Dict(flags)
431 # flags = Dict(flags)
426 aliases = Dict(start_aliases)
432 aliases = Dict(start_aliases)
427
433
428 def init_clusterdir(self):
429 try:
430 super(IPClusterStart, self).init_clusterdir()
431 except ClusterDirError:
432 raise ClusterDirError(
433 "Could not find a cluster directory. A cluster dir must "
434 "be created before running 'ipcluster start'. Do "
435 "'ipcluster create -h' or 'ipcluster list -h' for more "
436 "information about creating and listing cluster dirs."
437 )
438
439 def init_launchers(self):
434 def init_launchers(self):
440 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
435 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
441 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
436 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
442 self.controller_launcher.on_stop(self.stop_launchers)
437 self.controller_launcher.on_stop(self.stop_launchers)
443
438
444 def start_controller(self):
439 def start_controller(self):
445 self.controller_launcher.start(
440 self.controller_launcher.start(
446 cluster_dir=self.cluster_dir.location
441 cluster_dir=self.cluster_dir.location
447 )
442 )
448
443
449 def stop_controller(self):
444 def stop_controller(self):
450 # self.log.info("In stop_controller")
445 # self.log.info("In stop_controller")
451 if self.controller_launcher and self.controller_launcher.running:
446 if self.controller_launcher and self.controller_launcher.running:
452 return self.controller_launcher.stop()
447 return self.controller_launcher.stop()
453
448
454 def stop_launchers(self, r=None):
449 def stop_launchers(self, r=None):
455 if not self._stopping:
450 if not self._stopping:
456 self.stop_controller()
451 self.stop_controller()
457 super(IPClusterStart, self).stop_launchers()
452 super(IPClusterStart, self).stop_launchers()
458
453
459 def start(self):
454 def start(self):
460 """Start the app for the start subcommand."""
455 """Start the app for the start subcommand."""
461 # First see if the cluster is already running
456 # First see if the cluster is already running
462 try:
457 try:
463 pid = self.get_pid_from_file()
458 pid = self.get_pid_from_file()
464 except PIDFileError:
459 except PIDFileError:
465 pass
460 pass
466 else:
461 else:
467 if self.check_pid(pid):
462 if self.check_pid(pid):
468 self.log.critical(
463 self.log.critical(
469 'Cluster is already running with [pid=%s]. '
464 'Cluster is already running with [pid=%s]. '
470 'use "ipcluster stop" to stop the cluster.' % pid
465 'use "ipcluster stop" to stop the cluster.' % pid
471 )
466 )
472 # Here I exit with a unusual exit status that other processes
467 # Here I exit with a unusual exit status that other processes
473 # can watch for to learn how I existed.
468 # can watch for to learn how I existed.
474 self.exit(ALREADY_STARTED)
469 self.exit(ALREADY_STARTED)
475 else:
470 else:
476 self.remove_pid_file()
471 self.remove_pid_file()
477
472
478
473
479 # Now log and daemonize
474 # Now log and daemonize
480 self.log.info(
475 self.log.info(
481 'Starting ipcluster with [daemon=%r]' % self.daemonize
476 'Starting ipcluster with [daemon=%r]' % self.daemonize
482 )
477 )
483 # TODO: Get daemonize working on Windows or as a Windows Server.
478 # TODO: Get daemonize working on Windows or as a Windows Server.
484 if self.daemonize:
479 if self.daemonize:
485 if os.name=='posix':
480 if os.name=='posix':
486 from twisted.scripts._twistd_unix import daemonize
481 from twisted.scripts._twistd_unix import daemonize
487 daemonize()
482 daemonize()
488
483
489 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
484 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
490 dc.start()
485 dc.start()
491 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
486 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
492 dc.start()
487 dc.start()
493 # Now write the new pid file AFTER our new forked pid is active.
488 # Now write the new pid file AFTER our new forked pid is active.
494 self.write_pid_file()
489 self.write_pid_file()
495 try:
490 try:
496 self.loop.start()
491 self.loop.start()
497 except KeyboardInterrupt:
492 except KeyboardInterrupt:
498 pass
493 pass
499 except zmq.ZMQError as e:
494 except zmq.ZMQError as e:
500 if e.errno == errno.EINTR:
495 if e.errno == errno.EINTR:
501 pass
496 pass
502 else:
497 else:
503 raise
498 raise
504 finally:
499 finally:
505 self.remove_pid_file()
500 self.remove_pid_file()
506
501
507 base='IPython.parallel.apps.ipclusterapp.IPCluster'
502 base='IPython.parallel.apps.ipclusterapp.IPCluster'
508
503
509 class IPClusterApp(Application):
504 class IPClusterApp(Application):
510 name = u'ipcluster'
505 name = u'ipcluster'
511 description = _description
506 description = _description
512
507
513 subcommands = {'create' : (base+'Create', create_help),
508 subcommands = {'create' : (base+'Create', create_help),
514 'list' : (base+'List', list_help),
509 'list' : (base+'List', list_help),
515 'start' : (base+'Start', start_help),
510 'start' : (base+'Start', start_help),
516 'stop' : (base+'Stop', stop_help),
511 'stop' : (base+'Stop', stop_help),
517 'engines' : (base+'Engines', engines_help),
512 'engines' : (base+'Engines', engines_help),
518 }
513 }
519
514
520 # no aliases or flags for parent App
515 # no aliases or flags for parent App
521 aliases = Dict()
516 aliases = Dict()
522 flags = Dict()
517 flags = Dict()
523
518
524 def start(self):
519 def start(self):
525 if self.subapp is None:
520 if self.subapp is None:
526 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
521 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
527 print
522 print
528 self.print_subcommands()
523 self.print_subcommands()
529 self.exit(1)
524 self.exit(1)
530 else:
525 else:
531 return self.subapp.start()
526 return self.subapp.start()
532
527
533 def launch_new_instance():
528 def launch_new_instance():
534 """Create and run the IPython cluster."""
529 """Create and run the IPython cluster."""
535 app = IPClusterApp()
530 app = IPClusterApp()
536 app.initialize()
531 app.initialize()
537 app.start()
532 app.start()
538
533
539
534
540 if __name__ == '__main__':
535 if __name__ == '__main__':
541 launch_new_instance()
536 launch_new_instance()
542
537
@@ -1,400 +1,401 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import os
21 import os
22 import logging
22 import logging
23 import socket
23 import socket
24 import stat
24 import stat
25 import sys
25 import sys
26 import uuid
26 import uuid
27
27
28 from multiprocessing import Process
28 from multiprocessing import Process
29
29
30 import zmq
30 import zmq
31 from zmq.devices import ProcessMonitoredQueue
31 from zmq.devices import ProcessMonitoredQueue
32 from zmq.log.handlers import PUBHandler
32 from zmq.log.handlers import PUBHandler
33 from zmq.utils import jsonapi as json
33 from zmq.utils import jsonapi as json
34
34
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36
36
37 from IPython.parallel import factory
37 from IPython.parallel import factory
38
38
39 from IPython.parallel.apps.clusterdir import (
39 from IPython.parallel.apps.clusterdir import (
40 ClusterDir,
40 ClusterDir,
41 ClusterApplication,
41 ClusterApplication,
42 base_flags
42 base_flags
43 # ClusterDirConfigLoader
43 # ClusterDirConfigLoader
44 )
44 )
45 from IPython.utils.importstring import import_item
45 from IPython.utils.importstring import import_item
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict
47
47
48 # from IPython.parallel.controller.controller import ControllerFactory
48 # from IPython.parallel.controller.controller import ControllerFactory
49 from IPython.parallel.streamsession import StreamSession
49 from IPython.parallel.streamsession import StreamSession
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.hub import Hub, HubFactory
51 from IPython.parallel.controller.hub import Hub, HubFactory
52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54
54
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
56
56
57 # conditional import of MongoDB backend class
57 # conditional import of MongoDB backend class
58
58
59 try:
59 try:
60 from IPython.parallel.controller.mongodb import MongoDB
60 from IPython.parallel.controller.mongodb import MongoDB
61 except ImportError:
61 except ImportError:
62 maybe_mongo = []
62 maybe_mongo = []
63 else:
63 else:
64 maybe_mongo = [MongoDB]
64 maybe_mongo = [MongoDB]
65
65
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # Module level variables
68 # Module level variables
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70
70
71
71
72 #: The default config file name for this application
72 #: The default config file name for this application
73 default_config_file_name = u'ipcontroller_config.py'
73 default_config_file_name = u'ipcontroller_config.py'
74
74
75
75
76 _description = """Start the IPython controller for parallel computing.
76 _description = """Start the IPython controller for parallel computing.
77
77
78 The IPython controller provides a gateway between the IPython engines and
78 The IPython controller provides a gateway between the IPython engines and
79 clients. The controller needs to be started before the engines and can be
79 clients. The controller needs to be started before the engines and can be
80 configured using command line options or using a cluster directory. Cluster
80 configured using command line options or using a cluster directory. Cluster
81 directories contain config, log and security files and are usually located in
81 directories contain config, log and security files and are usually located in
82 your ipython directory and named as "cluster_<profile>". See the --profile
82 your ipython directory and named as "cluster_<profile>". See the --profile
83 and --cluster-dir options for details.
83 and --cluster-dir options for details.
84 """
84 """
85
85
86
86
87
87
88
88
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90 # The main application
90 # The main application
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92 flags = {}
92 flags = {}
93 flags.update(base_flags)
93 flags.update(base_flags)
94 flags.update({
94 flags.update({
95 'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
95 'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
96 'Use threads instead of processes for the schedulers'),
96 'Use threads instead of processes for the schedulers'),
97 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
97 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
98 'use the SQLiteDB backend'),
98 'use the SQLiteDB backend'),
99 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
99 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
100 'use the MongoDB backend'),
100 'use the MongoDB backend'),
101 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
101 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
102 'use the in-memory DictDB backend'),
102 'use the in-memory DictDB backend'),
103 })
103 })
104
104
105 flags.update()
105 flags.update()
106
106
107 class IPControllerApp(ClusterApplication):
107 class IPControllerApp(ClusterApplication):
108
108
109 name = u'ipcontroller'
109 name = u'ipcontroller'
110 description = _description
110 description = _description
111 # command_line_loader = IPControllerAppConfigLoader
111 # command_line_loader = IPControllerAppConfigLoader
112 default_config_file_name = default_config_file_name
112 default_config_file_name = default_config_file_name
113 auto_create_cluster_dir = True
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
113 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
115
114
115 auto_create_cluster_dir = Bool(True, config=True,
116 help="Whether to create cluster_dir if it exists.")
116 reuse_files = Bool(False, config=True,
117 reuse_files = Bool(False, config=True,
117 help='Whether to reuse existing json connection files [default: False]'
118 help='Whether to reuse existing json connection files [default: False]'
118 )
119 )
119 secure = Bool(True, config=True,
120 secure = Bool(True, config=True,
120 help='Whether to use exec_keys for extra authentication [default: True]'
121 help='Whether to use exec_keys for extra authentication [default: True]'
121 )
122 )
122 ssh_server = Unicode(u'', config=True,
123 ssh_server = Unicode(u'', config=True,
123 help="""ssh url for clients to use when connecting to the Controller
124 help="""ssh url for clients to use when connecting to the Controller
124 processes. It should be of the form: [user@]server[:port]. The
125 processes. It should be of the form: [user@]server[:port]. The
125 Controller\'s listening addresses must be accessible from the ssh server""",
126 Controller\'s listening addresses must be accessible from the ssh server""",
126 )
127 )
127 location = Unicode(u'', config=True,
128 location = Unicode(u'', config=True,
128 help="""The external IP or domain name of the Controller, used for disambiguating
129 help="""The external IP or domain name of the Controller, used for disambiguating
129 engine and client connections.""",
130 engine and client connections.""",
130 )
131 )
131 import_statements = List([], config=True,
132 import_statements = List([], config=True,
132 help="import statements to be run at startup. Necessary in some environments"
133 help="import statements to be run at startup. Necessary in some environments"
133 )
134 )
134
135
135 usethreads = Bool(False, config=True,
136 usethreads = Bool(False, config=True,
136 help='Use threads instead of processes for the schedulers',
137 help='Use threads instead of processes for the schedulers',
137 )
138 )
138
139
139 # internal
140 # internal
140 children = List()
141 children = List()
141 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
142 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
142
143
143 def _usethreads_changed(self, name, old, new):
144 def _usethreads_changed(self, name, old, new):
144 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
145 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
145
146
146 aliases = Dict(dict(
147 aliases = Dict(dict(
147 config = 'IPControllerApp.config_file',
148 config = 'IPControllerApp.config_file',
148 # file = 'IPControllerApp.url_file',
149 # file = 'IPControllerApp.url_file',
149 log_level = 'IPControllerApp.log_level',
150 log_level = 'IPControllerApp.log_level',
150 reuse_files = 'IPControllerApp.reuse_files',
151 reuse_files = 'IPControllerApp.reuse_files',
151 secure = 'IPControllerApp.secure',
152 secure = 'IPControllerApp.secure',
152 ssh = 'IPControllerApp.ssh_server',
153 ssh = 'IPControllerApp.ssh_server',
153 usethreads = 'IPControllerApp.usethreads',
154 usethreads = 'IPControllerApp.usethreads',
154 import_statements = 'IPControllerApp.import_statements',
155 import_statements = 'IPControllerApp.import_statements',
155 location = 'IPControllerApp.location',
156 location = 'IPControllerApp.location',
156
157
157 ident = 'StreamSession.session',
158 ident = 'StreamSession.session',
158 user = 'StreamSession.username',
159 user = 'StreamSession.username',
159 exec_key = 'StreamSession.keyfile',
160 exec_key = 'StreamSession.keyfile',
160
161
161 url = 'HubFactory.url',
162 url = 'HubFactory.url',
162 ip = 'HubFactory.ip',
163 ip = 'HubFactory.ip',
163 transport = 'HubFactory.transport',
164 transport = 'HubFactory.transport',
164 port = 'HubFactory.regport',
165 port = 'HubFactory.regport',
165
166
166 ping = 'HeartMonitor.period',
167 ping = 'HeartMonitor.period',
167
168
168 scheme = 'TaskScheduler.scheme_name',
169 scheme = 'TaskScheduler.scheme_name',
169 hwm = 'TaskScheduler.hwm',
170 hwm = 'TaskScheduler.hwm',
170
171
171
172
172 profile = "ClusterDir.profile",
173 profile = "ClusterDir.profile",
173 cluster_dir = 'ClusterDir.location',
174 cluster_dir = 'ClusterDir.location',
174
175
175 ))
176 ))
176 flags = Dict(flags)
177 flags = Dict(flags)
177
178
178
179
179 def save_connection_dict(self, fname, cdict):
180 def save_connection_dict(self, fname, cdict):
180 """save a connection dict to json file."""
181 """save a connection dict to json file."""
181 c = self.config
182 c = self.config
182 url = cdict['url']
183 url = cdict['url']
183 location = cdict['location']
184 location = cdict['location']
184 if not location:
185 if not location:
185 try:
186 try:
186 proto,ip,port = split_url(url)
187 proto,ip,port = split_url(url)
187 except AssertionError:
188 except AssertionError:
188 pass
189 pass
189 else:
190 else:
190 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
191 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
191 cdict['location'] = location
192 cdict['location'] = location
192 fname = os.path.join(self.cluster_dir.security_dir, fname)
193 fname = os.path.join(self.cluster_dir.security_dir, fname)
193 with open(fname, 'w') as f:
194 with open(fname, 'w') as f:
194 f.write(json.dumps(cdict, indent=2))
195 f.write(json.dumps(cdict, indent=2))
195 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
196 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
196
197
197 def load_config_from_json(self):
198 def load_config_from_json(self):
198 """load config from existing json connector files."""
199 """load config from existing json connector files."""
199 c = self.config
200 c = self.config
200 # load from engine config
201 # load from engine config
201 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
202 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
202 cfg = json.loads(f.read())
203 cfg = json.loads(f.read())
203 key = c.StreamSession.key = cfg['exec_key']
204 key = c.StreamSession.key = cfg['exec_key']
204 xport,addr = cfg['url'].split('://')
205 xport,addr = cfg['url'].split('://')
205 c.HubFactory.engine_transport = xport
206 c.HubFactory.engine_transport = xport
206 ip,ports = addr.split(':')
207 ip,ports = addr.split(':')
207 c.HubFactory.engine_ip = ip
208 c.HubFactory.engine_ip = ip
208 c.HubFactory.regport = int(ports)
209 c.HubFactory.regport = int(ports)
209 self.location = cfg['location']
210 self.location = cfg['location']
210
211
211 # load client config
212 # load client config
212 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
213 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
213 cfg = json.loads(f.read())
214 cfg = json.loads(f.read())
214 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
215 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
215 xport,addr = cfg['url'].split('://')
216 xport,addr = cfg['url'].split('://')
216 c.HubFactory.client_transport = xport
217 c.HubFactory.client_transport = xport
217 ip,ports = addr.split(':')
218 ip,ports = addr.split(':')
218 c.HubFactory.client_ip = ip
219 c.HubFactory.client_ip = ip
219 self.ssh_server = cfg['ssh']
220 self.ssh_server = cfg['ssh']
220 assert int(ports) == c.HubFactory.regport, "regport mismatch"
221 assert int(ports) == c.HubFactory.regport, "regport mismatch"
221
222
222 def init_hub(self):
223 def init_hub(self):
223 # This is the working dir by now.
224 # This is the working dir by now.
224 sys.path.insert(0, '')
225 sys.path.insert(0, '')
225 c = self.config
226 c = self.config
226
227
227 self.do_import_statements()
228 self.do_import_statements()
228 reusing = self.reuse_files
229 reusing = self.reuse_files
229 if reusing:
230 if reusing:
230 try:
231 try:
231 self.load_config_from_json()
232 self.load_config_from_json()
232 except (AssertionError,IOError):
233 except (AssertionError,IOError):
233 reusing=False
234 reusing=False
234 # check again, because reusing may have failed:
235 # check again, because reusing may have failed:
235 if reusing:
236 if reusing:
236 pass
237 pass
237 elif self.secure:
238 elif self.secure:
238 key = str(uuid.uuid4())
239 key = str(uuid.uuid4())
239 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
240 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
240 # with open(keyfile, 'w') as f:
241 # with open(keyfile, 'w') as f:
241 # f.write(key)
242 # f.write(key)
242 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 c.StreamSession.key = key
244 c.StreamSession.key = key
244 else:
245 else:
245 key = c.StreamSession.key = ''
246 key = c.StreamSession.key = ''
246
247
247 try:
248 try:
248 self.factory = HubFactory(config=c, log=self.log)
249 self.factory = HubFactory(config=c, log=self.log)
249 # self.start_logging()
250 # self.start_logging()
250 self.factory.init_hub()
251 self.factory.init_hub()
251 except:
252 except:
252 self.log.error("Couldn't construct the Controller", exc_info=True)
253 self.log.error("Couldn't construct the Controller", exc_info=True)
253 self.exit(1)
254 self.exit(1)
254
255
255 if not reusing:
256 if not reusing:
256 # save to new json config files
257 # save to new json config files
257 f = self.factory
258 f = self.factory
258 cdict = {'exec_key' : key,
259 cdict = {'exec_key' : key,
259 'ssh' : self.ssh_server,
260 'ssh' : self.ssh_server,
260 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
261 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
261 'location' : self.location
262 'location' : self.location
262 }
263 }
263 self.save_connection_dict('ipcontroller-client.json', cdict)
264 self.save_connection_dict('ipcontroller-client.json', cdict)
264 edict = cdict
265 edict = cdict
265 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
266 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
266 self.save_connection_dict('ipcontroller-engine.json', edict)
267 self.save_connection_dict('ipcontroller-engine.json', edict)
267
268
268 #
269 #
269 def init_schedulers(self):
270 def init_schedulers(self):
270 children = self.children
271 children = self.children
271 mq = import_item(self.mq_class)
272 mq = import_item(self.mq_class)
272
273
273 hub = self.factory
274 hub = self.factory
274 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
275 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
275 # IOPub relay (in a Process)
276 # IOPub relay (in a Process)
276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
277 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
277 q.bind_in(hub.client_info['iopub'])
278 q.bind_in(hub.client_info['iopub'])
278 q.bind_out(hub.engine_info['iopub'])
279 q.bind_out(hub.engine_info['iopub'])
279 q.setsockopt_out(zmq.SUBSCRIBE, '')
280 q.setsockopt_out(zmq.SUBSCRIBE, '')
280 q.connect_mon(hub.monitor_url)
281 q.connect_mon(hub.monitor_url)
281 q.daemon=True
282 q.daemon=True
282 children.append(q)
283 children.append(q)
283
284
284 # Multiplexer Queue (in a Process)
285 # Multiplexer Queue (in a Process)
285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
286 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
286 q.bind_in(hub.client_info['mux'])
287 q.bind_in(hub.client_info['mux'])
287 q.setsockopt_in(zmq.IDENTITY, 'mux')
288 q.setsockopt_in(zmq.IDENTITY, 'mux')
288 q.bind_out(hub.engine_info['mux'])
289 q.bind_out(hub.engine_info['mux'])
289 q.connect_mon(hub.monitor_url)
290 q.connect_mon(hub.monitor_url)
290 q.daemon=True
291 q.daemon=True
291 children.append(q)
292 children.append(q)
292
293
293 # Control Queue (in a Process)
294 # Control Queue (in a Process)
294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
295 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
295 q.bind_in(hub.client_info['control'])
296 q.bind_in(hub.client_info['control'])
296 q.setsockopt_in(zmq.IDENTITY, 'control')
297 q.setsockopt_in(zmq.IDENTITY, 'control')
297 q.bind_out(hub.engine_info['control'])
298 q.bind_out(hub.engine_info['control'])
298 q.connect_mon(hub.monitor_url)
299 q.connect_mon(hub.monitor_url)
299 q.daemon=True
300 q.daemon=True
300 children.append(q)
301 children.append(q)
301 try:
302 try:
302 scheme = self.config.TaskScheduler.scheme_name
303 scheme = self.config.TaskScheduler.scheme_name
303 except AttributeError:
304 except AttributeError:
304 scheme = TaskScheduler.scheme_name.get_default_value()
305 scheme = TaskScheduler.scheme_name.get_default_value()
305 # Task Queue (in a Process)
306 # Task Queue (in a Process)
306 if scheme == 'pure':
307 if scheme == 'pure':
307 self.log.warn("task::using pure XREQ Task scheduler")
308 self.log.warn("task::using pure XREQ Task scheduler")
308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
309 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
309 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 q.bind_in(hub.client_info['task'][1])
311 q.bind_in(hub.client_info['task'][1])
311 q.setsockopt_in(zmq.IDENTITY, 'task')
312 q.setsockopt_in(zmq.IDENTITY, 'task')
312 q.bind_out(hub.engine_info['task'])
313 q.bind_out(hub.engine_info['task'])
313 q.connect_mon(hub.monitor_url)
314 q.connect_mon(hub.monitor_url)
314 q.daemon=True
315 q.daemon=True
315 children.append(q)
316 children.append(q)
316 elif scheme == 'none':
317 elif scheme == 'none':
317 self.log.warn("task::using no Task scheduler")
318 self.log.warn("task::using no Task scheduler")
318
319
319 else:
320 else:
320 self.log.info("task::using Python %s Task scheduler"%scheme)
321 self.log.info("task::using Python %s Task scheduler"%scheme)
321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
322 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
322 hub.monitor_url, hub.client_info['notification'])
323 hub.monitor_url, hub.client_info['notification'])
323 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
324 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
324 config=dict(self.config))
325 config=dict(self.config))
325 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
326 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
326 q.daemon=True
327 q.daemon=True
327 children.append(q)
328 children.append(q)
328
329
329
330
330 def save_urls(self):
331 def save_urls(self):
331 """save the registration urls to files."""
332 """save the registration urls to files."""
332 c = self.config
333 c = self.config
333
334
334 sec_dir = self.cluster_dir.security_dir
335 sec_dir = self.cluster_dir.security_dir
335 cf = self.factory
336 cf = self.factory
336
337
337 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
338 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
338 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
339 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
339
340
340 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
341 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
341 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
342 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
342
343
343
344
344 def do_import_statements(self):
345 def do_import_statements(self):
345 statements = self.import_statements
346 statements = self.import_statements
346 for s in statements:
347 for s in statements:
347 try:
348 try:
348 self.log.msg("Executing statement: '%s'" % s)
349 self.log.msg("Executing statement: '%s'" % s)
349 exec s in globals(), locals()
350 exec s in globals(), locals()
350 except:
351 except:
351 self.log.msg("Error running statement: %s" % s)
352 self.log.msg("Error running statement: %s" % s)
352
353
353 # def start_logging(self):
354 # def start_logging(self):
354 # super(IPControllerApp, self).start_logging()
355 # super(IPControllerApp, self).start_logging()
355 # if self.config.Global.log_url:
356 # if self.config.Global.log_url:
356 # context = self.factory.context
357 # context = self.factory.context
357 # lsock = context.socket(zmq.PUB)
358 # lsock = context.socket(zmq.PUB)
358 # lsock.connect(self.config.Global.log_url)
359 # lsock.connect(self.config.Global.log_url)
359 # handler = PUBHandler(lsock)
360 # handler = PUBHandler(lsock)
360 # handler.root_topic = 'controller'
361 # handler.root_topic = 'controller'
361 # handler.setLevel(self.log_level)
362 # handler.setLevel(self.log_level)
362 # self.log.addHandler(handler)
363 # self.log.addHandler(handler)
363 # #
364 # #
364
365
365 def initialize(self, argv=None):
366 def initialize(self, argv=None):
366 super(IPControllerApp, self).initialize(argv)
367 super(IPControllerApp, self).initialize(argv)
367 self.init_hub()
368 self.init_hub()
368 self.init_schedulers()
369 self.init_schedulers()
369
370
370 def start(self):
371 def start(self):
371 # Start the subprocesses:
372 # Start the subprocesses:
372 self.factory.start()
373 self.factory.start()
373 child_procs = []
374 child_procs = []
374 for child in self.children:
375 for child in self.children:
375 child.start()
376 child.start()
376 if isinstance(child, ProcessMonitoredQueue):
377 if isinstance(child, ProcessMonitoredQueue):
377 child_procs.append(child.launcher)
378 child_procs.append(child.launcher)
378 elif isinstance(child, Process):
379 elif isinstance(child, Process):
379 child_procs.append(child)
380 child_procs.append(child)
380 if child_procs:
381 if child_procs:
381 signal_children(child_procs)
382 signal_children(child_procs)
382
383
383 self.write_pid_file(overwrite=True)
384 self.write_pid_file(overwrite=True)
384
385
385 try:
386 try:
386 self.factory.loop.start()
387 self.factory.loop.start()
387 except KeyboardInterrupt:
388 except KeyboardInterrupt:
388 self.log.critical("Interrupted, Exiting...\n")
389 self.log.critical("Interrupted, Exiting...\n")
389
390
390
391
391
392
392 def launch_new_instance():
393 def launch_new_instance():
393 """Create and run the IPython controller"""
394 """Create and run the IPython controller"""
394 app = IPControllerApp()
395 app = IPControllerApp()
395 app.initialize()
396 app.initialize()
396 app.start()
397 app.start()
397
398
398
399
399 if __name__ == '__main__':
400 if __name__ == '__main__':
400 launch_new_instance()
401 launch_new_instance()
@@ -1,290 +1,289 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
26 ClusterApplication,
27 ClusterDir,
27 ClusterDir,
28 base_aliases,
28 base_aliases,
29 # ClusterDirConfigLoader
29 # ClusterDirConfigLoader
30 )
30 )
31 from IPython.zmq.log import EnginePUBHandler
31 from IPython.zmq.log import EnginePUBHandler
32
32
33 from IPython.config.configurable import Configurable
33 from IPython.config.configurable import Configurable
34 from IPython.parallel.streamsession import StreamSession
34 from IPython.parallel.streamsession import StreamSession
35 from IPython.parallel.engine.engine import EngineFactory
35 from IPython.parallel.engine.engine import EngineFactory
36 from IPython.parallel.engine.streamkernel import Kernel
36 from IPython.parallel.engine.streamkernel import Kernel
37 from IPython.parallel.util import disambiguate_url
37 from IPython.parallel.util import disambiguate_url
38
38
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr
40 from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr
41
41
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Module level variables
44 # Module level variables
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 #: The default config file name for this application
47 #: The default config file name for this application
48 default_config_file_name = u'ipengine_config.py'
48 default_config_file_name = u'ipengine_config.py'
49
49
50 _description = """Start an IPython engine for parallel computing.\n\n
50 _description = """Start an IPython engine for parallel computing.\n\n
51
51
52 IPython engines run in parallel and perform computations on behalf of a client
52 IPython engines run in parallel and perform computations on behalf of a client
53 and controller. A controller needs to be started before the engines. The
53 and controller. A controller needs to be started before the engines. The
54 engine can be configured using command line options or using a cluster
54 engine can be configured using command line options or using a cluster
55 directory. Cluster directories contain config, log and security files and are
55 directory. Cluster directories contain config, log and security files and are
56 usually located in your ipython directory and named as "cluster_<profile>".
56 usually located in your ipython directory and named as "cluster_<profile>".
57 See the `profile` and `cluster_dir` options for details.
57 See the `profile` and `cluster_dir` options for details.
58 """
58 """
59
59
60
60
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62 # MPI configuration
62 # MPI configuration
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64
64
65 mpi4py_init = """from mpi4py import MPI as mpi
65 mpi4py_init = """from mpi4py import MPI as mpi
66 mpi.size = mpi.COMM_WORLD.Get_size()
66 mpi.size = mpi.COMM_WORLD.Get_size()
67 mpi.rank = mpi.COMM_WORLD.Get_rank()
67 mpi.rank = mpi.COMM_WORLD.Get_rank()
68 """
68 """
69
69
70
70
71 pytrilinos_init = """from PyTrilinos import Epetra
71 pytrilinos_init = """from PyTrilinos import Epetra
72 class SimpleStruct:
72 class SimpleStruct:
73 pass
73 pass
74 mpi = SimpleStruct()
74 mpi = SimpleStruct()
75 mpi.rank = 0
75 mpi.rank = 0
76 mpi.size = 0
76 mpi.size = 0
77 """
77 """
78
78
79 class MPI(Configurable):
79 class MPI(Configurable):
80 """Configurable for MPI initialization"""
80 """Configurable for MPI initialization"""
81 use = Str('', config=True,
81 use = Str('', config=True,
82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
83 )
83 )
84
84
85 def _on_use_changed(self, old, new):
85 def _on_use_changed(self, old, new):
86 # load default init script if it's not set
86 # load default init script if it's not set
87 if not self.init_script:
87 if not self.init_script:
88 self.init_script = self.default_inits.get(new, '')
88 self.init_script = self.default_inits.get(new, '')
89
89
90 init_script = Str('', config=True,
90 init_script = Str('', config=True,
91 help="Initialization code for MPI")
91 help="Initialization code for MPI")
92
92
93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
94 config=True)
94 config=True)
95
95
96
96
97 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
98 # Main application
98 # Main application
99 #-----------------------------------------------------------------------------
99 #-----------------------------------------------------------------------------
100
100
101
101
102 class IPEngineApp(ClusterApplication):
102 class IPEngineApp(ClusterApplication):
103
103
104 app_name = Unicode(u'ipengine')
104 app_name = Unicode(u'ipengine')
105 description = Unicode(_description)
105 description = Unicode(_description)
106 default_config_file_name = default_config_file_name
106 default_config_file_name = default_config_file_name
107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
108
108
109 auto_create_cluster_dir = Bool(False, config=True,
109 auto_create_cluster_dir = Bool(False,
110 help="whether to create the cluster_dir if it doesn't exist")
110 help="whether to create the cluster_dir if it doesn't exist")
111
111
112 startup_script = Unicode(u'', config=True,
112 startup_script = Unicode(u'', config=True,
113 help='specify a script to be run at startup')
113 help='specify a script to be run at startup')
114 startup_command = Str('', config=True,
114 startup_command = Str('', config=True,
115 help='specify a command to be run at startup')
115 help='specify a command to be run at startup')
116
116
117 url_file = Unicode(u'', config=True,
117 url_file = Unicode(u'', config=True,
118 help="""The full location of the file containing the connection information for
118 help="""The full location of the file containing the connection information for
119 the controller. If this is not given, the file must be in the
119 the controller. If this is not given, the file must be in the
120 security directory of the cluster directory. This location is
120 security directory of the cluster directory. This location is
121 resolved using the `profile` or `cluster_dir` options.""",
121 resolved using the `profile` or `cluster_dir` options.""",
122 )
122 )
123
123
124 url_file_name = Unicode(u'ipcontroller-engine.json')
124 url_file_name = Unicode(u'ipcontroller-engine.json')
125
125
126 aliases = Dict(dict(
126 aliases = Dict(dict(
127 config = 'IPEngineApp.config_file',
127 config = 'IPEngineApp.config_file',
128 file = 'IPEngineApp.url_file',
128 file = 'IPEngineApp.url_file',
129 c = 'IPEngineApp.startup_command',
129 c = 'IPEngineApp.startup_command',
130 s = 'IPEngineApp.startup_script',
130 s = 'IPEngineApp.startup_script',
131
131
132 ident = 'StreamSession.session',
132 ident = 'StreamSession.session',
133 user = 'StreamSession.username',
133 user = 'StreamSession.username',
134 exec_key = 'StreamSession.keyfile',
134 exec_key = 'StreamSession.keyfile',
135
135
136 url = 'EngineFactory.url',
136 url = 'EngineFactory.url',
137 ip = 'EngineFactory.ip',
137 ip = 'EngineFactory.ip',
138 transport = 'EngineFactory.transport',
138 transport = 'EngineFactory.transport',
139 port = 'EngineFactory.regport',
139 port = 'EngineFactory.regport',
140 location = 'EngineFactory.location',
140 location = 'EngineFactory.location',
141
141
142 timeout = 'EngineFactory.timeout',
142 timeout = 'EngineFactory.timeout',
143
143
144 profile = "ClusterDir.profile",
144 profile = "ClusterDir.profile",
145 cluster_dir = 'ClusterDir.location',
145 cluster_dir = 'ClusterDir.location',
146
146
147 mpi = 'MPI.use',
147 mpi = 'MPI.use',
148
148
149 log_level = 'IPEngineApp.log_level',
149 log_level = 'IPEngineApp.log_level',
150 ))
150 ))
151
151
152 # def find_key_file(self):
152 # def find_key_file(self):
153 # """Set the key file.
153 # """Set the key file.
154 #
154 #
155 # Here we don't try to actually see if it exists for is valid as that
155 # Here we don't try to actually see if it exists for is valid as that
156 # is hadled by the connection logic.
156 # is hadled by the connection logic.
157 # """
157 # """
158 # config = self.master_config
158 # config = self.master_config
159 # # Find the actual controller key file
159 # # Find the actual controller key file
160 # if not config.Global.key_file:
160 # if not config.Global.key_file:
161 # try_this = os.path.join(
161 # try_this = os.path.join(
162 # config.Global.cluster_dir,
162 # config.Global.cluster_dir,
163 # config.Global.security_dir,
163 # config.Global.security_dir,
164 # config.Global.key_file_name
164 # config.Global.key_file_name
165 # )
165 # )
166 # config.Global.key_file = try_this
166 # config.Global.key_file = try_this
167
167
168 def find_url_file(self):
168 def find_url_file(self):
169 """Set the key file.
169 """Set the key file.
170
170
171 Here we don't try to actually see if it exists for is valid as that
171 Here we don't try to actually see if it exists for is valid as that
172 is hadled by the connection logic.
172 is hadled by the connection logic.
173 """
173 """
174 config = self.config
174 config = self.config
175 # Find the actual controller key file
175 # Find the actual controller key file
176 if not self.url_file:
176 if not self.url_file:
177 self.url_file = os.path.join(
177 self.url_file = os.path.join(
178 self.cluster_dir.security_dir,
178 self.cluster_dir.security_dir,
179 self.url_file_name
179 self.url_file_name
180 )
180 )
181
182 def init_engine(self):
181 def init_engine(self):
183 # This is the working dir by now.
182 # This is the working dir by now.
184 sys.path.insert(0, '')
183 sys.path.insert(0, '')
185 config = self.config
184 config = self.config
186 # print config
185 # print config
187 self.find_url_file()
186 self.find_url_file()
188
187
189 # if os.path.exists(config.Global.key_file) and config.Global.secure:
188 # if os.path.exists(config.Global.key_file) and config.Global.secure:
190 # config.SessionFactory.exec_key = config.Global.key_file
189 # config.SessionFactory.exec_key = config.Global.key_file
191 if os.path.exists(self.url_file):
190 if os.path.exists(self.url_file):
192 with open(self.url_file) as f:
191 with open(self.url_file) as f:
193 d = json.loads(f.read())
192 d = json.loads(f.read())
194 for k,v in d.iteritems():
193 for k,v in d.iteritems():
195 if isinstance(v, unicode):
194 if isinstance(v, unicode):
196 d[k] = v.encode()
195 d[k] = v.encode()
197 if d['exec_key']:
196 if d['exec_key']:
198 config.StreamSession.key = d['exec_key']
197 config.StreamSession.key = d['exec_key']
199 d['url'] = disambiguate_url(d['url'], d['location'])
198 d['url'] = disambiguate_url(d['url'], d['location'])
200 config.EngineFactory.url = d['url']
199 config.EngineFactory.url = d['url']
201 config.EngineFactory.location = d['location']
200 config.EngineFactory.location = d['location']
202
201
203 try:
202 try:
204 exec_lines = config.Kernel.exec_lines
203 exec_lines = config.Kernel.exec_lines
205 except AttributeError:
204 except AttributeError:
206 config.Kernel.exec_lines = []
205 config.Kernel.exec_lines = []
207 exec_lines = config.Kernel.exec_lines
206 exec_lines = config.Kernel.exec_lines
208
207
209 if self.startup_script:
208 if self.startup_script:
210 enc = sys.getfilesystemencoding() or 'utf8'
209 enc = sys.getfilesystemencoding() or 'utf8'
211 cmd="execfile(%r)"%self.startup_script.encode(enc)
210 cmd="execfile(%r)"%self.startup_script.encode(enc)
212 exec_lines.append(cmd)
211 exec_lines.append(cmd)
213 if self.startup_command:
212 if self.startup_command:
214 exec_lines.append(self.startup_command)
213 exec_lines.append(self.startup_command)
215
214
216 # Create the underlying shell class and Engine
215 # Create the underlying shell class and Engine
217 # shell_class = import_item(self.master_config.Global.shell_class)
216 # shell_class = import_item(self.master_config.Global.shell_class)
218 # print self.config
217 # print self.config
219 try:
218 try:
220 self.engine = EngineFactory(config=config, log=self.log)
219 self.engine = EngineFactory(config=config, log=self.log)
221 except:
220 except:
222 self.log.error("Couldn't start the Engine", exc_info=True)
221 self.log.error("Couldn't start the Engine", exc_info=True)
223 self.exit(1)
222 self.exit(1)
224
223
225 # self.start_logging()
224 # self.start_logging()
226
225
227 # Create the service hierarchy
226 # Create the service hierarchy
228 # self.main_service = service.MultiService()
227 # self.main_service = service.MultiService()
229 # self.engine_service.setServiceParent(self.main_service)
228 # self.engine_service.setServiceParent(self.main_service)
230 # self.tub_service = Tub()
229 # self.tub_service = Tub()
231 # self.tub_service.setServiceParent(self.main_service)
230 # self.tub_service.setServiceParent(self.main_service)
232 # # This needs to be called before the connection is initiated
231 # # This needs to be called before the connection is initiated
233 # self.main_service.startService()
232 # self.main_service.startService()
234
233
235 # This initiates the connection to the controller and calls
234 # This initiates the connection to the controller and calls
236 # register_engine to tell the controller we are ready to do work
235 # register_engine to tell the controller we are ready to do work
237 # self.engine_connector = EngineConnector(self.tub_service)
236 # self.engine_connector = EngineConnector(self.tub_service)
238
237
239 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
238 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
240
239
241 # reactor.callWhenRunning(self.call_connect)
240 # reactor.callWhenRunning(self.call_connect)
242
241
243 # def start_logging(self):
242 # def start_logging(self):
244 # super(IPEngineApp, self).start_logging()
243 # super(IPEngineApp, self).start_logging()
245 # if self.master_config.Global.log_url:
244 # if self.master_config.Global.log_url:
246 # context = self.engine.context
245 # context = self.engine.context
247 # lsock = context.socket(zmq.PUB)
246 # lsock = context.socket(zmq.PUB)
248 # lsock.connect(self.master_config.Global.log_url)
247 # lsock.connect(self.master_config.Global.log_url)
249 # handler = EnginePUBHandler(self.engine, lsock)
248 # handler = EnginePUBHandler(self.engine, lsock)
250 # handler.setLevel(self.log_level)
249 # handler.setLevel(self.log_level)
251 # self.log.addHandler(handler)
250 # self.log.addHandler(handler)
252 #
251 #
253 def init_mpi(self):
252 def init_mpi(self):
254 global mpi
253 global mpi
255 self.mpi = MPI(config=self.config)
254 self.mpi = MPI(config=self.config)
256
255
257 mpi_import_statement = self.mpi.init_script
256 mpi_import_statement = self.mpi.init_script
258 if mpi_import_statement:
257 if mpi_import_statement:
259 try:
258 try:
260 self.log.info("Initializing MPI:")
259 self.log.info("Initializing MPI:")
261 self.log.info(mpi_import_statement)
260 self.log.info(mpi_import_statement)
262 exec mpi_import_statement in globals()
261 exec mpi_import_statement in globals()
263 except:
262 except:
264 mpi = None
263 mpi = None
265 else:
264 else:
266 mpi = None
265 mpi = None
267
266
268 def initialize(self, argv=None):
267 def initialize(self, argv=None):
269 super(IPEngineApp, self).initialize(argv)
268 super(IPEngineApp, self).initialize(argv)
270 self.init_mpi()
269 self.init_mpi()
271 self.init_engine()
270 self.init_engine()
272
271
273 def start(self):
272 def start(self):
274 self.engine.start()
273 self.engine.start()
275 try:
274 try:
276 self.engine.loop.start()
275 self.engine.loop.start()
277 except KeyboardInterrupt:
276 except KeyboardInterrupt:
278 self.log.critical("Engine Interrupted, shutting down...\n")
277 self.log.critical("Engine Interrupted, shutting down...\n")
279
278
280
279
281 def launch_new_instance():
280 def launch_new_instance():
282 """Create and run the IPython engine"""
281 """Create and run the IPython engine"""
283 app = IPEngineApp()
282 app = IPEngineApp()
284 app.initialize()
283 app.initialize()
285 app.start()
284 app.start()
286
285
287
286
288 if __name__ == '__main__':
287 if __name__ == '__main__':
289 launch_new_instance()
288 launch_new_instance()
290
289
General Comments 0
You need to be logged in to leave comments. Login now