untwist PBS, WinHPC Launchers in newparallel
MinRK
@@ -1,536 +1,538 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import re
20 import os
21 import os
21 import shutil
22 import shutil
22 import sys
23 import sys
23 import logging
24 import logging
24 import warnings
25 import warnings
25
26
26 from IPython.config.loader import PyFileConfigLoader
27 from IPython.config.loader import PyFileConfigLoader
27 from IPython.core.application import Application, BaseAppConfigLoader
28 from IPython.core.application import Application, BaseAppConfigLoader
28 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
29 from IPython.core.crashhandler import CrashHandler
30 from IPython.core.crashhandler import CrashHandler
30 from IPython.core import release
31 from IPython.core import release
31 from IPython.utils.path import (
32 from IPython.utils.path import (
32 get_ipython_package_dir,
33 get_ipython_package_dir,
33 expand_path
34 expand_path
34 )
35 )
35 from IPython.utils.traitlets import Unicode
36 from IPython.utils.traitlets import Unicode
36
37
37 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
38 # Module errors
39 # Module errors
39 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
40
41
41 class ClusterDirError(Exception):
42 class ClusterDirError(Exception):
42 pass
43 pass
43
44
44
45
45 class PIDFileError(Exception):
46 class PIDFileError(Exception):
46 pass
47 pass
47
48
48
49
49 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
50 # Class for managing cluster directories
51 # Class for managing cluster directories
51 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
52
53
53 class ClusterDir(Configurable):
54 class ClusterDir(Configurable):
54 """An object to manage the cluster directory and its resources.
55 """An object to manage the cluster directory and its resources.
55
56
56 The cluster directory is used by :command:`ipengine`,
57 The cluster directory is used by :command:`ipengine`,
57 :command:`ipcontroller` and :command:`ipcluster` to manage the
58 :command:`ipcontroller` and :command:`ipcluster` to manage the
58 configuration, logging and security of these applications.
59 configuration, logging and security of these applications.
59
60
60 This object knows how to find, create and manage these directories. This
61 This object knows how to find, create and manage these directories. This
61 should be used by any code that wants to handle cluster directories.
62 should be used by any code that wants to handle cluster directories.
62 """
63 """
63
64
64 security_dir_name = Unicode('security')
65 security_dir_name = Unicode('security')
65 log_dir_name = Unicode('log')
66 log_dir_name = Unicode('log')
66 pid_dir_name = Unicode('pid')
67 pid_dir_name = Unicode('pid')
67 security_dir = Unicode(u'')
68 security_dir = Unicode(u'')
68 log_dir = Unicode(u'')
69 log_dir = Unicode(u'')
69 pid_dir = Unicode(u'')
70 pid_dir = Unicode(u'')
70 location = Unicode(u'')
71 location = Unicode(u'')
71
72
72 def __init__(self, location=u''):
73 def __init__(self, location=u''):
73 super(ClusterDir, self).__init__(location=location)
74 super(ClusterDir, self).__init__(location=location)
74
75
75 def _location_changed(self, name, old, new):
76 def _location_changed(self, name, old, new):
76 if not os.path.isdir(new):
77 if not os.path.isdir(new):
77 os.makedirs(new)
78 os.makedirs(new)
78 self.security_dir = os.path.join(new, self.security_dir_name)
79 self.security_dir = os.path.join(new, self.security_dir_name)
79 self.log_dir = os.path.join(new, self.log_dir_name)
80 self.log_dir = os.path.join(new, self.log_dir_name)
80 self.pid_dir = os.path.join(new, self.pid_dir_name)
81 self.pid_dir = os.path.join(new, self.pid_dir_name)
81 self.check_dirs()
82 self.check_dirs()
82
83
83 def _log_dir_changed(self, name, old, new):
84 def _log_dir_changed(self, name, old, new):
84 self.check_log_dir()
85 self.check_log_dir()
85
86
86 def check_log_dir(self):
87 def check_log_dir(self):
87 if not os.path.isdir(self.log_dir):
88 if not os.path.isdir(self.log_dir):
88 os.mkdir(self.log_dir)
89 os.mkdir(self.log_dir)
89
90
90 def _security_dir_changed(self, name, old, new):
91 def _security_dir_changed(self, name, old, new):
91 self.check_security_dir()
92 self.check_security_dir()
92
93
93 def check_security_dir(self):
94 def check_security_dir(self):
94 if not os.path.isdir(self.security_dir):
95 if not os.path.isdir(self.security_dir):
95 os.mkdir(self.security_dir, 0700)
96 os.mkdir(self.security_dir, 0700)
96 os.chmod(self.security_dir, 0700)
97 os.chmod(self.security_dir, 0700)
97
98
98 def _pid_dir_changed(self, name, old, new):
99 def _pid_dir_changed(self, name, old, new):
99 self.check_pid_dir()
100 self.check_pid_dir()
100
101
101 def check_pid_dir(self):
102 def check_pid_dir(self):
102 if not os.path.isdir(self.pid_dir):
103 if not os.path.isdir(self.pid_dir):
103 os.mkdir(self.pid_dir, 0700)
104 os.mkdir(self.pid_dir, 0700)
104 os.chmod(self.pid_dir, 0700)
105 os.chmod(self.pid_dir, 0700)
105
106
106 def check_dirs(self):
107 def check_dirs(self):
107 self.check_security_dir()
108 self.check_security_dir()
108 self.check_log_dir()
109 self.check_log_dir()
109 self.check_pid_dir()
110 self.check_pid_dir()
110
111
111 def load_config_file(self, filename):
112 def load_config_file(self, filename):
112 """Load a config file from the top level of the cluster dir.
113 """Load a config file from the top level of the cluster dir.
113
114
114 Parameters
115 Parameters
115 ----------
116 ----------
116 filename : unicode or str
117 filename : unicode or str
117 The filename only of the config file that must be located in
118 The filename only of the config file that must be located in
118 the top-level of the cluster directory.
119 the top-level of the cluster directory.
119 """
120 """
120 loader = PyFileConfigLoader(filename, self.location)
121 loader = PyFileConfigLoader(filename, self.location)
121 return loader.load_config()
122 return loader.load_config()
122
123
123 def copy_config_file(self, config_file, path=None, overwrite=False):
124 def copy_config_file(self, config_file, path=None, overwrite=False):
124 """Copy a default config file into the active cluster directory.
125 """Copy a default config file into the active cluster directory.
125
126
126 Default configuration files are kept in :mod:`IPython.config.default`.
127 Default configuration files are kept in :mod:`IPython.config.default`.
127 This function copies these from that location to the working cluster
128 This function copies these from that location to the working cluster
128 directory.
129 directory.
129 """
130 """
130 if path is None:
131 if path is None:
131 import IPython.config.default
132 import IPython.config.default
132 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
133 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
133 path = os.path.sep.join(path)
134 path = os.path.sep.join(path)
134 src = os.path.join(path, config_file)
135 src = os.path.join(path, config_file)
135 dst = os.path.join(self.location, config_file)
136 dst = os.path.join(self.location, config_file)
136 if not os.path.isfile(dst) or overwrite:
137 if not os.path.isfile(dst) or overwrite:
137 shutil.copy(src, dst)
138 shutil.copy(src, dst)
138
139
139 def copy_all_config_files(self, path=None, overwrite=False):
140 def copy_all_config_files(self, path=None, overwrite=False):
140 """Copy all config files into the active cluster directory."""
141 """Copy all config files into the active cluster directory."""
141 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
142 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
142 u'ipcluster_config.py']:
143 u'ipcluster_config.py']:
143 self.copy_config_file(f, path=path, overwrite=overwrite)
144 self.copy_config_file(f, path=path, overwrite=overwrite)
144
145
145 @classmethod
146 @classmethod
146 def create_cluster_dir(cls, cluster_dir):
147 def create_cluster_dir(cls, cluster_dir):
147 """Create a new cluster directory given a full path.
148 """Create a new cluster directory given a full path.
148
149
149 Parameters
150 Parameters
150 ----------
151 ----------
151 cluster_dir : str
152 cluster_dir : str
152 The full path to the cluster directory. If it does exist, it will
153 The full path to the cluster directory. If it does exist, it will
153 be used. If not, it will be created.
154 be used. If not, it will be created.
154 """
155 """
155 return ClusterDir(location=cluster_dir)
156 return ClusterDir(location=cluster_dir)
156
157
157 @classmethod
158 @classmethod
158 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
159 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
159 """Create a cluster dir by profile name and path.
160 """Create a cluster dir by profile name and path.
160
161
161 Parameters
162 Parameters
162 ----------
163 ----------
163 path : str
164 path : str
164 The path (directory) to put the cluster directory in.
165 The path (directory) to put the cluster directory in.
165 profile : str
166 profile : str
166 The name of the profile. The name of the cluster directory will
167 The name of the profile. The name of the cluster directory will
167 be "clusterz_<profile>".
168 be "clusterz_<profile>".
168 """
169 """
169 if not os.path.isdir(path):
170 if not os.path.isdir(path):
170 raise ClusterDirError('Directory not found: %s' % path)
171 raise ClusterDirError('Directory not found: %s' % path)
171 cluster_dir = os.path.join(path, u'clusterz_' + profile)
172 cluster_dir = os.path.join(path, u'clusterz_' + profile)
172 return ClusterDir(location=cluster_dir)
173 return ClusterDir(location=cluster_dir)
173
174
174 @classmethod
175 @classmethod
175 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
176 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
176 """Find an existing cluster dir by profile name, return its ClusterDir.
177 """Find an existing cluster dir by profile name, return its ClusterDir.
177
178
178 This searches through a sequence of paths for a cluster dir. If it
179 This searches through a sequence of paths for a cluster dir. If it
179 is not found, a :class:`ClusterDirError` exception will be raised.
180 is not found, a :class:`ClusterDirError` exception will be raised.
180
181
181 The search path algorithm is:
182 The search path algorithm is:
182 1. ``os.getcwd()``
183 1. ``os.getcwd()``
183 2. ``ipython_dir``
184 2. ``ipython_dir``
184 3. The directories found in the ":" separated
185 3. The directories found in the ":" separated
185 :env:`IPCLUSTER_DIR_PATH` environment variable.
186 :env:`IPCLUSTER_DIR_PATH` environment variable.
186
187
187 Parameters
188 Parameters
188 ----------
189 ----------
189 ipython_dir : unicode or str
190 ipython_dir : unicode or str
190 The IPython directory to use.
191 The IPython directory to use.
191 profile : unicode or str
192 profile : unicode or str
192 The name of the profile. The name of the cluster directory
193 The name of the profile. The name of the cluster directory
193 will be "clusterz_<profile>".
194 will be "clusterz_<profile>".
194 """
195 """
195 dirname = u'clusterz_' + profile
196 dirname = u'clusterz_' + profile
196 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
197 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
197 if cluster_dir_paths:
198 if cluster_dir_paths:
198 cluster_dir_paths = cluster_dir_paths.split(':')
199 cluster_dir_paths = cluster_dir_paths.split(':')
199 else:
200 else:
200 cluster_dir_paths = []
201 cluster_dir_paths = []
201 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
202 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
202 for p in paths:
203 for p in paths:
203 cluster_dir = os.path.join(p, dirname)
204 cluster_dir = os.path.join(p, dirname)
204 if os.path.isdir(cluster_dir):
205 if os.path.isdir(cluster_dir):
205 return ClusterDir(location=cluster_dir)
206 return ClusterDir(location=cluster_dir)
206 else:
207 else:
207 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
208 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
208
209
209 @classmethod
210 @classmethod
210 def find_cluster_dir(cls, cluster_dir):
211 def find_cluster_dir(cls, cluster_dir):
211 """Find/create a cluster dir and return its ClusterDir.
212 """Find/create a cluster dir and return its ClusterDir.
212
213
213 This will create the cluster directory if it doesn't exist.
214 This will create the cluster directory if it doesn't exist.
214
215
215 Parameters
216 Parameters
216 ----------
217 ----------
217 cluster_dir : unicode or str
218 cluster_dir : unicode or str
218 The path of the cluster directory. This is expanded using
219 The path of the cluster directory. This is expanded using
219 :func:`IPython.utils.path.expand_path`.
220 :func:`IPython.utils.path.expand_path`.
220 """
221 """
221 cluster_dir = expand_path(cluster_dir)
222 cluster_dir = expand_path(cluster_dir)
222 if not os.path.isdir(cluster_dir):
223 if not os.path.isdir(cluster_dir):
223 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
224 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
224 return ClusterDir(location=cluster_dir)
225 return ClusterDir(location=cluster_dir)
225
226
226
227
227 #-----------------------------------------------------------------------------
228 #-----------------------------------------------------------------------------
228 # Command line options
229 # Command line options
229 #-----------------------------------------------------------------------------
230 #-----------------------------------------------------------------------------
230
231
231 class ClusterDirConfigLoader(BaseAppConfigLoader):
232 class ClusterDirConfigLoader(BaseAppConfigLoader):
232
233
233 def _add_cluster_profile(self, parser):
234 def _add_cluster_profile(self, parser):
234 paa = parser.add_argument
235 paa = parser.add_argument
235 paa('-p', '--profile',
236 paa('-p', '--profile',
236 dest='Global.profile',type=unicode,
237 dest='Global.profile',type=unicode,
237 help=
238 help=
238 """The string name of the profile to be used. This determines the name
239 """The string name of the profile to be used. This determines the name
239 of the cluster dir as: cluster_<profile>. The default profile is named
240 of the cluster dir as: cluster_<profile>. The default profile is named
240 'default'. The cluster directory is resolved this way if the
241 'default'. The cluster directory is resolved this way if the
241 --cluster-dir option is not used.""",
242 --cluster-dir option is not used.""",
242 metavar='Global.profile')
243 metavar='Global.profile')
243
244
244 def _add_cluster_dir(self, parser):
245 def _add_cluster_dir(self, parser):
245 paa = parser.add_argument
246 paa = parser.add_argument
246 paa('--cluster-dir',
247 paa('--cluster-dir',
247 dest='Global.cluster_dir',type=unicode,
248 dest='Global.cluster_dir',type=unicode,
248 help="""Set the cluster dir. This overrides the logic used by the
249 help="""Set the cluster dir. This overrides the logic used by the
249 --profile option.""",
250 --profile option.""",
250 metavar='Global.cluster_dir')
251 metavar='Global.cluster_dir')
251
252
252 def _add_work_dir(self, parser):
253 def _add_work_dir(self, parser):
253 paa = parser.add_argument
254 paa = parser.add_argument
254 paa('--work-dir',
255 paa('--work-dir',
255 dest='Global.work_dir',type=unicode,
256 dest='Global.work_dir',type=unicode,
256 help='Set the working dir for the process.',
257 help='Set the working dir for the process.',
257 metavar='Global.work_dir')
258 metavar='Global.work_dir')
258
259
259 def _add_clean_logs(self, parser):
260 def _add_clean_logs(self, parser):
260 paa = parser.add_argument
261 paa = parser.add_argument
261 paa('--clean-logs',
262 paa('--clean-logs',
262 dest='Global.clean_logs', action='store_true',
263 dest='Global.clean_logs', action='store_true',
263 help='Delete old log files before starting.')
264 help='Delete old log files before starting.')
264
265
265 def _add_no_clean_logs(self, parser):
266 def _add_no_clean_logs(self, parser):
266 paa = parser.add_argument
267 paa = parser.add_argument
267 paa('--no-clean-logs',
268 paa('--no-clean-logs',
268 dest='Global.clean_logs', action='store_false',
269 dest='Global.clean_logs', action='store_false',
269 help="Don't delete old log files before starting.")
270 help="Don't delete old log files before starting.")
270
271
271 def _add_arguments(self):
272 def _add_arguments(self):
272 super(ClusterDirConfigLoader, self)._add_arguments()
273 super(ClusterDirConfigLoader, self)._add_arguments()
273 self._add_cluster_profile(self.parser)
274 self._add_cluster_profile(self.parser)
274 self._add_cluster_dir(self.parser)
275 self._add_cluster_dir(self.parser)
275 self._add_work_dir(self.parser)
276 self._add_work_dir(self.parser)
276 self._add_clean_logs(self.parser)
277 self._add_clean_logs(self.parser)
277 self._add_no_clean_logs(self.parser)
278 self._add_no_clean_logs(self.parser)
278
279
279
280
280 #-----------------------------------------------------------------------------
281 #-----------------------------------------------------------------------------
281 # Crash handler for this application
282 # Crash handler for this application
282 #-----------------------------------------------------------------------------
283 #-----------------------------------------------------------------------------
283
284
284
285
285 _message_template = """\
286 _message_template = """\
286 Oops, $self.app_name crashed. We do our best to make it stable, but...
287 Oops, $self.app_name crashed. We do our best to make it stable, but...
287
288
288 A crash report was automatically generated with the following information:
289 A crash report was automatically generated with the following information:
289 - A verbatim copy of the crash traceback.
290 - A verbatim copy of the crash traceback.
290 - Data on your current $self.app_name configuration.
291 - Data on your current $self.app_name configuration.
291
292
292 It was left in the file named:
293 It was left in the file named:
293 \t'$self.crash_report_fname'
294 \t'$self.crash_report_fname'
294 If you can email this file to the developers, the information in it will help
295 If you can email this file to the developers, the information in it will help
295 them in understanding and correcting the problem.
296 them in understanding and correcting the problem.
296
297
297 You can mail it to: $self.contact_name at $self.contact_email
298 You can mail it to: $self.contact_name at $self.contact_email
298 with the subject '$self.app_name Crash Report'.
299 with the subject '$self.app_name Crash Report'.
299
300
300 If you want to do it now, the following command will work (under Unix):
301 If you want to do it now, the following command will work (under Unix):
301 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
302 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
302
303
303 To ensure accurate tracking of this issue, please file a report about it at:
304 To ensure accurate tracking of this issue, please file a report about it at:
304 $self.bug_tracker
305 $self.bug_tracker
305 """
306 """
306
307
307 class ClusterDirCrashHandler(CrashHandler):
308 class ClusterDirCrashHandler(CrashHandler):
308 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
309 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
309
310
310 message_template = _message_template
311 message_template = _message_template
311
312
312 def __init__(self, app):
313 def __init__(self, app):
313 contact_name = release.authors['Brian'][0]
314 contact_name = release.authors['Brian'][0]
314 contact_email = release.authors['Brian'][1]
315 contact_email = release.authors['Brian'][1]
315 bug_tracker = 'http://github.com/ipython/ipython/issues'
316 bug_tracker = 'http://github.com/ipython/ipython/issues'
316 super(ClusterDirCrashHandler,self).__init__(
317 super(ClusterDirCrashHandler,self).__init__(
317 app, contact_name, contact_email, bug_tracker
318 app, contact_name, contact_email, bug_tracker
318 )
319 )
319
320
320
321
321 #-----------------------------------------------------------------------------
322 #-----------------------------------------------------------------------------
322 # Main application
323 # Main application
323 #-----------------------------------------------------------------------------
324 #-----------------------------------------------------------------------------
324
325
325 class ApplicationWithClusterDir(Application):
326 class ApplicationWithClusterDir(Application):
326 """An application that puts everything into a cluster directory.
327 """An application that puts everything into a cluster directory.
327
328
328 Instead of looking for things in the ipython_dir, this type of application
329 Instead of looking for things in the ipython_dir, this type of application
329 will use its own private directory called the "cluster directory"
330 will use its own private directory called the "cluster directory"
330 for things like config files, log files, etc.
331 for things like config files, log files, etc.
331
332
332 The cluster directory is resolved as follows:
333 The cluster directory is resolved as follows:
333
334
334 * If the ``--cluster-dir`` option is given, it is used.
335 * If the ``--cluster-dir`` option is given, it is used.
335 * If ``--cluster-dir`` is not given, the application directory is
336 * If ``--cluster-dir`` is not given, the application directory is
336 resolved using the profile name as ``cluster_<profile>``. The search
337 resolved using the profile name as ``cluster_<profile>``. The search
337 path for this directory is then i) cwd if it is found there
338 path for this directory is then i) cwd if it is found there
338 and ii) in ipython_dir otherwise.
339 and ii) in ipython_dir otherwise.
339
340
340 The config file for the application is to be put in the cluster
341 The config file for the application is to be put in the cluster
341 dir and named the value of the ``config_file_name`` class attribute.
342 dir and named the value of the ``config_file_name`` class attribute.
342 """
343 """
343
344
344 command_line_loader = ClusterDirConfigLoader
345 command_line_loader = ClusterDirConfigLoader
345 crash_handler_class = ClusterDirCrashHandler
346 crash_handler_class = ClusterDirCrashHandler
346 auto_create_cluster_dir = True
347 auto_create_cluster_dir = True
347 # temporarily override default_log_level to DEBUG
348 # temporarily override default_log_level to DEBUG
348 default_log_level = logging.DEBUG
349 default_log_level = logging.DEBUG
349
350
350 def create_default_config(self):
351 def create_default_config(self):
351 super(ApplicationWithClusterDir, self).create_default_config()
352 super(ApplicationWithClusterDir, self).create_default_config()
352 self.default_config.Global.profile = u'default'
353 self.default_config.Global.profile = u'default'
353 self.default_config.Global.cluster_dir = u''
354 self.default_config.Global.cluster_dir = u''
354 self.default_config.Global.work_dir = os.getcwd()
355 self.default_config.Global.work_dir = os.getcwd()
355 self.default_config.Global.log_to_file = False
356 self.default_config.Global.log_to_file = False
356 self.default_config.Global.log_url = None
357 self.default_config.Global.log_url = None
357 self.default_config.Global.clean_logs = False
358 self.default_config.Global.clean_logs = False
358
359
359 def find_resources(self):
360 def find_resources(self):
360 """This resolves the cluster directory.
361 """This resolves the cluster directory.
361
362
362 This tries to find the cluster directory and if successful, it will
363 This tries to find the cluster directory and if successful, it will
363 have done:
364 have done:
364 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
365 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
365 the application.
366 the application.
366 * Sets ``self.cluster_dir`` attribute of the application and config
367 * Sets ``self.cluster_dir`` attribute of the application and config
367 objects.
368 objects.
368
369
369 The algorithm used for this is as follows:
370 The algorithm used for this is as follows:
370 1. Try ``Global.cluster_dir``.
371 1. Try ``Global.cluster_dir``.
371 2. Try using ``Global.profile``.
372 2. Try using ``Global.profile``.
372 3. If both of these fail and ``self.auto_create_cluster_dir`` is
373 3. If both of these fail and ``self.auto_create_cluster_dir`` is
373 ``True``, then create the new cluster dir in the IPython directory.
374 ``True``, then create the new cluster dir in the IPython directory.
374 4. If all of these fail, then raise :class:`ClusterDirError`.
375 4. If all of these fail, then raise :class:`ClusterDirError`.
375 """
376 """
376
377
377 try:
378 try:
378 cluster_dir = self.command_line_config.Global.cluster_dir
379 cluster_dir = self.command_line_config.Global.cluster_dir
379 except AttributeError:
380 except AttributeError:
380 cluster_dir = self.default_config.Global.cluster_dir
381 cluster_dir = self.default_config.Global.cluster_dir
381 cluster_dir = expand_path(cluster_dir)
382 cluster_dir = expand_path(cluster_dir)
382 try:
383 try:
383 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
384 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
384 except ClusterDirError:
385 except ClusterDirError:
385 pass
386 pass
386 else:
387 else:
387 self.log.info('Using existing cluster dir: %s' % \
388 self.log.info('Using existing cluster dir: %s' % \
388 self.cluster_dir_obj.location
389 self.cluster_dir_obj.location
389 )
390 )
390 self.finish_cluster_dir()
391 self.finish_cluster_dir()
391 return
392 return
392
393
393 try:
394 try:
394 self.profile = self.command_line_config.Global.profile
395 self.profile = self.command_line_config.Global.profile
395 except AttributeError:
396 except AttributeError:
396 self.profile = self.default_config.Global.profile
397 self.profile = self.default_config.Global.profile
397 try:
398 try:
398 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
399 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
399 self.ipython_dir, self.profile)
400 self.ipython_dir, self.profile)
400 except ClusterDirError:
401 except ClusterDirError:
401 pass
402 pass
402 else:
403 else:
403 self.log.info('Using existing cluster dir: %s' % \
404 self.log.info('Using existing cluster dir: %s' % \
404 self.cluster_dir_obj.location
405 self.cluster_dir_obj.location
405 )
406 )
406 self.finish_cluster_dir()
407 self.finish_cluster_dir()
407 return
408 return
408
409
409 if self.auto_create_cluster_dir:
410 if self.auto_create_cluster_dir:
410 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
411 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
411 self.ipython_dir, self.profile
412 self.ipython_dir, self.profile
412 )
413 )
413 self.log.info('Creating new cluster dir: %s' % \
414 self.log.info('Creating new cluster dir: %s' % \
414 self.cluster_dir_obj.location
415 self.cluster_dir_obj.location
415 )
416 )
416 self.finish_cluster_dir()
417 self.finish_cluster_dir()
417 else:
418 else:
418 raise ClusterDirError('Could not find a valid cluster directory.')
419 raise ClusterDirError('Could not find a valid cluster directory.')
419
420
420 def finish_cluster_dir(self):
421 def finish_cluster_dir(self):
421 # Set the cluster directory
422 # Set the cluster directory
422 self.cluster_dir = self.cluster_dir_obj.location
423 self.cluster_dir = self.cluster_dir_obj.location
423
424
424 # These have to be set because they could be different from the one
425 # These have to be set because they could be different from the one
425 # that we just computed. Because the command line has the highest
426 # that we just computed. Because the command line has the highest
426 # priority, this will always end up in the master_config.
427 # priority, this will always end up in the master_config.
427 self.default_config.Global.cluster_dir = self.cluster_dir
428 self.default_config.Global.cluster_dir = self.cluster_dir
428 self.command_line_config.Global.cluster_dir = self.cluster_dir
429 self.command_line_config.Global.cluster_dir = self.cluster_dir
429
430
430 def find_config_file_name(self):
431 def find_config_file_name(self):
431 """Find the config file name for this application."""
432 """Find the config file name for this application."""
432 # For this type of Application it should be set as a class attribute.
433 # For this type of Application it should be set as a class attribute.
433 if not hasattr(self, 'default_config_file_name'):
434 if not hasattr(self, 'default_config_file_name'):
434 self.log.critical("No config filename found")
435 self.log.critical("No config filename found")
435 else:
436 else:
436 self.config_file_name = self.default_config_file_name
437 self.config_file_name = self.default_config_file_name
437
438
438 def find_config_file_paths(self):
439 def find_config_file_paths(self):
439 # Set the search path to the cluster directory. We should NOT
440 # Set the search path to the cluster directory. We should NOT
440 # include IPython.config.default here as the default config files
441 # include IPython.config.default here as the default config files
441 # are ALWAYS automatically moved to the cluster directory.
442 # are ALWAYS automatically moved to the cluster directory.
442 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
443 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
443 self.config_file_paths = (self.cluster_dir,)
444 self.config_file_paths = (self.cluster_dir,)
444
445
445 def pre_construct(self):
446 def pre_construct(self):
446 # The log and security dirs were set earlier, but here we put them
447 # The log and security dirs were set earlier, but here we put them
447 # into the config and log them.
448 # into the config and log them.
448 config = self.master_config
449 config = self.master_config
449 sdir = self.cluster_dir_obj.security_dir
450 sdir = self.cluster_dir_obj.security_dir
450 self.security_dir = config.Global.security_dir = sdir
451 self.security_dir = config.Global.security_dir = sdir
451 ldir = self.cluster_dir_obj.log_dir
452 ldir = self.cluster_dir_obj.log_dir
452 self.log_dir = config.Global.log_dir = ldir
453 self.log_dir = config.Global.log_dir = ldir
453 pdir = self.cluster_dir_obj.pid_dir
454 pdir = self.cluster_dir_obj.pid_dir
454 self.pid_dir = config.Global.pid_dir = pdir
455 self.pid_dir = config.Global.pid_dir = pdir
455 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
456 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
456 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
457 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
457 # Change to the working directory. We do this just before construct
458 # Change to the working directory. We do this just before construct
458 # is called so all the components there have the right working dir.
459 # is called so all the components there have the right working dir.
459 self.to_work_dir()
460 self.to_work_dir()
460
461
461 def to_work_dir(self):
462 def to_work_dir(self):
462 wd = self.master_config.Global.work_dir
463 wd = self.master_config.Global.work_dir
463 if unicode(wd) != unicode(os.getcwd()):
464 if unicode(wd) != unicode(os.getcwd()):
464 os.chdir(wd)
465 os.chdir(wd)
465 self.log.info("Changing to working dir: %s" % wd)
466 self.log.info("Changing to working dir: %s" % wd)
466
467
467 def start_logging(self):
468 def start_logging(self):
468 # Remove old log files
469 # Remove old log files
469 if self.master_config.Global.clean_logs:
470 if self.master_config.Global.clean_logs:
470 log_dir = self.master_config.Global.log_dir
471 log_dir = self.master_config.Global.log_dir
471 for f in os.listdir(log_dir):
472 for f in os.listdir(log_dir):
472 if f.startswith(self.name + u'-') and f.endswith('.log'):
473 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
474 # if f.startswith(self.name + u'-') and f.endswith('.log'):
473 os.remove(os.path.join(log_dir, f))
475 os.remove(os.path.join(log_dir, f))
474 # Start logging to the new log file
476 # Start logging to the new log file
475 if self.master_config.Global.log_to_file:
477 if self.master_config.Global.log_to_file:
476 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
478 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
477 logfile = os.path.join(self.log_dir, log_filename)
479 logfile = os.path.join(self.log_dir, log_filename)
478 open_log_file = open(logfile, 'w')
480 open_log_file = open(logfile, 'w')
479 elif self.master_config.Global.log_url:
481 elif self.master_config.Global.log_url:
480 open_log_file = None
482 open_log_file = None
481 else:
483 else:
482 open_log_file = sys.stdout
484 open_log_file = sys.stdout
483 if open_log_file is not None:
485 if open_log_file is not None:
484 self.log.removeHandler(self._log_handler)
486 self.log.removeHandler(self._log_handler)
485 self._log_handler = logging.StreamHandler(open_log_file)
487 self._log_handler = logging.StreamHandler(open_log_file)
486 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
488 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
487 self._log_handler.setFormatter(self._log_formatter)
489 self._log_handler.setFormatter(self._log_formatter)
488 self.log.addHandler(self._log_handler)
490 self.log.addHandler(self._log_handler)
489 # log.startLogging(open_log_file)
491 # log.startLogging(open_log_file)
490
492
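The re.match pattern introduced above changes what --clean-logs removes: instead of any '<name>-*.log' file, it now matches only '<name>-<pid>.log', '.err' and '.out' files (the .err/.out extensions are presumably written by the batch launchers this commit touches). A minimal sketch of the matching behaviour, using hypothetical file names and the u'ipclusterz' application name defined in the second file below:

    import re

    name = u'ipclusterz'  # self.name of the ipcluster application below
    pattern = r'%s-\d+\.(log|err|out)' % name

    # Only the first three hypothetical entries would be removed.
    for f in ['ipclusterz-1234.log', 'ipclusterz-1234.err',
              'ipclusterz-1234.out', 'ipclusterz.pid', 'notes.txt']:
        print f, bool(re.match(pattern, f))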
491 def write_pid_file(self, overwrite=False):
493 def write_pid_file(self, overwrite=False):
492 """Create a .pid file in the pid_dir with my pid.
494 """Create a .pid file in the pid_dir with my pid.
493
495
494 This must be called after pre_construct, which sets `self.pid_dir`.
496 This must be called after pre_construct, which sets `self.pid_dir`.
495 This raises :exc:`PIDFileError` if the pid file exists already.
497 This raises :exc:`PIDFileError` if the pid file exists already.
496 """
498 """
497 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
499 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
498 if os.path.isfile(pid_file):
500 if os.path.isfile(pid_file):
499 pid = self.get_pid_from_file()
501 pid = self.get_pid_from_file()
500 if not overwrite:
502 if not overwrite:
501 raise PIDFileError(
503 raise PIDFileError(
502 'The pid file [%s] already exists. \nThis could mean that this '
504 'The pid file [%s] already exists. \nThis could mean that this '
503 'server is already running with [pid=%s].' % (pid_file, pid)
505 'server is already running with [pid=%s].' % (pid_file, pid)
504 )
506 )
505 with open(pid_file, 'w') as f:
507 with open(pid_file, 'w') as f:
506 self.log.info("Creating pid file: %s" % pid_file)
508 self.log.info("Creating pid file: %s" % pid_file)
507 f.write(repr(os.getpid())+'\n')
509 f.write(repr(os.getpid())+'\n')
508
510
509 def remove_pid_file(self):
511 def remove_pid_file(self):
510 """Remove the pid file.
512 """Remove the pid file.
511
513
512 This should be called at shutdown by registering a callback with
514 This should be called at shutdown by registering a callback with
513 :func:`reactor.addSystemEventTrigger`. This needs to return
515 :func:`reactor.addSystemEventTrigger`. This needs to return
514 ``None``.
516 ``None``.
515 """
517 """
516 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
518 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
517 if os.path.isfile(pid_file):
519 if os.path.isfile(pid_file):
518 try:
520 try:
519 self.log.info("Removing pid file: %s" % pid_file)
521 self.log.info("Removing pid file: %s" % pid_file)
520 os.remove(pid_file)
522 os.remove(pid_file)
521 except:
523 except:
522 self.log.warn("Error removing the pid file: %s" % pid_file)
524 self.log.warn("Error removing the pid file: %s" % pid_file)
523
525
524 def get_pid_from_file(self):
526 def get_pid_from_file(self):
525 """Get the pid from the pid file.
527 """Get the pid from the pid file.
526
528
527 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
529 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
528 """
530 """
529 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
531 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
530 if os.path.isfile(pid_file):
532 if os.path.isfile(pid_file):
531 with open(pid_file, 'r') as f:
533 with open(pid_file, 'r') as f:
532 pid = int(f.read().strip())
534 pid = int(f.read().strip())
533 return pid
535 return pid
534 else:
536 else:
535 raise PIDFileError('pid file not found: %s' % pid_file)
537 raise PIDFileError('pid file not found: %s' % pid_file)
536
538
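Taken together, the ClusterDir helpers in the file above form a small standalone API for locating or creating a cluster directory and loading its configuration. A minimal usage sketch; the profile name is hypothetical, and IPython.utils.path.get_ipython_dir is assumed to exist outside this diff:

    from IPython.utils.path import get_ipython_dir
    from IPython.zmq.parallel.clusterdir import ClusterDir, ClusterDirError

    ipython_dir = get_ipython_dir()
    try:
        # Searches cwd, ipython_dir and IPCLUSTER_DIR_PATH for clusterz_mycluster.
        cd = ClusterDir.find_cluster_dir_by_profile(ipython_dir, u'mycluster')
    except ClusterDirError:
        # Fall back to creating <ipython_dir>/clusterz_mycluster.
        cd = ClusterDir.create_cluster_dir_by_profile(ipython_dir, u'mycluster')

    print cd.location, cd.security_dir, cd.log_dir, cd.pid_dir
    # Populate the directory with the default config files, then load one.
    cd.copy_all_config_files()
    config = cd.load_config_file(u'ipcluster_config.py')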
@@ -1,503 +1,500 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import re
18 import logging
19 import logging
19 import os
20 import os
20 import signal
21 import signal
21 import logging
22 import logging
22
23
23 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop
24
25
25 from IPython.external.argparse import ArgumentParser, SUPPRESS
26 from IPython.external.argparse import ArgumentParser, SUPPRESS
26 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
27 from IPython.zmq.parallel.clusterdir import (
28 from IPython.zmq.parallel.clusterdir import (
28 ApplicationWithClusterDir, ClusterDirConfigLoader,
29 ApplicationWithClusterDir, ClusterDirConfigLoader,
29 ClusterDirError, PIDFileError
30 ClusterDirError, PIDFileError
30 )
31 )
31
32
32
33
33 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
34 # Module level variables
35 # Module level variables
35 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
36
37
37
38
38 default_config_file_name = u'ipcluster_config.py'
39 default_config_file_name = u'ipcluster_config.py'
39
40
40
41
41 _description = """\
42 _description = """\
42 Start an IPython cluster for parallel computing.\n\n
43 Start an IPython cluster for parallel computing.\n\n
43
44
44 An IPython cluster consists of 1 controller and 1 or more engines.
45 An IPython cluster consists of 1 controller and 1 or more engines.
45 This command automates the startup of these processes using a wide
46 This command automates the startup of these processes using a wide
46 range of startup methods (SSH, local processes, PBS, mpiexec,
47 range of startup methods (SSH, local processes, PBS, mpiexec,
47 Windows HPC Server 2008). To start a cluster with 4 engines on your
48 Windows HPC Server 2008). To start a cluster with 4 engines on your
48 local host simply do 'ipcluster start -n 4'. For more complex usage
49 local host simply do 'ipcluster start -n 4'. For more complex usage
49 you will typically do 'ipcluster create -p mycluster', then edit
50 you will typically do 'ipcluster create -p mycluster', then edit
50 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
51 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
51 """
52 """
52
53
53
54
54 # Exit codes for ipcluster
55 # Exit codes for ipcluster
55
56
56 # This will be the exit code if the ipcluster appears to be running because
57 # This will be the exit code if the ipcluster appears to be running because
57 # a .pid file exists
58 # a .pid file exists
58 ALREADY_STARTED = 10
59 ALREADY_STARTED = 10
59
60
60
61
61 # This will be the exit code if ipcluster stop is run, but there is no .pid
62 # This will be the exit code if ipcluster stop is run, but there is no .pid
62 # file to be found.
63 # file to be found.
63 ALREADY_STOPPED = 11
64 ALREADY_STOPPED = 11
64
65
65
66
66 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
67 # Command line options
68 # Command line options
68 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
69
70
70
71
71 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
72 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
72
73
73 def _add_arguments(self):
74 def _add_arguments(self):
74 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
75 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
75 # its defaults on self.parser. Instead, we will put those
76 # its defaults on self.parser. Instead, we will put those
76 # default options on our subparsers.
77 # default options on our subparsers.
77
78
78 # This has all the common options that all subcommands use
79 # This has all the common options that all subcommands use
79 parent_parser1 = ArgumentParser(
80 parent_parser1 = ArgumentParser(
80 add_help=False,
81 add_help=False,
81 argument_default=SUPPRESS
82 argument_default=SUPPRESS
82 )
83 )
83 self._add_ipython_dir(parent_parser1)
84 self._add_ipython_dir(parent_parser1)
84 self._add_log_level(parent_parser1)
85 self._add_log_level(parent_parser1)
85
86
86 # This has all the common options that other subcommands use
87 # This has all the common options that other subcommands use
87 parent_parser2 = ArgumentParser(
88 parent_parser2 = ArgumentParser(
88 add_help=False,
89 add_help=False,
89 argument_default=SUPPRESS
90 argument_default=SUPPRESS
90 )
91 )
91 self._add_cluster_profile(parent_parser2)
92 self._add_cluster_profile(parent_parser2)
92 self._add_cluster_dir(parent_parser2)
93 self._add_cluster_dir(parent_parser2)
93 self._add_work_dir(parent_parser2)
94 self._add_work_dir(parent_parser2)
94 paa = parent_parser2.add_argument
95 paa = parent_parser2.add_argument
95 paa('--log-to-file',
96 paa('--log-to-file',
96 action='store_true', dest='Global.log_to_file',
97 action='store_true', dest='Global.log_to_file',
97 help='Log to a file in the log directory (default is stdout)')
98 help='Log to a file in the log directory (default is stdout)')
98
99
99 # Create the object used to create the subparsers.
100 # Create the object used to create the subparsers.
100 subparsers = self.parser.add_subparsers(
101 subparsers = self.parser.add_subparsers(
101 dest='Global.subcommand',
102 dest='Global.subcommand',
102 title='ipcluster subcommands',
103 title='ipcluster subcommands',
103 description=
104 description=
104 """ipcluster has a variety of subcommands. The general way of
105 """ipcluster has a variety of subcommands. The general way of
105 running ipcluster is 'ipcluster <cmd> [options]'. To get help
106 running ipcluster is 'ipcluster <cmd> [options]'. To get help
106 on a particular subcommand do 'ipcluster <cmd> -h'."""
107 on a particular subcommand do 'ipcluster <cmd> -h'."""
107 # help="For more help, type 'ipcluster <cmd> -h'",
108 # help="For more help, type 'ipcluster <cmd> -h'",
108 )
109 )
109
110
110 # The "list" subcommand parser
111 # The "list" subcommand parser
111 parser_list = subparsers.add_parser(
112 parser_list = subparsers.add_parser(
112 'list',
113 'list',
113 parents=[parent_parser1],
114 parents=[parent_parser1],
114 argument_default=SUPPRESS,
115 argument_default=SUPPRESS,
115 help="List all clusters in cwd and ipython_dir.",
116 help="List all clusters in cwd and ipython_dir.",
116 description=
117 description=
117 """List all available clusters, by cluster directory, that can
118 """List all available clusters, by cluster directory, that can
118 be found in the current working directory or in the ipython
119 be found in the current working directory or in the ipython
119 directory. Cluster directories are named using the convention
120 directory. Cluster directories are named using the convention
120 'cluster_<profile>'."""
121 'cluster_<profile>'."""
121 )
122 )
122
123
123 # The "create" subcommand parser
124 # The "create" subcommand parser
124 parser_create = subparsers.add_parser(
125 parser_create = subparsers.add_parser(
125 'create',
126 'create',
126 parents=[parent_parser1, parent_parser2],
127 parents=[parent_parser1, parent_parser2],
127 argument_default=SUPPRESS,
128 argument_default=SUPPRESS,
128 help="Create a new cluster directory.",
129 help="Create a new cluster directory.",
129 description=
130 description=
130 """Create an ipython cluster directory by its profile name or
131 """Create an ipython cluster directory by its profile name or
131 cluster directory path. Cluster directories contain
132 cluster directory path. Cluster directories contain
132 configuration, log and security related files and are named
133 configuration, log and security related files and are named
133 using the convention 'cluster_<profile>'. By default they are
134 using the convention 'cluster_<profile>'. By default they are
134 located in your ipython directory. Once created, you will
135 located in your ipython directory. Once created, you will
135 probably need to edit the configuration files in the cluster
136 probably need to edit the configuration files in the cluster
136 directory to configure your cluster. Most users will create a
137 directory to configure your cluster. Most users will create a
137 cluster directory by profile name,
138 cluster directory by profile name,
138 'ipcluster create -p mycluster', which will put the directory
139 'ipcluster create -p mycluster', which will put the directory
139 in '<ipython_dir>/cluster_mycluster'.
140 in '<ipython_dir>/cluster_mycluster'.
140 """
141 """
141 )
142 )
142 paa = parser_create.add_argument
143 paa = parser_create.add_argument
143 paa('--reset-config',
144 paa('--reset-config',
144 dest='Global.reset_config', action='store_true',
145 dest='Global.reset_config', action='store_true',
145 help=
146 help=
146 """Recopy the default config files to the cluster directory.
147 """Recopy the default config files to the cluster directory.
147 You will lose any modifications you have made to these files.""")
148 You will lose any modifications you have made to these files.""")
148
149
149 # The "start" subcommand parser
150 # The "start" subcommand parser
150 parser_start = subparsers.add_parser(
151 parser_start = subparsers.add_parser(
151 'start',
152 'start',
152 parents=[parent_parser1, parent_parser2],
153 parents=[parent_parser1, parent_parser2],
153 argument_default=SUPPRESS,
154 argument_default=SUPPRESS,
154 help="Start a cluster.",
155 help="Start a cluster.",
155 description=
156 description=
156 """Start an ipython cluster by its profile name or cluster
157 """Start an ipython cluster by its profile name or cluster
157 directory. Cluster directories contain configuration, log and
158 directory. Cluster directories contain configuration, log and
158 security related files and are named using the convention
159 security related files and are named using the convention
159 'cluster_<profile>' and should be created using the 'create'
160 'cluster_<profile>' and should be created using the 'create'
160 subcommand of 'ipcluster'. If your cluster directory is in
161 subcommand of 'ipcluster'. If your cluster directory is in
161 the cwd or the ipython directory, you can simply refer to it
162 the cwd or the ipython directory, you can simply refer to it
162 using its profile name, 'ipcluster start -n 4 -p <profile>',
163 using its profile name, 'ipcluster start -n 4 -p <profile>',
163 otherwise use the '--cluster-dir' option.
164 otherwise use the '--cluster-dir' option.
164 """
165 """
165 )
166 )
166 paa = parser_start.add_argument
167 paa = parser_start.add_argument
167 paa('-n', '--number',
168 paa('-n', '--number',
168 type=int, dest='Global.n',
169 type=int, dest='Global.n',
169 help='The number of engines to start.',
170 help='The number of engines to start.',
170 metavar='Global.n')
171 metavar='Global.n')
171 paa('--clean-logs',
172 paa('--clean-logs',
172 dest='Global.clean_logs', action='store_true',
173 dest='Global.clean_logs', action='store_true',
173 help='Delete old log files before starting.')
174 help='Delete old log files before starting.')
174 paa('--no-clean-logs',
175 paa('--no-clean-logs',
175 dest='Global.clean_logs', action='store_false',
176 dest='Global.clean_logs', action='store_false',
176 help="Don't delete old log files before starting.")
177 help="Don't delete old log files before starting.")
177 paa('--daemon',
178 paa('--daemon',
178 dest='Global.daemonize', action='store_true',
179 dest='Global.daemonize', action='store_true',
179 help='Daemonize the ipcluster program. This implies --log-to-file')
180 help='Daemonize the ipcluster program. This implies --log-to-file')
180 paa('--no-daemon',
181 paa('--no-daemon',
181 dest='Global.daemonize', action='store_false',
182 dest='Global.daemonize', action='store_false',
182 help="Don't daemonize the ipcluster program.")
183 help="Don't daemonize the ipcluster program.")
183
184
184 # The "stop" subcommand parser
185 # The "stop" subcommand parser
185 parser_stop = subparsers.add_parser(
186 parser_stop = subparsers.add_parser(
186 'stop',
187 'stop',
187 parents=[parent_parser1, parent_parser2],
188 parents=[parent_parser1, parent_parser2],
188 argument_default=SUPPRESS,
189 argument_default=SUPPRESS,
189 help="Stop a running cluster.",
190 help="Stop a running cluster.",
190 description=
191 description=
191 """Stop a running ipython cluster by its profile name or cluster
192 """Stop a running ipython cluster by its profile name or cluster
192 directory. Cluster directories are named using the convention
193 directory. Cluster directories are named using the convention
193 'cluster_<profile>'. If your cluster directory is in
194 'cluster_<profile>'. If your cluster directory is in
194 the cwd or the ipython directory, you can simply refer to it
195 the cwd or the ipython directory, you can simply refer to it
195 using its profile name, 'ipcluster stop -p <profile>', otherwise
196 using its profile name, 'ipcluster stop -p <profile>', otherwise
196 use the '--cluster-dir' option.
197 use the '--cluster-dir' option.
197 """
198 """
198 )
199 )
199 paa = parser_stop.add_argument
200 paa = parser_stop.add_argument
200 paa('--signal',
201 paa('--signal',
201 dest='Global.signal', type=int,
202 dest='Global.signal', type=int,
202 help="The signal number to use in stopping the cluster (default=2).",
203 help="The signal number to use in stopping the cluster (default=2).",
203 metavar="Global.signal")
204 metavar="Global.signal")
204
205
205
206
206 #-----------------------------------------------------------------------------
207 #-----------------------------------------------------------------------------
207 # Main application
208 # Main application
208 #-----------------------------------------------------------------------------
209 #-----------------------------------------------------------------------------
209
210
210
211
211 class IPClusterApp(ApplicationWithClusterDir):
212 class IPClusterApp(ApplicationWithClusterDir):
212
213
213 name = u'ipclusterz'
214 name = u'ipclusterz'
214 description = _description
215 description = _description
215 usage = None
216 usage = None
216 command_line_loader = IPClusterAppConfigLoader
217 command_line_loader = IPClusterAppConfigLoader
217 default_config_file_name = default_config_file_name
218 default_config_file_name = default_config_file_name
218 default_log_level = logging.INFO
219 default_log_level = logging.INFO
219 auto_create_cluster_dir = False
220 auto_create_cluster_dir = False
220
221
221 def create_default_config(self):
222 def create_default_config(self):
222 super(IPClusterApp, self).create_default_config()
223 super(IPClusterApp, self).create_default_config()
223 self.default_config.Global.controller_launcher = \
224 self.default_config.Global.controller_launcher = \
224 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
225 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
225 self.default_config.Global.engine_launcher = \
226 self.default_config.Global.engine_launcher = \
226 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
227 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
227 self.default_config.Global.n = 2
228 self.default_config.Global.n = 2
228 self.default_config.Global.reset_config = False
229 self.default_config.Global.reset_config = False
229 self.default_config.Global.clean_logs = True
230 self.default_config.Global.clean_logs = True
230 self.default_config.Global.signal = 2
231 self.default_config.Global.signal = 2
231 self.default_config.Global.daemonize = False
232 self.default_config.Global.daemonize = False
232
233
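The Global defaults just above are the same names a profile's ipcluster_config.py can override. A hedged sketch of such a config file: it assumes the standard c = get_config() convention of files loaded via PyFileConfigLoader, and it reuses the Local* launcher paths from the defaults rather than guessing at the PBS/WinHPC launcher class names this commit adjusts elsewhere:

    # ipcluster_config.py -- placed at the top level of the cluster directory.
    c = get_config()

    # Start four engines instead of the default two.
    c.Global.n = 4

    # Keep old per-run logs instead of cleaning them at startup.
    c.Global.clean_logs = False

    # Launcher classes; these are the local-process defaults shown above.
    c.Global.controller_launcher = \
        'IPython.zmq.parallel.launcher.LocalControllerLauncher'
    c.Global.engine_launcher = \
        'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'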
233 def find_resources(self):
234 def find_resources(self):
234 subcommand = self.command_line_config.Global.subcommand
235 subcommand = self.command_line_config.Global.subcommand
235 if subcommand=='list':
236 if subcommand=='list':
236 self.list_cluster_dirs()
237 self.list_cluster_dirs()
237 # Exit immediately because there is nothing left to do.
238 # Exit immediately because there is nothing left to do.
238 self.exit()
239 self.exit()
239 elif subcommand=='create':
240 elif subcommand=='create':
240 self.auto_create_cluster_dir = True
241 self.auto_create_cluster_dir = True
241 super(IPClusterApp, self).find_resources()
242 super(IPClusterApp, self).find_resources()
242 elif subcommand=='start' or subcommand=='stop':
243 elif subcommand=='start' or subcommand=='stop':
243 self.auto_create_cluster_dir = True
244 self.auto_create_cluster_dir = True
244 try:
245 try:
245 super(IPClusterApp, self).find_resources()
246 super(IPClusterApp, self).find_resources()
246 except ClusterDirError:
247 except ClusterDirError:
247 raise ClusterDirError(
248 raise ClusterDirError(
248 "Could not find a cluster directory. A cluster dir must "
249 "Could not find a cluster directory. A cluster dir must "
249 "be created before running 'ipcluster start'. Do "
250 "be created before running 'ipcluster start'. Do "
250 "'ipcluster create -h' or 'ipcluster list -h' for more "
251 "'ipcluster create -h' or 'ipcluster list -h' for more "
251 "information about creating and listing cluster dirs."
252 "information about creating and listing cluster dirs."
252 )
253 )
253
254
254 def list_cluster_dirs(self):
255 def list_cluster_dirs(self):
255 # Find the search paths
256 # Find the search paths
256 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
257 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
257 if cluster_dir_paths:
258 if cluster_dir_paths:
258 cluster_dir_paths = cluster_dir_paths.split(':')
259 cluster_dir_paths = cluster_dir_paths.split(':')
259 else:
260 else:
260 cluster_dir_paths = []
261 cluster_dir_paths = []
261 try:
262 try:
262 ipython_dir = self.command_line_config.Global.ipython_dir
263 ipython_dir = self.command_line_config.Global.ipython_dir
263 except AttributeError:
264 except AttributeError:
264 ipython_dir = self.default_config.Global.ipython_dir
265 ipython_dir = self.default_config.Global.ipython_dir
265 paths = [os.getcwd(), ipython_dir] + \
266 paths = [os.getcwd(), ipython_dir] + \
266 cluster_dir_paths
267 cluster_dir_paths
267 paths = list(set(paths))
268 paths = list(set(paths))
268
269
269 self.log.info('Searching for cluster dirs in paths: %r' % paths)
270 self.log.info('Searching for cluster dirs in paths: %r' % paths)
270 for path in paths:
271 for path in paths:
271 files = os.listdir(path)
272 files = os.listdir(path)
272 for f in files:
273 for f in files:
273 full_path = os.path.join(path, f)
274 full_path = os.path.join(path, f)
274 if os.path.isdir(full_path) and f.startswith('cluster_'):
275 if os.path.isdir(full_path) and f.startswith('cluster_'):
275 profile = full_path.split('_')[-1]
276 profile = full_path.split('_')[-1]
276 start_cmd = 'ipcluster start -p %s -n 4' % profile
277 start_cmd = 'ipcluster start -p %s -n 4' % profile
277 print start_cmd + " ==> " + full_path
278 print start_cmd + " ==> " + full_path
278
279
279 def pre_construct(self):
280 def pre_construct(self):
280 # IPClusterApp.pre_construct() is where we cd to the working directory.
281 # IPClusterApp.pre_construct() is where we cd to the working directory.
281 super(IPClusterApp, self).pre_construct()
282 super(IPClusterApp, self).pre_construct()
282 config = self.master_config
283 config = self.master_config
283 try:
284 try:
284 daemon = config.Global.daemonize
285 daemon = config.Global.daemonize
285 if daemon:
286 if daemon:
286 config.Global.log_to_file = True
287 config.Global.log_to_file = True
287 except AttributeError:
288 except AttributeError:
288 pass
289 pass
289
290
290 def construct(self):
291 def construct(self):
291 config = self.master_config
292 config = self.master_config
292 subcmd = config.Global.subcommand
293 subcmd = config.Global.subcommand
293 reset = config.Global.reset_config
294 reset = config.Global.reset_config
294 if subcmd == 'list':
295 if subcmd == 'list':
295 return
296 return
296 if subcmd == 'create':
297 if subcmd == 'create':
297 self.log.info('Copying default config files to cluster directory '
298 self.log.info('Copying default config files to cluster directory '
298 '[overwrite=%r]' % (reset,))
299 '[overwrite=%r]' % (reset,))
299 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
300 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
300 if subcmd =='start':
301 if subcmd =='start':
301 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
302 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
302 self.start_logging()
303 self.start_logging()
303 self.loop = ioloop.IOLoop.instance()
304 self.loop = ioloop.IOLoop.instance()
304 # reactor.callWhenRunning(self.start_launchers)
305 # reactor.callWhenRunning(self.start_launchers)
305 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
306 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
306 dc.start()
307 dc.start()
307
308
308 def start_launchers(self):
309 def start_launchers(self):
309 config = self.master_config
310 config = self.master_config
310
311
311 # Create the launchers. In both cases, we set the work_dir of
312 # Create the launchers. In both cases, we set the work_dir of
312 # the launcher to the cluster_dir. This is where the launcher's
313 # the launcher to the cluster_dir. This is where the launcher's
313 # subprocesses will be launched. It is not where the controller
314 # subprocesses will be launched. It is not where the controller
314 # and engine will be launched.
315 # and engine will be launched.
315 el_class = import_item(config.Global.engine_launcher)
316 el_class = import_item(config.Global.engine_launcher)
316 self.engine_launcher = el_class(
317 self.engine_launcher = el_class(
317 work_dir=self.cluster_dir, config=config, logname=self.log.name
318 work_dir=self.cluster_dir, config=config, logname=self.log.name
318 )
319 )
319 cl_class = import_item(config.Global.controller_launcher)
320 cl_class = import_item(config.Global.controller_launcher)
320 self.controller_launcher = cl_class(
321 self.controller_launcher = cl_class(
321 work_dir=self.cluster_dir, config=config,
322 work_dir=self.cluster_dir, config=config,
322 logname=self.log.name
323 logname=self.log.name
323 )
324 )
324
325
325 # Setup signals
326 # Setup signals
326 signal.signal(signal.SIGINT, self.sigint_handler)
327 signal.signal(signal.SIGINT, self.sigint_handler)
327
328
328 # Setup the observing of stopping. If the controller dies, shut
329 # Setup the observing of stopping. If the controller dies, shut
329 # everything down as that will be completely fatal for the engines.
330 # everything down as that will be completely fatal for the engines.
330 self.controller_launcher.on_stop(self.stop_launchers)
331 self.controller_launcher.on_stop(self.stop_launchers)
331 # d1.addCallback(self.stop_launchers)
332 # d1.addCallback(self.stop_launchers)
332 # But, we don't monitor the stopping of engines. An engine dying
333 # But, we don't monitor the stopping of engines. An engine dying
333 # is just fine and in principle a user could start a new engine.
334 # is just fine and in principle a user could start a new engine.
334 # Also, if we did monitor engine stopping, it is difficult to
335 # Also, if we did monitor engine stopping, it is difficult to
335 # know what to do when only some engines die. Currently, the
336 # know what to do when only some engines die. Currently, the
336 # observing of engine stopping is inconsistent. Some launchers
337 # observing of engine stopping is inconsistent. Some launchers
337 # might trigger on a single engine stopping, others wait until
338 # might trigger on a single engine stopping, others wait until
338 # all stop. TODO: think more about how to handle this.
339 # all stop. TODO: think more about how to handle this.
339
340
340 # Start the controller and engines
341 # Start the controller and engines
341 self._stopping = False # Make sure stop_launchers is not called 2x.
342 self._stopping = False # Make sure stop_launchers is not called 2x.
342 d = self.start_controller()
343 d = self.start_controller()
343 self.start_engines()
344 self.start_engines()
344 self.startup_message()
345 self.startup_message()
345 # d.addCallback(self.start_engines)
346 # d.addCallback(self.start_engines)
346 # d.addCallback(self.startup_message)
347 # d.addCallback(self.startup_message)
347 # If the controller or engines fail to start, stop everything
348 # If the controller or engines fail to start, stop everything
348 # d.addErrback(self.stop_launchers)
349 # d.addErrback(self.stop_launchers)
349 return d
350 return d
350
351
351 def startup_message(self, r=None):
352 def startup_message(self, r=None):
352 self.log.info("IPython cluster: started")
353 self.log.info("IPython cluster: started")
353 return r
354 return r
354
355
355 def start_controller(self, r=None):
356 def start_controller(self, r=None):
356 # self.log.info("In start_controller")
357 # self.log.info("In start_controller")
357 config = self.master_config
358 config = self.master_config
358 d = self.controller_launcher.start(
359 d = self.controller_launcher.start(
359 cluster_dir=config.Global.cluster_dir
360 cluster_dir=config.Global.cluster_dir
360 )
361 )
361 return d
362 return d
362
363
363 def start_engines(self, r=None):
364 def start_engines(self, r=None):
364 # self.log.info("In start_engines")
365 # self.log.info("In start_engines")
365 config = self.master_config
366 config = self.master_config
366 d = self.engine_launcher.start(
367 d = self.engine_launcher.start(
367 config.Global.n,
368 config.Global.n,
368 cluster_dir=config.Global.cluster_dir
369 cluster_dir=config.Global.cluster_dir
369 )
370 )
370 return d
371 return d
371
372
372 def stop_controller(self, r=None):
373 def stop_controller(self, r=None):
373 # self.log.info("In stop_controller")
374 # self.log.info("In stop_controller")
374 if self.controller_launcher.running:
375 if self.controller_launcher.running:
375 return self.controller_launcher.stop()
376 return self.controller_launcher.stop()
376
377
377 def stop_engines(self, r=None):
378 def stop_engines(self, r=None):
378 # self.log.info("In stop_engines")
379 # self.log.info("In stop_engines")
379 if self.engine_launcher.running:
380 if self.engine_launcher.running:
380 d = self.engine_launcher.stop()
381 d = self.engine_launcher.stop()
381 # d.addErrback(self.log_err)
382 # d.addErrback(self.log_err)
382 return d
383 return d
383 else:
384 else:
384 return None
385 return None
385
386
386 def log_err(self, f):
387 def log_err(self, f):
387 self.log.error(f.getTraceback())
388 self.log.error(f.getTraceback())
388 return None
389 return None
389
390
390 def stop_launchers(self, r=None):
391 def stop_launchers(self, r=None):
391 if not self._stopping:
392 if not self._stopping:
392 self._stopping = True
393 self._stopping = True
393 # if isinstance(r, failure.Failure):
394 # if isinstance(r, failure.Failure):
394 # self.log.error('Unexpected error in ipcluster:')
395 # self.log.error('Unexpected error in ipcluster:')
395 # self.log.info(r.getTraceback())
396 # self.log.info(r.getTraceback())
396 self.log.error("IPython cluster: stopping")
397 self.log.error("IPython cluster: stopping")
397 # These return deferreds. We are not doing anything with them
398 # These return deferreds. We are not doing anything with them
398 # but we are holding refs to them as a reminder that they
399 # but we are holding refs to them as a reminder that they
399 # do return deferreds.
400 # do return deferreds.
400 d1 = self.stop_engines()
401 d1 = self.stop_engines()
401 d2 = self.stop_controller()
402 d2 = self.stop_controller()
402 # Wait a few seconds to let things shut down.
403 # Wait a few seconds to let things shut down.
403 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
404 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
404 dc.start()
405 dc.start()
405 # reactor.callLater(4.0, reactor.stop)
406 # reactor.callLater(4.0, reactor.stop)
406
407
407 def sigint_handler(self, signum, frame):
408 def sigint_handler(self, signum, frame):
408 self.stop_launchers()
409 self.stop_launchers()
409
410
410 def start_logging(self):
411 def start_logging(self):
411 # Remove old log files of the controller and engine
412 # Remove old log files of the controller and engine
412 if self.master_config.Global.clean_logs:
413 if self.master_config.Global.clean_logs:
413 log_dir = self.master_config.Global.log_dir
414 log_dir = self.master_config.Global.log_dir
414 for f in os.listdir(log_dir):
415 for f in os.listdir(log_dir):
415 if f.startswith('ipengine' + '-'):
416 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
416 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
417 os.remove(os.path.join(log_dir, f))
417 os.remove(os.path.join(log_dir, f))
418 # This will remove old log files for ipcluster itself
418 if f.startswith('ipcontroller' + '-'):
419 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
420 os.remove(os.path.join(log_dir, f))
421 # This will remote old log files for ipcluster itself
422 super(IPClusterApp, self).start_logging()
419 super(IPClusterApp, self).start_logging()
423
420
424 def start_app(self):
421 def start_app(self):
425 """Start the application, depending on what subcommand is used."""
422 """Start the application, depending on what subcommand is used."""
426 subcmd = self.master_config.Global.subcommand
423 subcmd = self.master_config.Global.subcommand
427 if subcmd=='create' or subcmd=='list':
424 if subcmd=='create' or subcmd=='list':
428 return
425 return
429 elif subcmd=='start':
426 elif subcmd=='start':
430 self.start_app_start()
427 self.start_app_start()
431 elif subcmd=='stop':
428 elif subcmd=='stop':
432 self.start_app_stop()
429 self.start_app_stop()
433
430
434 def start_app_start(self):
431 def start_app_start(self):
435 """Start the app for the start subcommand."""
432 """Start the app for the start subcommand."""
436 config = self.master_config
433 config = self.master_config
437 # First see if the cluster is already running
434 # First see if the cluster is already running
438 try:
435 try:
439 pid = self.get_pid_from_file()
436 pid = self.get_pid_from_file()
440 except PIDFileError:
437 except PIDFileError:
441 pass
438 pass
442 else:
439 else:
443 self.log.critical(
440 self.log.critical(
444 'Cluster is already running with [pid=%s]. '
441 'Cluster is already running with [pid=%s]. '
445 'Use "ipcluster stop" to stop the cluster.' % pid
442 'Use "ipcluster stop" to stop the cluster.' % pid
446 )
443 )
447 # Here I exit with an unusual exit status that other processes
444 # Here I exit with an unusual exit status that other processes
448 # can watch for to learn how I exited.
445 # can watch for to learn how I exited.
449 self.exit(ALREADY_STARTED)
446 self.exit(ALREADY_STARTED)
450
447
451 # Now log and daemonize
448 # Now log and daemonize
452 self.log.info(
449 self.log.info(
453 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
450 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
454 )
451 )
455 # TODO: Get daemonize working on Windows or as a Windows Server.
452 # TODO: Get daemonize working on Windows or as a Windows Server.
456 if config.Global.daemonize:
453 if config.Global.daemonize:
457 if os.name=='posix':
454 if os.name=='posix':
458 from twisted.scripts._twistd_unix import daemonize
455 from twisted.scripts._twistd_unix import daemonize
459 daemonize()
456 daemonize()
460
457
461 # Now write the new pid file AFTER our new forked pid is active.
458 # Now write the new pid file AFTER our new forked pid is active.
462 self.write_pid_file()
459 self.write_pid_file()
463 try:
460 try:
464 self.loop.start()
461 self.loop.start()
465 except:
462 except:
466 self.log.info("stopping...")
463 self.log.info("stopping...")
467 self.remove_pid_file()
464 self.remove_pid_file()
468
465
469 def start_app_stop(self):
466 def start_app_stop(self):
470 """Start the app for the stop subcommand."""
467 """Start the app for the stop subcommand."""
471 config = self.master_config
468 config = self.master_config
472 try:
469 try:
473 pid = self.get_pid_from_file()
470 pid = self.get_pid_from_file()
474 except PIDFileError:
471 except PIDFileError:
475 self.log.critical(
472 self.log.critical(
476 'Problem reading pid file, cluster is probably not running.'
473 'Problem reading pid file, cluster is probably not running.'
477 )
474 )
478 # Here I exit with an unusual exit status that other processes
475 # Here I exit with an unusual exit status that other processes
479 # can watch for to learn how I exited.
476 # can watch for to learn how I exited.
480 self.exit(ALREADY_STOPPED)
477 self.exit(ALREADY_STOPPED)
481 else:
478 else:
482 if os.name=='posix':
479 if os.name=='posix':
483 sig = config.Global.signal
480 sig = config.Global.signal
484 self.log.info(
481 self.log.info(
485 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
482 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
486 )
483 )
487 os.kill(pid, sig)
484 os.kill(pid, sig)
488 elif os.name=='nt':
485 elif os.name=='nt':
489 # As of right now, we don't support daemonize on Windows, so
486 # As of right now, we don't support daemonize on Windows, so
490 # stop will not do anything. Minimally, it should clean up the
487 # stop will not do anything. Minimally, it should clean up the
491 # old .pid files.
488 # old .pid files.
492 self.remove_pid_file()
489 self.remove_pid_file()
493
490
494
491
495 def launch_new_instance():
492 def launch_new_instance():
496 """Create and run the IPython cluster."""
493 """Create and run the IPython cluster."""
497 app = IPClusterApp()
494 app = IPClusterApp()
498 app.start()
495 app.start()
499
496
500
497
501 if __name__ == '__main__':
498 if __name__ == '__main__':
502 launch_new_instance()
499 launch_new_instance()
503
500
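Taken together, the subcommands above give the following usage pattern. A minimal sketch (the profile name and engine count are illustrative, and it assumes 'ipclusterz create -p default' has already been run) of driving the application programmatically, which is what the ipclusterz entry point does via launch_new_instance():

    # sketch: equivalent to running `ipclusterz start -p default -n 4` from a shell
    import sys
    from IPython.zmq.parallel.ipclusterapp import IPClusterApp

    sys.argv = ['ipclusterz', 'start', '-p', 'default', '-n', '4']
    app = IPClusterApp()
    app.start()   # parses argv, finds the cluster dir, and runs the ioloop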
This diff has been collapsed as it changes many lines (584 lines changed).
@@ -1,825 +1,835 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21 import logging
21 import logging
22
22
23 from signal import SIGINT
23 from signal import SIGINT, SIGTERM
24 try:
24 try:
25 from signal import SIGKILL
25 from signal import SIGKILL
26 except ImportError:
26 except ImportError:
27 SIGKILL=SIGTERM
27 SIGKILL=SIGTERM
28
28
29 from subprocess import Popen, PIPE
29 from subprocess import Popen, PIPE, STDOUT
30 try:
31 from subprocess import check_output
32 except ImportError:
33 # pre-2.7:
34 # define a check_output work-alike in terms of Popen
35 
36 def check_output(*args, **kwargs):
37 # capture stdout (with stderr folded in via STDOUT) and return it
38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
39 p = Popen(*args, **kwargs)
40 out,err = p.communicate()
41 return out
30
42
31 from zmq.eventloop import ioloop
43 from zmq.eventloop import ioloop
32
44
45 from IPython.external import Itpl
33 # from IPython.config.configurable import Configurable
46 # from IPython.config.configurable import Configurable
34 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
47 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
35 from IPython.utils.path import get_ipython_module_path
48 from IPython.utils.path import get_ipython_module_path
36 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
37
50
38 from factory import LoggingFactory
51 from factory import LoggingFactory
39 # from IPython.kernel.winhpcjob import (
52
40 # IPControllerTask, IPEngineTask,
53 # load winhpcjob from IPython.kernel
41 # IPControllerJob, IPEngineSetJob
54 try:
42 # )
55 from IPython.kernel.winhpcjob import (
56 IPControllerTask, IPEngineTask,
57 IPControllerJob, IPEngineSetJob
58 )
59 except ImportError:
60 pass
43
61
44
62
45 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
46 # Paths to the kernel apps
64 # Paths to the kernel apps
47 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
48
66
49
67
50 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
51 'IPython.zmq.parallel.ipclusterapp'
69 'IPython.zmq.parallel.ipclusterapp'
52 ))
70 ))
53
71
54 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
55 'IPython.zmq.parallel.ipengineapp'
73 'IPython.zmq.parallel.ipengineapp'
56 ))
74 ))
57
75
58 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
59 'IPython.zmq.parallel.ipcontrollerapp'
77 'IPython.zmq.parallel.ipcontrollerapp'
60 ))
78 ))
61
79
62 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
63 # Base launchers and errors
81 # Base launchers and errors
64 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
65
83
66
84
67 class LauncherError(Exception):
85 class LauncherError(Exception):
68 pass
86 pass
69
87
70
88
71 class ProcessStateError(LauncherError):
89 class ProcessStateError(LauncherError):
72 pass
90 pass
73
91
74
92
75 class UnknownStatus(LauncherError):
93 class UnknownStatus(LauncherError):
76 pass
94 pass
77
95
78
96
79 class BaseLauncher(LoggingFactory):
97 class BaseLauncher(LoggingFactory):
80 """An abstraction for starting, stopping and signaling a process."""
98 """An abstraction for starting, stopping and signaling a process."""
81
99
82 # In all of the launchers, the work_dir is where child processes will be
100 # In all of the launchers, the work_dir is where child processes will be
83 # run. This will usually be the cluster_dir, but may not be. Any work_dir
101 # run. This will usually be the cluster_dir, but may not be. Any work_dir
84 # passed into the __init__ method will override the config value.
102 # passed into the __init__ method will override the config value.
85 # This should not be used to set the work_dir for the actual engine
103 # This should not be used to set the work_dir for the actual engine
86 # and controller. Instead, use their own config files or the
104 # and controller. Instead, use their own config files or the
87 # controller_args, engine_args attributes of the launchers to add
105 # controller_args, engine_args attributes of the launchers to add
88 # the --work-dir option.
106 # the --work-dir option.
89 work_dir = Unicode(u'.')
107 work_dir = Unicode(u'.')
90 loop = Instance('zmq.eventloop.ioloop.IOLoop')
108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
91 def _loop_default(self):
109 def _loop_default(self):
92 return ioloop.IOLoop.instance()
110 return ioloop.IOLoop.instance()
93
111
94 def __init__(self, work_dir=u'.', config=None, **kwargs):
112 def __init__(self, work_dir=u'.', config=None, **kwargs):
95 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
113 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
96 self.state = 'before' # can be before, running, after
114 self.state = 'before' # can be before, running, after
97 self.stop_callbacks = []
115 self.stop_callbacks = []
98 self.start_data = None
116 self.start_data = None
99 self.stop_data = None
117 self.stop_data = None
100
118
101 @property
119 @property
102 def args(self):
120 def args(self):
103 """A list of cmd and args that will be used to start the process.
121 """A list of cmd and args that will be used to start the process.
104
122
105 This is what is passed to :func:`spawnProcess` and the first element
123 This is what is passed to :func:`spawnProcess` and the first element
106 will be the process name.
124 will be the process name.
107 """
125 """
108 return self.find_args()
126 return self.find_args()
109
127
110 def find_args(self):
128 def find_args(self):
111 """The ``.args`` property calls this to find the args list.
129 """The ``.args`` property calls this to find the args list.
112
130
113 Subclasses should implement this to construct the cmd and args.
131 Subclasses should implement this to construct the cmd and args.
114 """
132 """
115 raise NotImplementedError('find_args must be implemented in a subclass')
133 raise NotImplementedError('find_args must be implemented in a subclass')
116
134
117 @property
135 @property
118 def arg_str(self):
136 def arg_str(self):
119 """The string form of the program arguments."""
137 """The string form of the program arguments."""
120 return ' '.join(self.args)
138 return ' '.join(self.args)
121
139
122 @property
140 @property
123 def running(self):
141 def running(self):
124 """Am I running."""
142 """Am I running."""
125 if self.state == 'running':
143 if self.state == 'running':
126 return True
144 return True
127 else:
145 else:
128 return False
146 return False
129
147
130 def start(self):
148 def start(self):
131 """Start the process.
149 """Start the process.
132
150
133 This must return a deferred that fires with information about the
151 This must return a deferred that fires with information about the
134 process starting (like a pid, job id, etc.).
152 process starting (like a pid, job id, etc.).
135 """
153 """
136 raise NotImplementedError('start must be implemented in a subclass')
154 raise NotImplementedError('start must be implemented in a subclass')
137
155
138 def stop(self):
156 def stop(self):
139 """Stop the process and notify observers of stopping.
157 """Stop the process and notify observers of stopping.
140
158
141 This must return a deferred that fires with information about the
159 This must return a deferred that fires with information about the
142 process stopping, like errors that occur while the process is
160 process stopping, like errors that occur while the process is
143 attempting to be shut down. This deferred won't fire when the process
161 attempting to be shut down. This deferred won't fire when the process
144 actually stops. To observe the actual process stopping, see
162 actually stops. To observe the actual process stopping, see
145 :func:`on_stop`.
163 :func:`on_stop`.
146 """
164 """
147 raise NotImplementedError('stop must be implemented in a subclass')
165 raise NotImplementedError('stop must be implemented in a subclass')
148
166
149 def on_stop(self, f):
167 def on_stop(self, f):
150 """Get a deferred that will fire when the process stops.
168 """Get a deferred that will fire when the process stops.
151
169
152 The deferred will fire with data that contains information about
170 The deferred will fire with data that contains information about
153 the exit status of the process.
171 the exit status of the process.
154 """
172 """
155 if self.state=='after':
173 if self.state=='after':
156 return f(self.stop_data)
174 return f(self.stop_data)
157 else:
175 else:
158 self.stop_callbacks.append(f)
176 self.stop_callbacks.append(f)
159
177
160 def notify_start(self, data):
178 def notify_start(self, data):
161 """Call this to trigger startup actions.
179 """Call this to trigger startup actions.
162
180
163 This logs the process startup and sets the state to 'running'. It is
181 This logs the process startup and sets the state to 'running'. It is
164 a pass-through so it can be used as a callback.
182 a pass-through so it can be used as a callback.
165 """
183 """
166
184
167 self.log.info('Process %r started: %r' % (self.args[0], data))
185 self.log.info('Process %r started: %r' % (self.args[0], data))
168 self.start_data = data
186 self.start_data = data
169 self.state = 'running'
187 self.state = 'running'
170 return data
188 return data
171
189
172 def notify_stop(self, data):
190 def notify_stop(self, data):
173 """Call this to trigger process stop actions.
191 """Call this to trigger process stop actions.
174
192
175 This logs the process stopping and sets the state to 'after'. Call
193 This logs the process stopping and sets the state to 'after'. Call
176 this to trigger all the deferreds from :func:`on_stop`."""
194 this to trigger all the deferreds from :func:`on_stop`."""
177
195
178 self.log.info('Process %r stopped: %r' % (self.args[0], data))
196 self.log.info('Process %r stopped: %r' % (self.args[0], data))
179 self.stop_data = data
197 self.stop_data = data
180 self.state = 'after'
198 self.state = 'after'
181 for i in range(len(self.stop_callbacks)):
199 for i in range(len(self.stop_callbacks)):
182 d = self.stop_callbacks.pop()
200 d = self.stop_callbacks.pop()
183 d(data)
201 d(data)
184 return data
202 return data
185
203
186 def signal(self, sig):
204 def signal(self, sig):
187 """Signal the process.
205 """Signal the process.
188
206
189 Return a semi-meaningless deferred after signaling the process.
207 Return a semi-meaningless deferred after signaling the process.
190
208
191 Parameters
209 Parameters
192 ----------
210 ----------
193 sig : str or int
211 sig : str or int
194 'KILL', 'INT', etc., or any signal number
212 'KILL', 'INT', etc., or any signal number
195 """
213 """
196 raise NotImplementedError('signal must be implemented in a subclass')
214 raise NotImplementedError('signal must be implemented in a subclass')
197
215
198
216
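The start/notify/on_stop contract spelled out in the docstrings above can be summarized in a short sketch. The EchoLauncher subclass here is hypothetical and exists only to show the call order: notify_start() flips the state to 'running', notify_stop() flips it to 'after' and fires every callback registered with on_stop().

    # hypothetical subclass illustrating the BaseLauncher callback contract
    class EchoLauncher(BaseLauncher):

        def find_args(self):
            return ['echo', 'hello']

        def start(self):
            # pretend the child process started and exited immediately
            self.notify_start({'pid': 12345})
            self.notify_stop({'pid': 12345, 'exit_code': 0})

        def stop(self):
            return None

    el = EchoLauncher(work_dir=u'.')
    el.on_stop(lambda data: None)   # registered callbacks fire from notify_stop()
    el.start()
    assert el.state == 'after' and not el.running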
199 #-----------------------------------------------------------------------------
217 #-----------------------------------------------------------------------------
200 # Local process launchers
218 # Local process launchers
201 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
202
220
203
221
204 class LocalProcessLauncher(BaseLauncher):
222 class LocalProcessLauncher(BaseLauncher):
205 """Start and stop an external process in an asynchronous manner.
223 """Start and stop an external process in an asynchronous manner.
206
224
207 This will launch the external process with a working directory of
225 This will launch the external process with a working directory of
208 ``self.work_dir``.
226 ``self.work_dir``.
209 """
227 """
210
228
211 # This is used to construct self.args, which is passed to
229 # This is used to construct self.args, which is passed to
212 # spawnProcess.
230 # spawnProcess.
213 cmd_and_args = List([])
231 cmd_and_args = List([])
214 poll_frequency = Int(100) # in ms
232 poll_frequency = Int(100) # in ms
215
233
216 def __init__(self, work_dir=u'.', config=None, **kwargs):
234 def __init__(self, work_dir=u'.', config=None, **kwargs):
217 super(LocalProcessLauncher, self).__init__(
235 super(LocalProcessLauncher, self).__init__(
218 work_dir=work_dir, config=config, **kwargs
236 work_dir=work_dir, config=config, **kwargs
219 )
237 )
220 self.process = None
238 self.process = None
221 self.start_deferred = None
239 self.start_deferred = None
222 self.poller = None
240 self.poller = None
223
241
224 def find_args(self):
242 def find_args(self):
225 return self.cmd_and_args
243 return self.cmd_and_args
226
244
227 def start(self):
245 def start(self):
228 if self.state == 'before':
246 if self.state == 'before':
229 self.process = Popen(self.args,
247 self.process = Popen(self.args,
230 stdout=PIPE,stderr=PIPE,stdin=PIPE,
248 stdout=PIPE,stderr=PIPE,stdin=PIPE,
231 env=os.environ,
249 env=os.environ,
232 cwd=self.work_dir
250 cwd=self.work_dir
233 )
251 )
234
252
235 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
253 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
236 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
254 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
237 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
255 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
238 self.poller.start()
256 self.poller.start()
239 self.notify_start(self.process.pid)
257 self.notify_start(self.process.pid)
240 else:
258 else:
241 s = 'The process was already started and has state: %r' % self.state
259 s = 'The process was already started and has state: %r' % self.state
242 raise ProcessStateError(s)
260 raise ProcessStateError(s)
243
261
244 def stop(self):
262 def stop(self):
245 return self.interrupt_then_kill()
263 return self.interrupt_then_kill()
246
264
247 def signal(self, sig):
265 def signal(self, sig):
248 if self.state == 'running':
266 if self.state == 'running':
249 self.process.send_signal(sig)
267 self.process.send_signal(sig)
250
268
251 def interrupt_then_kill(self, delay=2.0):
269 def interrupt_then_kill(self, delay=2.0):
252 """Send INT, wait a delay and then send KILL."""
270 """Send INT, wait a delay and then send KILL."""
253 self.signal(SIGINT)
271 self.signal(SIGINT)
254 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
272 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
255 self.killer.start()
273 self.killer.start()
256
274
257 # callbacks, etc:
275 # callbacks, etc:
258
276
259 def handle_stdout(self, fd, events):
277 def handle_stdout(self, fd, events):
260 line = self.process.stdout.readline()
278 line = self.process.stdout.readline()
261 # a stopped process will be readable but return empty strings
279 # a stopped process will be readable but return empty strings
262 if line:
280 if line:
263 self.log.info(line[:-1])
281 self.log.info(line[:-1])
264 else:
282 else:
265 self.poll()
283 self.poll()
266
284
267 def handle_stderr(self, fd, events):
285 def handle_stderr(self, fd, events):
268 line = self.process.stderr.readline()
286 line = self.process.stderr.readline()
269 # a stopped process will be readable but return empty strings
287 # a stopped process will be readable but return empty strings
270 if line:
288 if line:
271 self.log.error(line[:-1])
289 self.log.error(line[:-1])
272 else:
290 else:
273 self.poll()
291 self.poll()
274
292
275 def poll(self):
293 def poll(self):
276 status = self.process.poll()
294 status = self.process.poll()
277 if status is not None:
295 if status is not None:
278 self.poller.stop()
296 self.poller.stop()
279 self.loop.remove_handler(self.process.stdout.fileno())
297 self.loop.remove_handler(self.process.stdout.fileno())
280 self.loop.remove_handler(self.process.stderr.fileno())
298 self.loop.remove_handler(self.process.stderr.fileno())
281 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
299 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
282 return status
300 return status
283
301
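As a concrete illustration of the polling scheme above (a sketch, assuming the zmq ioloop is not already running elsewhere), LocalProcessLauncher can be driven directly by filling in cmd_and_args; the stdout/stderr handlers and the periodic poll all run inside the ioloop.

    # sketch: run a short-lived child process under the ioloop-based launcher
    from zmq.eventloop import ioloop

    loop = ioloop.IOLoop.instance()
    pl = LocalProcessLauncher(work_dir=u'.')
    pl.cmd_and_args = ['python', '-c', 'print "hello from the child"']
    pl.on_stop(lambda data: loop.stop())   # data carries the pid and exit_code
    pl.start()                             # spawns Popen, registers fd handlers
    loop.start()                           # handle_stdout/poll fire until exit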
284 class LocalControllerLauncher(LocalProcessLauncher):
302 class LocalControllerLauncher(LocalProcessLauncher):
285 """Launch a controller as a regular external process."""
303 """Launch a controller as a regular external process."""
286
304
287 controller_cmd = List(ipcontroller_cmd_argv, config=True)
305 controller_cmd = List(ipcontroller_cmd_argv, config=True)
288 # Command line arguments to ipcontroller.
306 # Command line arguments to ipcontroller.
289 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
307 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
290
308
291 def find_args(self):
309 def find_args(self):
292 return self.controller_cmd + self.controller_args
310 return self.controller_cmd + self.controller_args
293
311
294 def start(self, cluster_dir):
312 def start(self, cluster_dir):
295 """Start the controller by cluster_dir."""
313 """Start the controller by cluster_dir."""
296 self.controller_args.extend(['--cluster-dir', cluster_dir])
314 self.controller_args.extend(['--cluster-dir', cluster_dir])
297 self.cluster_dir = unicode(cluster_dir)
315 self.cluster_dir = unicode(cluster_dir)
298 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
316 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
299 return super(LocalControllerLauncher, self).start()
317 return super(LocalControllerLauncher, self).start()
300
318
301
319
302 class LocalEngineLauncher(LocalProcessLauncher):
320 class LocalEngineLauncher(LocalProcessLauncher):
303 """Launch a single engine as a regular external process."""
321 """Launch a single engine as a regular external process."""
304
322
305 engine_cmd = List(ipengine_cmd_argv, config=True)
323 engine_cmd = List(ipengine_cmd_argv, config=True)
306 # Command line arguments for ipengine.
324 # Command line arguments for ipengine.
307 engine_args = List(
325 engine_args = List(
308 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
326 ['--log-to-file','--log-level', str(logging.INFO)], config=True
309 )
327 )
310
328
311 def find_args(self):
329 def find_args(self):
312 return self.engine_cmd + self.engine_args
330 return self.engine_cmd + self.engine_args
313
331
314 def start(self, cluster_dir):
332 def start(self, cluster_dir):
315 """Start the engine by cluster_dir."""
333 """Start the engine by cluster_dir."""
316 self.engine_args.extend(['--cluster-dir', cluster_dir])
334 self.engine_args.extend(['--cluster-dir', cluster_dir])
317 self.cluster_dir = unicode(cluster_dir)
335 self.cluster_dir = unicode(cluster_dir)
318 return super(LocalEngineLauncher, self).start()
336 return super(LocalEngineLauncher, self).start()
319
337
320
338
321 class LocalEngineSetLauncher(BaseLauncher):
339 class LocalEngineSetLauncher(BaseLauncher):
322 """Launch a set of engines as regular external processes."""
340 """Launch a set of engines as regular external processes."""
323
341
324 # Command line arguments for ipengine.
342 # Command line arguments for ipengine.
325 engine_args = List(
343 engine_args = List(
326 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
344 ['--log-to-file','--log-level', str(logging.INFO)], config=True
327 )
345 )
328 # launcher class
346 # launcher class
329 launcher_class = LocalEngineLauncher
347 launcher_class = LocalEngineLauncher
330
348
331 def __init__(self, work_dir=u'.', config=None, **kwargs):
349 def __init__(self, work_dir=u'.', config=None, **kwargs):
332 super(LocalEngineSetLauncher, self).__init__(
350 super(LocalEngineSetLauncher, self).__init__(
333 work_dir=work_dir, config=config, **kwargs
351 work_dir=work_dir, config=config, **kwargs
334 )
352 )
335 self.launchers = {}
353 self.launchers = {}
336 self.stop_data = {}
354 self.stop_data = {}
337
355
338 def start(self, n, cluster_dir):
356 def start(self, n, cluster_dir):
339 """Start n engines by profile or cluster_dir."""
357 """Start n engines by profile or cluster_dir."""
340 self.cluster_dir = unicode(cluster_dir)
358 self.cluster_dir = unicode(cluster_dir)
341 dlist = []
359 dlist = []
342 for i in range(n):
360 for i in range(n):
343 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
361 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
344 # Copy the engine args over to each engine launcher.
362 # Copy the engine args over to each engine launcher.
345 import copy
363 import copy
346 el.engine_args = copy.deepcopy(self.engine_args)
364 el.engine_args = copy.deepcopy(self.engine_args)
347 el.on_stop(self._notice_engine_stopped)
365 el.on_stop(self._notice_engine_stopped)
348 d = el.start(cluster_dir)
366 d = el.start(cluster_dir)
349 if i==0:
367 if i==0:
350 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
368 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
351 self.launchers[i] = el
369 self.launchers[i] = el
352 dlist.append(d)
370 dlist.append(d)
353 self.notify_start(dlist)
371 self.notify_start(dlist)
354 # The consumeErrors here could be dangerous
372 # The consumeErrors here could be dangerous
355 # dfinal = gatherBoth(dlist, consumeErrors=True)
373 # dfinal = gatherBoth(dlist, consumeErrors=True)
356 # dfinal.addCallback(self.notify_start)
374 # dfinal.addCallback(self.notify_start)
357 return dlist
375 return dlist
358
376
359 def find_args(self):
377 def find_args(self):
360 return ['engine set']
378 return ['engine set']
361
379
362 def signal(self, sig):
380 def signal(self, sig):
363 dlist = []
381 dlist = []
364 for el in self.launchers.itervalues():
382 for el in self.launchers.itervalues():
365 d = el.signal(sig)
383 d = el.signal(sig)
366 dlist.append(d)
384 dlist.append(d)
367 # dfinal = gatherBoth(dlist, consumeErrors=True)
385 # dfinal = gatherBoth(dlist, consumeErrors=True)
368 return dlist
386 return dlist
369
387
370 def interrupt_then_kill(self, delay=1.0):
388 def interrupt_then_kill(self, delay=1.0):
371 dlist = []
389 dlist = []
372 for el in self.launchers.itervalues():
390 for el in self.launchers.itervalues():
373 d = el.interrupt_then_kill(delay)
391 d = el.interrupt_then_kill(delay)
374 dlist.append(d)
392 dlist.append(d)
375 # dfinal = gatherBoth(dlist, consumeErrors=True)
393 # dfinal = gatherBoth(dlist, consumeErrors=True)
376 return dlist
394 return dlist
377
395
378 def stop(self):
396 def stop(self):
379 return self.interrupt_then_kill()
397 return self.interrupt_then_kill()
380
398
381 def _notice_engine_stopped(self, data):
399 def _notice_engine_stopped(self, data):
382 print "notice", data
400 print "notice", data
383 pid = data['pid']
401 pid = data['pid']
384 for idx,el in self.launchers.iteritems():
402 for idx,el in self.launchers.iteritems():
385 if el.process.pid == pid:
403 if el.process.pid == pid:
386 break
404 break
387 self.launchers.pop(idx)
405 self.launchers.pop(idx)
388 self.stop_data[idx] = data
406 self.stop_data[idx] = data
389 if not self.launchers:
407 if not self.launchers:
390 self.notify_stop(self.stop_data)
408 self.notify_stop(self.stop_data)
391
409
392
410
393 #-----------------------------------------------------------------------------
411 #-----------------------------------------------------------------------------
394 # MPIExec launchers
412 # MPIExec launchers
395 #-----------------------------------------------------------------------------
413 #-----------------------------------------------------------------------------
396
414
397
415
398 class MPIExecLauncher(LocalProcessLauncher):
416 class MPIExecLauncher(LocalProcessLauncher):
399 """Launch an external process using mpiexec."""
417 """Launch an external process using mpiexec."""
400
418
401 # The mpiexec command to use in starting the process.
419 # The mpiexec command to use in starting the process.
402 mpi_cmd = List(['mpiexec'], config=True)
420 mpi_cmd = List(['mpiexec'], config=True)
403 # The command line arguments to pass to mpiexec.
421 # The command line arguments to pass to mpiexec.
404 mpi_args = List([], config=True)
422 mpi_args = List([], config=True)
405 # The program to start using mpiexec.
423 # The program to start using mpiexec.
406 program = List(['date'], config=True)
424 program = List(['date'], config=True)
407 # The command line argument to the program.
425 # The command line argument to the program.
408 program_args = List([], config=True)
426 program_args = List([], config=True)
409 # The number of instances of the program to start.
427 # The number of instances of the program to start.
410 n = Int(1, config=True)
428 n = Int(1, config=True)
411
429
412 def find_args(self):
430 def find_args(self):
413 """Build self.args using all the fields."""
431 """Build self.args using all the fields."""
414 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
432 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
415 self.program + self.program_args
433 self.program + self.program_args
416
434
417 def start(self, n):
435 def start(self, n):
418 """Start n instances of the program using mpiexec."""
436 """Start n instances of the program using mpiexec."""
419 self.n = n
437 self.n = n
420 return super(MPIExecLauncher, self).start()
438 return super(MPIExecLauncher, self).start()
421
439
422
440
423 class MPIExecControllerLauncher(MPIExecLauncher):
441 class MPIExecControllerLauncher(MPIExecLauncher):
424 """Launch a controller using mpiexec."""
442 """Launch a controller using mpiexec."""
425
443
426 controller_cmd = List(ipcontroller_cmd_argv, config=True)
444 controller_cmd = List(ipcontroller_cmd_argv, config=True)
427 # Command line arguments to ipcontroller.
445 # Command line arguments to ipcontroller.
428 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
446 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
429 n = Int(1, config=False)
447 n = Int(1, config=False)
430
448
431 def start(self, cluster_dir):
449 def start(self, cluster_dir):
432 """Start the controller by cluster_dir."""
450 """Start the controller by cluster_dir."""
433 self.controller_args.extend(['--cluster-dir', cluster_dir])
451 self.controller_args.extend(['--cluster-dir', cluster_dir])
434 self.cluster_dir = unicode(cluster_dir)
452 self.cluster_dir = unicode(cluster_dir)
435 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
453 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
436 return super(MPIExecControllerLauncher, self).start(1)
454 return super(MPIExecControllerLauncher, self).start(1)
437
455
438 def find_args(self):
456 def find_args(self):
439 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
457 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
440 self.controller_cmd + self.controller_args
458 self.controller_cmd + self.controller_args
441
459
442
460
443 class MPIExecEngineSetLauncher(MPIExecLauncher):
461 class MPIExecEngineSetLauncher(MPIExecLauncher):
444
462
445 engine_cmd = List(ipengine_cmd_argv, config=True)
463 engine_cmd = List(ipengine_cmd_argv, config=True)
446 # Command line arguments for ipengine.
464 # Command line arguments for ipengine.
447 engine_args = List(
465 engine_args = List(
448 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
466 ['--log-to-file','--log-level', str(logging.INFO)], config=True
449 )
467 )
450 n = Int(1, config=True)
468 n = Int(1, config=True)
451
469
452 def start(self, n, cluster_dir):
470 def start(self, n, cluster_dir):
453 """Start n engines by profile or cluster_dir."""
471 """Start n engines by profile or cluster_dir."""
454 self.engine_args.extend(['--cluster-dir', cluster_dir])
472 self.engine_args.extend(['--cluster-dir', cluster_dir])
455 self.cluster_dir = unicode(cluster_dir)
473 self.cluster_dir = unicode(cluster_dir)
456 self.n = n
474 self.n = n
457 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
475 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
458 return super(MPIExecEngineSetLauncher, self).start(n)
476 return super(MPIExecEngineSetLauncher, self).start(n)
459
477
460 def find_args(self):
478 def find_args(self):
461 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
479 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
462 self.engine_cmd + self.engine_args
480 self.engine_cmd + self.engine_args
463
481
464
482
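For reference, the argv assembled by MPIExecEngineSetLauncher boils down to the shape below; the interpreter and module paths come from ipengine_cmd_argv at import time and are only assumed here.

    # roughly what MPIExecEngineSetLauncher.find_args() produces for n=4:
    #   mpi_cmd + ['-n', str(n)] + mpi_args + engine_cmd + engine_args
    example_argv = (
        ['mpiexec', '-n', '4'] +                          # mpi_cmd, n
        [] +                                              # mpi_args
        ['/usr/bin/python', '/path/to/ipengineapp.py'] +  # engine_cmd (assumed paths)
        ['--log-to-file', '--log-level', '20',            # engine_args
         '--cluster-dir', '/path/to/cluster_default']     # appended by start()
    )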
465 #-----------------------------------------------------------------------------
483 #-----------------------------------------------------------------------------
466 # SSH launchers
484 # SSH launchers
467 #-----------------------------------------------------------------------------
485 #-----------------------------------------------------------------------------
468
486
469 # TODO: Get SSH Launcher working again.
487 # TODO: Get SSH Launcher working again.
470
488
471 class SSHLauncher(LocalProcessLauncher):
489 class SSHLauncher(LocalProcessLauncher):
472 """A minimal launcher for ssh.
490 """A minimal launcher for ssh.
473
491
474 To be useful this will probably have to be extended to use the ``sshx``
492 To be useful this will probably have to be extended to use the ``sshx``
475 idea for environment variables. There could be other things this needs
493 idea for environment variables. There could be other things this needs
476 as well.
494 as well.
477 """
495 """
478
496
479 ssh_cmd = List(['ssh'], config=True)
497 ssh_cmd = List(['ssh'], config=True)
480 ssh_args = List([], config=True)
498 ssh_args = List([], config=True)
481 program = List(['date'], config=True)
499 program = List(['date'], config=True)
482 program_args = List([], config=True)
500 program_args = List([], config=True)
483 hostname = Str('', config=True)
501 hostname = Str('', config=True)
484 user = Str(os.environ.get('USER','username'), config=True)
502 user = Str(os.environ.get('USER','username'), config=True)
485 location = Str('')
503 location = Str('')
486
504
487 def _hostname_changed(self, name, old, new):
505 def _hostname_changed(self, name, old, new):
488 self.location = '%s@%s' % (self.user, new)
506 self.location = '%s@%s' % (self.user, new)
489
507
490 def _user_changed(self, name, old, new):
508 def _user_changed(self, name, old, new):
491 self.location = '%s@%s' % (new, self.hostname)
509 self.location = '%s@%s' % (new, self.hostname)
492
510
493 def find_args(self):
511 def find_args(self):
494 return self.ssh_cmd + self.ssh_args + [self.location] + \
512 return self.ssh_cmd + self.ssh_args + [self.location] + \
495 self.program + self.program_args
513 self.program + self.program_args
496
514
497 def start(self, cluster_dir, hostname=None, user=None):
515 def start(self, cluster_dir, hostname=None, user=None):
498 if hostname is not None:
516 if hostname is not None:
499 self.hostname = hostname
517 self.hostname = hostname
500 if user is not None:
518 if user is not None:
501 self.user = user
519 self.user = user
502 return super(SSHLauncher, self).start()
520 return super(SSHLauncher, self).start()
503
521
504
522
505 class SSHControllerLauncher(SSHLauncher):
523 class SSHControllerLauncher(SSHLauncher):
506
524
507 program = List(ipcontroller_cmd_argv, config=True)
525 program = List(ipcontroller_cmd_argv, config=True)
508 # Command line arguments to ipcontroller.
526 # Command line arguments to ipcontroller.
509 program_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
527 program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
510
528
511
529
512 class SSHEngineLauncher(SSHLauncher):
530 class SSHEngineLauncher(SSHLauncher):
513 program = List(ipengine_cmd_argv, config=True)
531 program = List(ipengine_cmd_argv, config=True)
514 # Command line arguments for ipengine.
532 # Command line arguments for ipengine.
515 program_args = List(
533 program_args = List(
516 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
534 ['--log-to-file','--log-level', str(logging.INFO)], config=True
517 )
535 )
518
536
519 class SSHEngineSetLauncher(LocalEngineSetLauncher):
537 class SSHEngineSetLauncher(LocalEngineSetLauncher):
520 launcher_class = SSHEngineLauncher
538 launcher_class = SSHEngineLauncher
521
539
522
540
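A small sketch of how the SSH launchers compose their command line (the hostname and user are placeholders): setting either trait updates location through the _changed handlers, and find_args() splices everything together.

    # sketch: inspect the command an SSHControllerLauncher would run
    sl = SSHControllerLauncher(work_dir=u'.')
    sl.user = 'me'           # _user_changed     -> location = 'me@'
    sl.hostname = 'node01'   # _hostname_changed -> location = 'me@node01'
    print sl.arg_str
    # e.g. "ssh me@node01 /usr/bin/python /path/to/ipcontrollerapp.py
    #       --log-to-file --log-level 20"   (paths depend on the install)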
523 #-----------------------------------------------------------------------------
541 #-----------------------------------------------------------------------------
524 # Windows HPC Server 2008 scheduler launchers
542 # Windows HPC Server 2008 scheduler launchers
525 #-----------------------------------------------------------------------------
543 #-----------------------------------------------------------------------------
526
544
527
545
528 # # This is only used on Windows.
546 # This is only used on Windows.
529 # def find_job_cmd():
547 def find_job_cmd():
530 # if os.name=='nt':
548 if os.name=='nt':
531 # try:
549 try:
532 # return find_cmd('job')
550 return find_cmd('job')
533 # except FindCmdError:
551 except FindCmdError:
534 # return 'job'
552 return 'job'
535 # else:
553 else:
536 # return 'job'
554 return 'job'
537 #
555
538 #
556
539 # class WindowsHPCLauncher(BaseLauncher):
557 class WindowsHPCLauncher(BaseLauncher):
540 #
558
541 # # A regular expression used to get the job id from the output of the
559 # A regular expression used to get the job id from the output of the
542 # # submit_command.
560 # submit_command.
543 # job_id_regexp = Str(r'\d+', config=True)
561 job_id_regexp = Str(r'\d+', config=True)
544 # # The filename of the instantiated job script.
562 # The filename of the instantiated job script.
545 # job_file_name = Unicode(u'ipython_job.xml', config=True)
563 job_file_name = Unicode(u'ipython_job.xml', config=True)
546 # # The full path to the instantiated job script. This gets made dynamically
564 # The full path to the instantiated job script. This gets made dynamically
547 # # by combining the work_dir with the job_file_name.
565 # by combining the work_dir with the job_file_name.
548 # job_file = Unicode(u'')
566 job_file = Unicode(u'')
549 # # The hostname of the scheduler to submit the job to
567 # The hostname of the scheduler to submit the job to
550 # scheduler = Str('', config=True)
568 scheduler = Str('', config=True)
551 # job_cmd = Str(find_job_cmd(), config=True)
569 job_cmd = Str(find_job_cmd(), config=True)
552 #
570
553 # def __init__(self, work_dir=u'.', config=None):
571 def __init__(self, work_dir=u'.', config=None):
554 # super(WindowsHPCLauncher, self).__init__(
572 super(WindowsHPCLauncher, self).__init__(
555 # work_dir=work_dir, config=config
573 work_dir=work_dir, config=config
556 # )
574 )
557 #
575
558 # @property
576 @property
559 # def job_file(self):
577 def job_file(self):
560 # return os.path.join(self.work_dir, self.job_file_name)
578 return os.path.join(self.work_dir, self.job_file_name)
561 #
579
562 # def write_job_file(self, n):
580 def write_job_file(self, n):
563 # raise NotImplementedError("Implement write_job_file in a subclass.")
581 raise NotImplementedError("Implement write_job_file in a subclass.")
564 #
582
565 # def find_args(self):
583 def find_args(self):
566 # return ['job.exe']
584 return ['job.exe']
567 #
585
568 # def parse_job_id(self, output):
586 def parse_job_id(self, output):
569 # """Take the output of the submit command and return the job id."""
587 """Take the output of the submit command and return the job id."""
570 # m = re.search(self.job_id_regexp, output)
588 m = re.search(self.job_id_regexp, output)
571 # if m is not None:
589 if m is not None:
572 # job_id = m.group()
590 job_id = m.group()
573 # else:
591 else:
574 # raise LauncherError("Job id couldn't be determined: %s" % output)
592 raise LauncherError("Job id couldn't be determined: %s" % output)
575 # self.job_id = job_id
593 self.job_id = job_id
576 # self.log.info('Job started with job id: %r' % job_id)
594 self.log.info('Job started with job id: %r' % job_id)
577 # return job_id
595 return job_id
578 #
596
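For instance (a sketch; the scheduler's exact output format is assumed here), the default job_id_regexp simply extracts the first run of digits from whatever the submit command prints:

    import re
    # assumed example of `job submit` output, for illustration only
    sample_output = 'Job has been submitted. ID: 4321.'
    m = re.search(r'\d+', sample_output)   # the default job_id_regexp
    job_id = m.group()                     # -> '4321'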
579 # @inlineCallbacks
597 def start(self, n):
580 # def start(self, n):
598 """Start n copies of the process using the Win HPC job scheduler."""
581 # """Start n copies of the process using the Win HPC job scheduler."""
599 self.write_job_file(n)
582 # self.write_job_file(n)
600 args = [
583 # args = [
601 'submit',
584 # 'submit',
602 '/jobfile:%s' % self.job_file,
585 # '/jobfile:%s' % self.job_file,
603 '/scheduler:%s' % self.scheduler
586 # '/scheduler:%s' % self.scheduler
604 ]
587 # ]
605 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 # self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
589 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
607 output = check_output([self.job_cmd]+args,
590 # output = yield getProcessOutput(str(self.job_cmd),
608 env=os.environ,
591 # [str(a) for a in args],
609 cwd=self.work_dir,
592 # env=dict((str(k),str(v)) for k,v in os.environ.items()),
610 stderr=STDOUT
593 # path=self.work_dir
611 )
594 # )
612 job_id = self.parse_job_id(output)
595 # job_id = self.parse_job_id(output)
613 # self.notify_start(job_id)
596 # self.notify_start(job_id)
614 return job_id
597 # defer.returnValue(job_id)
615
598 #
616 def stop(self):
599 # @inlineCallbacks
617 args = [
600 # def stop(self):
618 'cancel',
601 # args = [
619 self.job_id,
602 # 'cancel',
620 '/scheduler:%s' % self.scheduler
603 # self.job_id,
621 ]
604 # '/scheduler:%s' % self.scheduler
622 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
605 # ]
623 try:
606 # self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
624 output = check_output([self.job_cmd]+args,
607 # try:
625 env=os.environ,
608 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
626 cwd=self.work_dir,
609 # output = yield getProcessOutput(str(self.job_cmd),
627 stderr=STDOUT
610 # [str(a) for a in args],
628 )
611 # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()),
629 except:
612 # path=self.work_dir
630 output = 'The job already appears to be stopped: %r' % self.job_id
613 # )
631 self.notify_stop(output) # Pass the output of the kill cmd
614 # except:
632 return output
615 # output = 'The job already appears to be stoppped: %r' % self.job_id
633
616 # self.notify_stop(output) # Pass the output of the kill cmd
634
617 # defer.returnValue(output)
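
For reference, a minimal standalone sketch of the job-id parsing that parse_job_id performs on the output of 'job submit'; the regexp and the sample output string below are illustrative assumptions, not values taken from this diff:

import re

job_id_regexp = r'\d+'                                  # assumed pattern; the real value is a configurable trait
sample_output = 'Job has been submitted. ID: 4212\n'    # hypothetical output from 'job submit'

m = re.search(job_id_regexp, sample_output)
if m is None:
    raise ValueError("Job id couldn't be determined: %s" % sample_output)
job_id = m.group()                                      # '4212'
print(job_id)
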
635 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
618 #
636
619 #
637 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
620 # class WindowsHPCControllerLauncher(WindowsHPCLauncher):
638 extra_args = List([], config=False)
621 #
639
622 # job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
640 def write_job_file(self, n):
623 # extra_args = List([], config=False)
641 job = IPControllerJob(config=self.config)
624 #
642
625 # def write_job_file(self, n):
643 t = IPControllerTask(config=self.config)
626 # job = IPControllerJob(config=self.config)
644 # The task's work directory is *not* the actual work directory of
627 #
645 # the controller. It is used as the base path for the stdout/stderr
628 # t = IPControllerTask(config=self.config)
646 # files that the scheduler redirects to.
629 # # The tasks work directory is *not* the actual work directory of
647 t.work_directory = self.cluster_dir
630 # # the controller. It is used as the base path for the stdout/stderr
648 # Add the --cluster-dir arguments set in self.start().
631 # # files that the scheduler redirects to.
649 t.controller_args.extend(self.extra_args)
632 # t.work_directory = self.cluster_dir
650 job.add_task(t)
633 # # Add the --cluster-dir and from self.start().
651
634 # t.controller_args.extend(self.extra_args)
652 self.log.info("Writing job description file: %s" % self.job_file)
635 # job.add_task(t)
653 job.write(self.job_file)
636 #
654
637 # self.log.info("Writing job description file: %s" % self.job_file)
655 @property
638 # job.write(self.job_file)
656 def job_file(self):
639 #
657 return os.path.join(self.cluster_dir, self.job_file_name)
640 # @property
658
641 # def job_file(self):
659 def start(self, cluster_dir):
642 # return os.path.join(self.cluster_dir, self.job_file_name)
660 """Start the controller by cluster_dir."""
643 #
661 self.extra_args = ['--cluster-dir', cluster_dir]
644 # def start(self, cluster_dir):
662 self.cluster_dir = unicode(cluster_dir)
645 # """Start the controller by cluster_dir."""
663 return super(WindowsHPCControllerLauncher, self).start(1)
646 # self.extra_args = ['--cluster-dir', cluster_dir]
664
647 # self.cluster_dir = unicode(cluster_dir)
665
648 # return super(WindowsHPCControllerLauncher, self).start(1)
666 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
649 #
667
650 #
668 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
651 # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
669 extra_args = List([], config=False)
652 #
670
653 # job_file_name = Unicode(u'ipengineset_job.xml', config=True)
671 def write_job_file(self, n):
654 # extra_args = List([], config=False)
672 job = IPEngineSetJob(config=self.config)
655 #
673
656 # def write_job_file(self, n):
674 for i in range(n):
657 # job = IPEngineSetJob(config=self.config)
675 t = IPEngineTask(config=self.config)
658 #
676 # The task's work directory is *not* the actual work directory of
659 # for i in range(n):
677 # the engine. It is used as the base path for the stdout/stderr
660 # t = IPEngineTask(config=self.config)
678 # files that the scheduler redirects to.
661 # # The tasks work directory is *not* the actual work directory of
679 t.work_directory = self.cluster_dir
662 # # the engine. It is used as the base path for the stdout/stderr
680 # Add the --cluster-dir arguments set in self.start().
663 # # files that the scheduler redirects to.
681 t.engine_args.extend(self.extra_args)
664 # t.work_directory = self.cluster_dir
682 job.add_task(t)
665 # # Add the --cluster-dir and from self.start().
683
666 # t.engine_args.extend(self.extra_args)
684 self.log.info("Writing job description file: %s" % self.job_file)
667 # job.add_task(t)
685 job.write(self.job_file)
668 #
686
669 # self.log.info("Writing job description file: %s" % self.job_file)
687 @property
670 # job.write(self.job_file)
688 def job_file(self):
671 #
689 return os.path.join(self.cluster_dir, self.job_file_name)
672 # @property
690
673 # def job_file(self):
691 def start(self, n, cluster_dir):
674 # return os.path.join(self.cluster_dir, self.job_file_name)
692 """Start the controller by cluster_dir."""
675 #
693 self.extra_args = ['--cluster-dir', cluster_dir]
676 # def start(self, n, cluster_dir):
694 self.cluster_dir = unicode(cluster_dir)
677 # """Start the controller by cluster_dir."""
695 return super(WindowsHPCEngineSetLauncher, self).start(n)
678 # self.extra_args = ['--cluster-dir', cluster_dir]
696
679 # self.cluster_dir = unicode(cluster_dir)
697
680 # return super(WindowsHPCEngineSetLauncher, self).start(n)
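
As a quick illustration of what these Windows HPC launchers end up executing, here is a self-contained sketch of the submit command line assembled in start(); the job_cmd value, job file path, and scheduler host are made-up stand-ins, not anything defined in this diff:

job_cmd = 'job'                                          # e.g. what find_job_cmd() might locate
job_file = r'C:\cluster_default\ipcontroller_job.xml'    # hypothetical path under the cluster dir
scheduler = 'HEADNODE'                                   # hypothetical scheduler host

args = ['submit', '/jobfile:%s' % job_file, '/scheduler:%s' % scheduler]
print(job_cmd + ' ' + ' '.join(args))
# job submit /jobfile:C:\cluster_default\ipcontroller_job.xml /scheduler:HEADNODE
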
698 #-----------------------------------------------------------------------------
681 #
699 # Batch (PBS) system launchers
682 #
700 #-----------------------------------------------------------------------------
683 # #-----------------------------------------------------------------------------
701
684 # # Batch (PBS) system launchers
702 # TODO: Get PBS launcher working again.
685 # #-----------------------------------------------------------------------------
703
686 #
704 class BatchSystemLauncher(BaseLauncher):
687 # # TODO: Get PBS launcher working again.
705 """Launch an external process using a batch system.
688 #
706
689 # class BatchSystemLauncher(BaseLauncher):
707 This class is designed to work with UNIX batch systems like PBS, LSF,
690 # """Launch an external process using a batch system.
708 GridEngine, etc. The overall model is that there are different commands
691 #
709 like qsub, qdel, etc. that handle the starting and stopping of the process.
692 # This class is designed to work with UNIX batch systems like PBS, LSF,
710
693 # GridEngine, etc. The overall model is that there are different commands
711 This class also has the notion of a batch script. The ``batch_template``
694 # like qsub, qdel, etc. that handle the starting and stopping of the process.
712 attribute can be set to a string that is a template for the batch script.
695 #
713 This template is instantiated using Itpl. Thus the template can use
696 # This class also has the notion of a batch script. The ``batch_template``
714 ${n} for the number of instances. Subclasses can add additional variables
697 # attribute can be set to a string that is a template for the batch script.
715 to the template dict.
698 # This template is instantiated using Itpl. Thus the template can use
716 """
699 # ${n} fot the number of instances. Subclasses can add additional variables
717
700 # to the template dict.
718 # Subclasses must fill these in. See PBSEngineSet
701 # """
719 # The name of the command line program used to submit jobs.
702 #
720 submit_command = Str('', config=True)
703 # # Subclasses must fill these in. See PBSEngineSet
721 # The name of the command line program used to delete jobs.
704 # # The name of the command line program used to submit jobs.
722 delete_command = Str('', config=True)
705 # submit_command = Str('', config=True)
723 # A regular expression used to get the job id from the output of the
706 # # The name of the command line program used to delete jobs.
724 # submit_command.
707 # delete_command = Str('', config=True)
725 job_id_regexp = Str('', config=True)
708 # # A regular expression used to get the job id from the output of the
726 # The string that is the batch script template itself.
709 # # submit_command.
727 batch_template = Str('', config=True)
710 # job_id_regexp = Str('', config=True)
728 # The filename of the instantiated batch script.
711 # # The string that is the batch script template itself.
729 batch_file_name = Unicode(u'batch_script', config=True)
712 # batch_template = Str('', config=True)
730 # The full path to the instantiated batch script.
713 # # The filename of the instantiated batch script.
731 batch_file = Unicode(u'')
714 # batch_file_name = Unicode(u'batch_script', config=True)
732
715 # # The full path to the instantiated batch script.
733 def __init__(self, work_dir=u'.', config=None):
716 # batch_file = Unicode(u'')
734 super(BatchSystemLauncher, self).__init__(
717 #
735 work_dir=work_dir, config=config
718 # def __init__(self, work_dir=u'.', config=None):
736 )
719 # super(BatchSystemLauncher, self).__init__(
737 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
720 # work_dir=work_dir, config=config
738 self.context = {}
721 # )
739
722 # self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
740 def parse_job_id(self, output):
723 # self.context = {}
741 """Take the output of the submit command and return the job id."""
724 #
742 m = re.match(self.job_id_regexp, output)
725 # def parse_job_id(self, output):
743 if m is not None:
726 # """Take the output of the submit command and return the job id."""
744 job_id = m.group()
727 # m = re.match(self.job_id_regexp, output)
745 else:
728 # if m is not None:
746 raise LauncherError("Job id couldn't be determined: %s" % output)
729 # job_id = m.group()
747 self.job_id = job_id
730 # else:
748 self.log.info('Job started with job id: %r' % job_id)
731 # raise LauncherError("Job id couldn't be determined: %s" % output)
749 return job_id
732 # self.job_id = job_id
750
733 # self.log.info('Job started with job id: %r' % job_id)
751 def write_batch_script(self, n):
734 # return job_id
752 """Instantiate and write the batch script to the work_dir."""
735 #
753 self.context['n'] = n
736 # def write_batch_script(self, n):
754 script_as_string = Itpl.itplns(self.batch_template, self.context)
737 # """Instantiate and write the batch script to the work_dir."""
755 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
738 # self.context['n'] = n
756 f = open(self.batch_file, 'w')
739 # script_as_string = Itpl.itplns(self.batch_template, self.context)
757 f.write(script_as_string)
740 # self.log.info('Writing instantiated batch script: %s' % self.batch_file)
758 f.close()
741 # f = open(self.batch_file, 'w')
759
742 # f.write(script_as_string)
760 def start(self, n):
743 # f.close()
761 """Start n copies of the process using a batch system."""
744 #
762 self.write_batch_script(n)
745 # @inlineCallbacks
763 output = check_output([self.submit_command, self.batch_file], env=os.environ, stderr=STDOUT)
746 # def start(self, n):
764 job_id = self.parse_job_id(output)
747 # """Start n copies of the process using a batch system."""
765 # self.notify_start(job_id)
748 # self.write_batch_script(n)
766 return job_id
749 # output = yield getProcessOutput(self.submit_command,
767
750 # [self.batch_file], env=os.environ)
768 def stop(self):
751 # job_id = self.parse_job_id(output)
769 output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
752 # self.notify_start(job_id)
770 self.notify_stop(output) # Pass the output of the kill cmd
753 # defer.returnValue(job_id)
771 return output
754 #
772
755 # @inlineCallbacks
773
756 # def stop(self):
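
To make the ${n}-style templating in write_batch_script concrete, here is a small self-contained sketch; string.Template stands in for IPython's Itpl helper purely for illustration, and the template text and context values are assumptions, not shipped defaults:

from string import Template

batch_template = """#!/bin/sh
#PBS -N ipengine
cd ${cluster_dir}
mpiexec -n ${n} ipengine --cluster-dir ${cluster_dir}
"""
context = {'n': 4, 'cluster_dir': '/home/me/.ipython/cluster_default'}   # illustrative values

script_as_string = Template(batch_template).substitute(context)
print(script_as_string)    # this is the text that write_batch_script saves to batch_file
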
774 class PBSLauncher(BatchSystemLauncher):
757 # output = yield getProcessOutput(self.delete_command,
775 """A BatchSystemLauncher subclass for PBS."""
758 # [self.job_id], env=os.environ
776
759 # )
777 submit_command = Str('qsub', config=True)
760 # self.notify_stop(output) # Pass the output of the kill cmd
778 delete_command = Str('qdel', config=True)
761 # defer.returnValue(output)
779 job_id_regexp = Str(r'\d+', config=True)
762 #
780 batch_template = Str('', config=True)
763 #
781 batch_file_name = Unicode(u'pbs_batch_script', config=True)
764 # class PBSLauncher(BatchSystemLauncher):
782 batch_file = Unicode(u'')
765 # """A BatchSystemLauncher subclass for PBS."""
783
766 #
784
767 # submit_command = Str('qsub', config=True)
785 class PBSControllerLauncher(PBSLauncher):
768 # delete_command = Str('qdel', config=True)
786 """Launch a controller using PBS."""
769 # job_id_regexp = Str(r'\d+', config=True)
787
770 # batch_template = Str('', config=True)
788 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
771 # batch_file_name = Unicode(u'pbs_batch_script', config=True)
789
772 # batch_file = Unicode(u'')
790 def start(self, cluster_dir):
773 #
791 """Start the controller by profile or cluster_dir."""
774 #
792 # Here we save profile and cluster_dir in the context so they
775 # class PBSControllerLauncher(PBSLauncher):
793 # can be used in the batch script template as ${profile} and
776 # """Launch a controller using PBS."""
794 # ${cluster_dir}
777 #
795 self.context['cluster_dir'] = cluster_dir
778 # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
796 self.cluster_dir = unicode(cluster_dir)
779 #
797 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
780 # def start(self, cluster_dir):
798 return super(PBSControllerLauncher, self).start(1)
781 # """Start the controller by profile or cluster_dir."""
799
782 # # Here we save profile and cluster_dir in the context so they
800
783 # # can be used in the batch script template as ${profile} and
801 class PBSEngineSetLauncher(PBSLauncher):
784 # # ${cluster_dir}
802
785 # self.context['cluster_dir'] = cluster_dir
803 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
786 # self.cluster_dir = unicode(cluster_dir)
804
787 # self.log.info("Starting PBSControllerLauncher: %r" % self.args)
805 def start(self, n, cluster_dir):
788 # return super(PBSControllerLauncher, self).start(1)
806 """Start n engines by profile or cluster_dir."""
789 #
807 self.program_args.extend(['--cluster-dir', cluster_dir])
790 #
808 self.cluster_dir = unicode(cluster_dir)
791 # class PBSEngineSetLauncher(PBSLauncher):
809 self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
792 #
810 return super(PBSEngineSetLauncher, self).start(n)
793 # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
794 #
795 # def start(self, n, cluster_dir):
796 # """Start n engines by profile or cluster_dir."""
797 # self.program_args.extend(['--cluster-dir', cluster_dir])
798 # self.cluster_dir = unicode(cluster_dir)
799 # self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
800 # return super(PBSEngineSetLauncher, self).start(n)
801
811
802
812
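
A hedged sketch of how the PBS launchers above might be driven from an ipcluster configuration file, assuming the usual get_config() hook available inside IPython configuration files; the batch templates shown are illustrative, not defaults shipped with this code:

c = get_config()    # provided by IPython's configuration loader when this file is read as config

c.PBSControllerLauncher.batch_template = """#!/bin/sh
#PBS -N ipcontroller
ipcontroller --cluster-dir ${cluster_dir}
"""

c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
#PBS -N ipengine
mpiexec -n ${n} ipengine --cluster-dir ${cluster_dir}
"""

The submit_command ('qsub'), delete_command ('qdel'), and job_id_regexp (r'\d+') traits defined on PBSLauncher above are config=True and can be overridden the same way.
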
803 #-----------------------------------------------------------------------------
813 #-----------------------------------------------------------------------------
804 # A launcher for ipcluster itself!
814 # A launcher for ipcluster itself!
805 #-----------------------------------------------------------------------------
815 #-----------------------------------------------------------------------------
806
816
807
817
808 class IPClusterLauncher(LocalProcessLauncher):
818 class IPClusterLauncher(LocalProcessLauncher):
809 """Launch the ipcluster program in an external process."""
819 """Launch the ipcluster program in an external process."""
810
820
811 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
821 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
812 # Command line arguments to pass to ipcluster.
822 # Command line arguments to pass to ipcluster.
813 ipcluster_args = List(
823 ipcluster_args = List(
814 ['--clean-logs', '--log-to-file', '--log-level', str(logging.ERROR)], config=True)
824 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
815 ipcluster_subcommand = Str('start')
825 ipcluster_subcommand = Str('start')
816 ipcluster_n = Int(2)
826 ipcluster_n = Int(2)
817
827
818 def find_args(self):
828 def find_args(self):
819 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
829 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
820 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
830 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
821
831
822 def start(self):
832 def start(self):
823 self.log.info("Starting ipcluster: %r" % self.args)
833 self.log.info("Starting ipcluster: %r" % self.args)
824 return super(IPClusterLauncher, self).start()
834 return super(IPClusterLauncher, self).start()
825
835
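
For clarity, a self-contained sketch of the argv that IPClusterLauncher.find_args assembles; the interpreter and script paths are made-up stand-ins for ipcluster_cmd_argv:

import logging

ipcluster_cmd = ['/usr/bin/python', '/usr/local/bin/ipcluster']   # hypothetical ipcluster_cmd_argv
argv = ipcluster_cmd + ['start'] + ['-n', repr(2)] + \
    ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)]
print(argv)
# ['/usr/bin/python', '/usr/local/bin/ipcluster', 'start', '-n', '2',
#  '--clean-logs', '--log-to-file', '--log-level', '20']
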