##// END OF EJS Templates
Fixing minor bugs in IPython.kernel....
Brian Granger -
Show More
@@ -1,184 +1,184 b''
1 1 import os
2 2
3 3 c = get_config()
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Select which launchers to use
7 7 #-----------------------------------------------------------------------------
8 8
9 9 # This allows you to control what method is used to start the controller
10 10 # and engines. The following methods are currently supported:
11 11 # - Start as a regular process on localhost.
12 12 # - Start using mpiexec.
13 13 # - Start using the Windows HPC Server 2008 scheduler
14 14 # - Start using PBS
15 15 # - Start using SSH (currently broken)
16 16
17 17
18 18 # The selected launchers can be configured below.
19 19
20 20 # Options are:
21 21 # - LocalControllerLauncher
22 22 # - MPIExecControllerLauncher
23 23 # - PBSControllerLauncher
24 24 # - WindowsHPCControllerLauncher
25 25 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
26 26
27 27 # Options are:
28 28 # - LocalEngineSetLauncher
29 29 # - MPIExecEngineSetLauncher
30 30 # - PBSEngineSetLauncher
31 31 # - WindowsHPCEngineSetLauncher
32 32 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Global configuration
36 36 #-----------------------------------------------------------------------------
37 37
38 38 # The default number of engines that will be started. This is overridden by
39 39 # the -n command line option: "ipcluster start -n 4"
40 40 # c.Global.n = 2
41 41
42 42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
43 43 # c.Global.log_to_file = False
44 44
45 45 # Remove old logs from cluster_dir/log before starting.
46 46 # c.Global.clean_logs = True
47 47
48 48 # The working directory for the process. The application will use os.chdir
49 49 # to change to this directory before starting.
50 50 # c.Global.work_dir = os.getcwd()
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Local process launchers
55 55 #-----------------------------------------------------------------------------
56 56
57 57 # The command line arguments to call the controller with.
58 58 # c.LocalControllerLauncher.controller_args = \
59 59 # ['--log-to-file','--log-level', '40']
60 60
61 61 # The working directory for the engines
62 62 # c.LocalEngineSetLauncher.work_dir = u''
63 63
64 64 # Command line argument passed to the engines.
65 65 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # MPIExec launchers
69 69 #-----------------------------------------------------------------------------
70 70
71 71 # The mpiexec/mpirun command to use when starting the controller.
72 72 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
73 73
74 74 # Additional arguments to pass to the actual mpiexec command.
75 75 # c.MPIExecControllerLauncher.mpi_args = []
76 76
77 77 # The command line argument to call the controller with.
78 78 # c.MPIExecControllerLauncher.controller_args = \
79 79 # ['--log-to-file','--log-level', '40']
80 80
81 81
82 82 # The mpiexec/mpirun command to use when starting the engines.
83 83 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
84 84
85 85 # Additional arguments to pass to the actual mpiexec command.
86 86 # c.MPIExecEngineSetLauncher.mpi_args = []
87 87
88 88 # Command line argument passed to the engines.
89 89 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
90 90
91 91 # The default number of engines to start if not given elsewhere.
92 92 # c.MPIExecEngineSetLauncher.n = 1
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # SSH launchers
96 96 #-----------------------------------------------------------------------------
97 97
98 98 # Todo
99 99
100 100
101 101 #-----------------------------------------------------------------------------
102 102 # Unix batch (PBS) schedulers launchers
103 103 #-----------------------------------------------------------------------------
104 104
105 105 # The command line program to use to submit a PBS job.
106 106 # c.PBSControllerLauncher.submit_command = 'qsub'
107 107
108 108 # The command line program to use to delete a PBS job.
109 109 # c.PBSControllerLauncher.delete_command = 'qdel'
110 110
111 111 # A regular expression used to find the job id in the output of qsub.
112 # c.PBSControllerLauncher.job_id_regexp = '\d+'
112 # c.PBSControllerLauncher.job_id_regexp = r'\d+'
113 113
114 114 # The batch submission script used to start the controller. This is where
115 115 # environment variables would be setup, etc. This string is interpolated using
116 116 # the Itpl module in IPython.external. Basically, you can use ${n} for the
117 117 # number of engines and ${cluster_dir} for the cluster_dir.
118 118 # c.PBSControllerLauncher.batch_template = """"""
119 119
120 120 # The name of the instantiated batch script that will actually be used to
121 121 # submit the job. This will be written to the cluster directory.
122 122 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
123 123
124 124
125 125 # The command line program to use to submit a PBS job.
126 126 # c.PBSEngineSetLauncher.submit_command = 'qsub'
127 127
128 128 # The command line program to use to delete a PBS job.
129 129 # c.PBSEngineSetLauncher.delete_command = 'qdel'
130 130
131 131 # A regular expression used to find the job id in the output of qsub.
132 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
132 # c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
133 133
134 134 # The batch submission script used to start the engines. This is where
135 135 # environment variables would be setup, etc. This string is interpolated using
136 136 # the Itpl module in IPython.external. Basically, you can use ${n} for the
137 137 # number of engines and ${cluster_dir} for the cluster_dir.
138 138 # c.PBSEngineSetLauncher.batch_template = """"""
139 139
140 140 # The name of the instantiated batch script that will actually be used to
141 141 # submit the job. This will be written to the cluster directory.
142 142 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
143 143
144 144 #-----------------------------------------------------------------------------
145 145 # Windows HPC Server 2008 launcher configuration
146 146 #-----------------------------------------------------------------------------
147 147
148 148 # c.IPControllerJob.job_name = 'IPController'
149 149 # c.IPControllerJob.is_exclusive = False
150 # c.IPControllerJob.username = 'USERDOMAIN\\USERNAME'
150 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
151 151 # c.IPControllerJob.priority = 'Highest'
152 152 # c.IPControllerJob.requested_nodes = ''
153 153 # c.IPControllerJob.project = 'MyProject'
154 154
155 155 # c.IPControllerTask.task_name = 'IPController'
156 156 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
157 157 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
158 158 # c.IPControllerTask.environment_variables = {}
159 159
160 160 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
161 161 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
162 162
163 163
164 164 # c.IPEngineSetJob.job_name = 'IPEngineSet'
165 165 # c.IPEngineSetJob.is_exclusive = False
166 # c.IPEngineSetJob.username = 'USERDOMAIN\\USERNAME'
166 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
167 167 # c.IPEngineSetJob.priority = 'Highest'
168 168 # c.IPEngineSetJob.requested_nodes = ''
169 169 # c.IPEngineSetJob.project = 'MyProject'
170 170
171 171 # c.IPEngineTask.task_name = 'IPEngine'
172 172 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
173 173 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
174 174 # c.IPEngineTask.environment_variables = {}
175 175
176 176 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
177 177 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
178 178
179 179
180 180
181 181
182 182
183 183
184 184
@@ -1,481 +1,475 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import shutil
22 22 import sys
23 23
24 24 from twisted.python import log
25 25
26 26 from IPython.core import release
27 27 from IPython.config.loader import PyFileConfigLoader
28 28 from IPython.core.application import Application
29 29 from IPython.core.component import Component
30 30 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
31 31 from IPython.utils.traitlets import Unicode, Bool
32 32 from IPython.utils import genutils
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Exceptions and classes
36 36 #-----------------------------------------------------------------------------
37 37
38 38
class ClusterDirError(Exception):
    """Error raised for problems locating or using a cluster directory."""
41 41
42 42
class PIDFileError(Exception):
    """Error raised for problems with an application's .pid file."""
45 45
46 46
47 47 class ClusterDir(Component):
48 48 """An object to manage the cluster directory and its resources.
49 49
50 50 The cluster directory is used by :command:`ipcontroller`,
51 51 :command:`ipcontroller` and :command:`ipcontroller` to manage the
52 52 configuration, logging and security of these applications.
53 53
54 54 This object knows how to find, create and manage these directories. This
55 55 should be used by any code that want's to handle cluster directories.
56 56 """
57 57
58 58 security_dir_name = Unicode('security')
59 59 log_dir_name = Unicode('log')
60 60 pid_dir_name = Unicode('pid')
61 61 security_dir = Unicode(u'')
62 62 log_dir = Unicode(u'')
63 63 pid_dir = Unicode(u'')
64 64 location = Unicode(u'')
65 65
66 66 def __init__(self, location):
67 67 super(ClusterDir, self).__init__(None)
68 68 self.location = location
69 69
70 70 def _location_changed(self, name, old, new):
71 71 if not os.path.isdir(new):
72 os.makedirs(new, mode=0777)
73 else:
74 os.chmod(new, 0777)
72 os.makedirs(new)
75 73 self.security_dir = os.path.join(new, self.security_dir_name)
76 74 self.log_dir = os.path.join(new, self.log_dir_name)
77 75 self.pid_dir = os.path.join(new, self.pid_dir_name)
78 76 self.check_dirs()
79 77
80 78 def _log_dir_changed(self, name, old, new):
81 79 self.check_log_dir()
82 80
83 81 def check_log_dir(self):
84 82 if not os.path.isdir(self.log_dir):
85 os.mkdir(self.log_dir, 0777)
86 else:
87 os.chmod(self.log_dir, 0777)
83 os.mkdir(self.log_dir)
88 84
89 85 def _security_dir_changed(self, name, old, new):
90 86 self.check_security_dir()
91 87
92 88 def check_security_dir(self):
93 89 if not os.path.isdir(self.security_dir):
94 90 os.mkdir(self.security_dir, 0700)
95 else:
96 os.chmod(self.security_dir, 0700)
91 os.chmod(self.security_dir, 0700)
97 92
98 93 def _pid_dir_changed(self, name, old, new):
99 94 self.check_pid_dir()
100 95
101 96 def check_pid_dir(self):
102 97 if not os.path.isdir(self.pid_dir):
103 98 os.mkdir(self.pid_dir, 0700)
104 else:
105 os.chmod(self.pid_dir, 0700)
99 os.chmod(self.pid_dir, 0700)
106 100
107 101 def check_dirs(self):
108 102 self.check_security_dir()
109 103 self.check_log_dir()
110 104 self.check_pid_dir()
111 105
112 106 def load_config_file(self, filename):
113 107 """Load a config file from the top level of the cluster dir.
114 108
115 109 Parameters
116 110 ----------
117 111 filename : unicode or str
118 112 The filename only of the config file that must be located in
119 113 the top-level of the cluster directory.
120 114 """
121 115 loader = PyFileConfigLoader(filename, self.location)
122 116 return loader.load_config()
123 117
124 118 def copy_config_file(self, config_file, path=None, overwrite=False):
125 119 """Copy a default config file into the active cluster directory.
126 120
127 121 Default configuration files are kept in :mod:`IPython.config.default`.
128 122 This function moves these from that location to the working cluster
129 123 directory.
130 124 """
131 125 if path is None:
132 126 import IPython.config.default
133 127 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
134 128 path = os.path.sep.join(path)
135 129 src = os.path.join(path, config_file)
136 130 dst = os.path.join(self.location, config_file)
137 131 if not os.path.isfile(dst) or overwrite:
138 132 shutil.copy(src, dst)
139 133
140 134 def copy_all_config_files(self, path=None, overwrite=False):
141 135 """Copy all config files into the active cluster directory."""
142 136 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
143 137 u'ipcluster_config.py']:
144 138 self.copy_config_file(f, path=path, overwrite=overwrite)
145 139
146 140 @classmethod
147 141 def create_cluster_dir(csl, cluster_dir):
148 142 """Create a new cluster directory given a full path.
149 143
150 144 Parameters
151 145 ----------
152 146 cluster_dir : str
153 147 The full path to the cluster directory. If it does exist, it will
154 148 be used. If not, it will be created.
155 149 """
156 150 return ClusterDir(cluster_dir)
157 151
158 152 @classmethod
159 153 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
160 154 """Create a cluster dir by profile name and path.
161 155
162 156 Parameters
163 157 ----------
164 158 path : str
165 159 The path (directory) to put the cluster directory in.
166 160 profile : str
167 161 The name of the profile. The name of the cluster directory will
168 162 be "cluster_<profile>".
169 163 """
170 164 if not os.path.isdir(path):
171 165 raise ClusterDirError('Directory not found: %s' % path)
172 166 cluster_dir = os.path.join(path, u'cluster_' + profile)
173 167 return ClusterDir(cluster_dir)
174 168
175 169 @classmethod
176 170 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
177 171 """Find an existing cluster dir by profile name, return its ClusterDir.
178 172
179 173 This searches through a sequence of paths for a cluster dir. If it
180 174 is not found, a :class:`ClusterDirError` exception will be raised.
181 175
182 176 The search path algorithm is:
183 177 1. ``os.getcwd()``
184 178 2. ``ipython_dir``
185 179 3. The directories found in the ":" separated
186 180 :env:`IPCLUSTER_DIR_PATH` environment variable.
187 181
188 182 Parameters
189 183 ----------
190 184 ipython_dir : unicode or str
191 185 The IPython directory to use.
192 186 profile : unicode or str
193 187 The name of the profile. The name of the cluster directory
194 188 will be "cluster_<profile>".
195 189 """
196 190 dirname = u'cluster_' + profile
197 191 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
198 192 if cluster_dir_paths:
199 193 cluster_dir_paths = cluster_dir_paths.split(':')
200 194 else:
201 195 cluster_dir_paths = []
202 196 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
203 197 for p in paths:
204 198 cluster_dir = os.path.join(p, dirname)
205 199 if os.path.isdir(cluster_dir):
206 200 return ClusterDir(cluster_dir)
207 201 else:
208 202 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
209 203
210 204 @classmethod
211 205 def find_cluster_dir(cls, cluster_dir):
212 206 """Find/create a cluster dir and return its ClusterDir.
213 207
214 208 This will create the cluster directory if it doesn't exist.
215 209
216 210 Parameters
217 211 ----------
218 212 cluster_dir : unicode or str
219 213 The path of the cluster directory. This is expanded using
220 214 :func:`IPython.utils.genutils.expand_path`.
221 215 """
222 216 cluster_dir = genutils.expand_path(cluster_dir)
223 217 if not os.path.isdir(cluster_dir):
224 218 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
225 219 return ClusterDir(cluster_dir)
226 220
227 221
class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
    """Default command line options for IPython cluster applications."""

    def _add_other_arguments(self):
        """Register the options shared by the cluster applications.

        Every option defaults to ``NoConfigDefault`` and stores its value
        under the ``Global.*`` name given as ``dest``.
        """
        self.parser.add_argument('--ipython-dir',
            dest='Global.ipython_dir',type=unicode,
            help='Set to override default location of Global.ipython_dir.',
            default=NoConfigDefault,
            metavar='Global.ipython_dir'
        )
        self.parser.add_argument('-p', '--profile',
            dest='Global.profile',type=unicode,
            help='The string name of the profile to be used. This determines '
            'the name of the cluster dir as: cluster_<profile>. The default profile '
            'is named "default".  The cluster directory is resolved this way '
            'if the --cluster-dir option is not used.',
            default=NoConfigDefault,
            metavar='Global.profile'
        )
        self.parser.add_argument('--log-level',
            dest="Global.log_level",type=int,
            help='Set the log level (0,10,20,30,40,50).  Default is 30.',
            default=NoConfigDefault,
            metavar="Global.log_level"
        )
        self.parser.add_argument('--cluster-dir',
            dest='Global.cluster_dir',type=unicode,
            help='Set the cluster dir. This overrides the logic used by the '
            '--profile option.',
            default=NoConfigDefault,
            metavar='Global.cluster_dir'
        )
        self.parser.add_argument('--work-dir',
            dest='Global.work_dir',type=unicode,
            help='Set the working dir for the process.',
            default=NoConfigDefault,
            metavar='Global.work_dir'
        )
        # --clean-logs / --no-clean-logs write True / False to the same dest.
        self.parser.add_argument('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log files before starting.',
            default=NoConfigDefault
        )
        self.parser.add_argument('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log files before starting.",
            default=NoConfigDefault
        )
276 270
class ApplicationWithClusterDir(Application):
    """An application that puts everything into a cluster directory.

    Instead of looking for things in the ipython_dir, this type of application
    will use its own private directory called the "cluster directory"
    for things like config files, log files, etc.

    The cluster directory is resolved as follows:

    * If the ``--cluster-dir`` option is given, it is used.
    * If ``--cluster-dir`` is not given, the cluster directory is
      resolved using the profile name as ``cluster_<profile>``. The search
      path for this directory is then i) cwd if it is found there
      and ii) in ipython_dir otherwise.

    The config file for the application is to be put in the cluster
    dir and named the value of the ``config_file_name`` class attribute.
    """

    # When True, find_resources creates a missing cluster directory instead
    # of raising ClusterDirError.
    auto_create_cluster_dir = True

    def create_default_config(self):
        """Add the cluster-related defaults to ``self.default_config``."""
        super(ApplicationWithClusterDir, self).create_default_config()
        self.default_config.Global.profile = u'default'
        self.default_config.Global.cluster_dir = u''
        self.default_config.Global.work_dir = os.getcwd()
        self.default_config.Global.log_to_file = False
        self.default_config.Global.clean_logs = False

    def create_command_line_config(self):
        """Create and return a command line config loader."""
        return AppWithClusterDirArgParseConfigLoader(
            description=self.description,
            version=release.version
        )

    def find_resources(self):
        """This resolves the cluster directory.

        This tries to find the cluster directory and if successful, it will
        have done:
        * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
          the application.
        * Sets ``self.cluster_dir`` attribute of the application and config
          objects.

        The algorithm used for this is as follows:
        1. Try ``Global.cluster_dir``.
        2. Try using ``Global.profile``.
        3. If both of these fail and ``self.auto_create_cluster_dir`` is
           ``True``, then create the new cluster dir in the IPython directory.
        4. If all fails, then raise :class:`ClusterDirError`.
        """

        # 1. An explicitly given cluster dir; the command line value wins
        #    over the default.
        try:
            cluster_dir = self.command_line_config.Global.cluster_dir
        except AttributeError:
            cluster_dir = self.default_config.Global.cluster_dir
        cluster_dir = genutils.expand_path(cluster_dir)
        try:
            self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
        except ClusterDirError:
            pass
        else:
            self.log.info('Using existing cluster dir: %s' % \
                self.cluster_dir_obj.location
            )
            self.finish_cluster_dir()
            return

        # 2. An existing cluster dir named after the profile.
        try:
            self.profile = self.command_line_config.Global.profile
        except AttributeError:
            self.profile = self.default_config.Global.profile
        try:
            self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
                self.ipython_dir, self.profile)
        except ClusterDirError:
            pass
        else:
            self.log.info('Using existing cluster dir: %s' % \
                self.cluster_dir_obj.location
            )
            self.finish_cluster_dir()
            return

        # 3./4. Nothing found: create the directory or give up.
        if self.auto_create_cluster_dir:
            self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
                self.ipython_dir, self.profile
            )
            self.log.info('Creating new cluster dir: %s' % \
                self.cluster_dir_obj.location
            )
            self.finish_cluster_dir()
        else:
            raise ClusterDirError('Could not find a valid cluster directory.')

    def finish_cluster_dir(self):
        """Record the resolved cluster dir on the application and configs."""
        # Set the cluster directory
        self.cluster_dir = self.cluster_dir_obj.location

        # These have to be set because they could be different from the one
        # that we just computed.  Because command line has the highest
        # priority, this will always end up in the master_config.
        self.default_config.Global.cluster_dir = self.cluster_dir
        self.command_line_config.Global.cluster_dir = self.cluster_dir

        # Set the search path to the cluster directory
        self.config_file_paths = (self.cluster_dir,)

    def find_config_file_name(self):
        """Find the config file name for this application."""
        # For this type of Application it should be set as a class attribute.
        if not hasattr(self, 'config_file_name'):
            self.log.critical("No config filename found")

    def find_config_file_paths(self):
        """Restrict the config file search path to the cluster directory."""
        # Set the search path to the cluster directory
        self.config_file_paths = (self.cluster_dir,)

    def pre_construct(self):
        """Publish the cluster sub-directories and switch to the work dir."""
        # The log and security dirs were set earlier, but here we put them
        # into the config and log them.
        config = self.master_config
        sdir = self.cluster_dir_obj.security_dir
        self.security_dir = config.Global.security_dir = sdir
        ldir = self.cluster_dir_obj.log_dir
        self.log_dir = config.Global.log_dir = ldir
        pdir = self.cluster_dir_obj.pid_dir
        self.pid_dir = config.Global.pid_dir = pdir
        self.log.info("Cluster directory set to: %s" % self.cluster_dir)
        config.Global.work_dir = unicode(genutils.expand_path(config.Global.work_dir))
        # Change to the working directory. We do this just before construct
        # is called so all the components there have the right working dir.
        self.to_work_dir()

    def to_work_dir(self):
        """Change to ``Global.work_dir`` unless it is already the cwd."""
        wd = self.master_config.Global.work_dir
        if unicode(wd) != unicode(os.getcwd()):
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)

    def start_logging(self):
        """Optionally remove old log files, then start twisted logging.

        Logging goes to ``<name>-<pid>.log`` in the log directory when
        ``Global.log_to_file`` is set, and to ``sys.stdout`` otherwise.
        """
        # Remove old log files
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            for f in os.listdir(log_dir):
                if f.startswith(self.name + u'-') and f.endswith('.log'):
                    os.remove(os.path.join(log_dir, f))
        # Start logging to the new log file
        if self.master_config.Global.log_to_file:
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(self.log_dir, log_filename)
            # Deliberately left open: the file object is handed to the
            # twisted logger below.
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = sys.stdout
        log.startLogging(open_log_file)

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the pid_dir with my pid.

        This must be called after pre_construct, which sets `self.pid_dir`.
        This raises :exc:`PIDFileError` if the pid file exists already and
        `overwrite` is False.

        Parameters
        ----------
        overwrite : bool, optional
            If True, an existing pid file is replaced without being read.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        # The old pid is only needed for the error message, so a stale or
        # unparsable file cannot get in the way of an overwrite.
        if os.path.isfile(pid_file) and not overwrite:
            pid = self.get_pid_from_file()
            raise PIDFileError(
                'The pid file [%s] already exists. \nThis could mean that this '
                'server is already running with [pid=%s].' % (pid_file, pid)
            )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file.

        This should be called at shutdown by registering a callback with
        :func:`reactor.addSystemEventTrigger`.  This needs to return
        ``None``.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            self.log.info("Removing pid file: %s" % pid_file)
            try:
                os.remove(pid_file)
            except OSError:
                # Best effort only: a failed removal is logged, not raised.
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist a :exc:`PIDFileError` is raised.

        Returns
        -------
        int
            The pid stored in the file.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            with open(pid_file, 'r') as f:
                pid = int(f.read().strip())
                return pid
        else:
            raise PIDFileError('pid file not found: %s' % pid_file)
480 474
481 475
@@ -1,455 +1,459 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import logging
19 19 import os
20 20 import signal
21 21 import sys
22 22
23 23 if os.name=='posix':
24 24 from twisted.scripts._twistd_unix import daemonize
25 25
26 26 from IPython.core import release
27 27 from IPython.external import argparse
28 28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
29 29 from IPython.utils.importstring import import_item
30 30
31 31 from IPython.kernel.clusterdir import (
32 32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 33 )
34 34
35 35 from twisted.internet import reactor, defer
36 36 from twisted.python import log, failure
37 37
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # The ipcluster application
41 41 #-----------------------------------------------------------------------------
42 42
43 43
44 44 # Exit codes for ipcluster
45 45
46 46 # This will be the exit code if the ipcluster appears to be running because
47 47 # a .pid file exists
48 48 ALREADY_STARTED = 10
49 49
50 50 # This will be the exit code if ipcluster stop is run, but there is no .pid
51 51 # file to be found.
52 52 ALREADY_STOPPED = 11
53 53
54 54
class IPClusterCLLoader(ArgParseConfigLoader):
    """Command line config loader for the ``ipcluster`` subcommands.

    Registers the ``list``, ``create``, ``start`` and ``stop`` subparsers.
    Every option uses ``NoConfigDefault`` as its default and stores its
    value under a ``Global.<name>`` destination.
    """

    def _add_arguments(self):
        """Build the parent parsers and the subcommand parsers."""
        # This has all the common options that all subcommands use
        parent_parser1 = argparse.ArgumentParser(add_help=False)
        parent_parser1.add_argument('--ipython-dir',
            dest='Global.ipython_dir',type=unicode,
            help='Set to override default location of Global.ipython_dir.',
            default=NoConfigDefault,
            metavar='Global.ipython_dir')
        parent_parser1.add_argument('--log-level',
            dest="Global.log_level",type=int,
            help='Set the log level (0,10,20,30,40,50). Default is 30.',
            default=NoConfigDefault,
            metavar='Global.log_level')

        # This has all the common options that other subcommands use
        parent_parser2 = argparse.ArgumentParser(add_help=False)
        parent_parser2.add_argument('-p','--profile',
            dest='Global.profile',type=unicode,
            help='The string name of the profile to be used. This determines '
            'the name of the cluster dir as: cluster_<profile>. The default profile '
            'is named "default". The cluster directory is resolved this way '
            'if the --cluster-dir option is not used.',
            default=NoConfigDefault,
            metavar='Global.profile')
        parent_parser2.add_argument('--cluster-dir',
            dest='Global.cluster_dir',type=unicode,
            help='Set the cluster dir. This overrides the logic used by the '
            '--profile option.',
            default=NoConfigDefault,
            metavar='Global.cluster_dir')
        parent_parser2.add_argument('--work-dir',
            dest='Global.work_dir',type=unicode,
            help='Set the working dir for the process.',
            default=NoConfigDefault,
            metavar='Global.work_dir')
        parent_parser2.add_argument('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            default=NoConfigDefault,
            help='Log to a file in the log directory (default is stdout)'
        )

        subparsers = self.parser.add_subparsers(
            dest='Global.subcommand',
            title='ipcluster subcommands',
            description='ipcluster has a variety of subcommands. '
            'The general way of running ipcluster is "ipcluster <cmd> '
            ' [options]"',
            help='For more help, type "ipcluster <cmd> -h"')

        # "list" only takes the options common to every subcommand.
        parser_list = subparsers.add_parser(
            'list',
            help='List all clusters in cwd and ipython_dir.',
            parents=[parent_parser1]
        )

        parser_create = subparsers.add_parser(
            'create',
            help='Create a new cluster directory.',
            parents=[parent_parser1, parent_parser2]
        )
        parser_create.add_argument(
            '--reset-config',
            dest='Global.reset_config', action='store_true',
            default=NoConfigDefault,
            help='Recopy the default config files to the cluster directory. '
            'You will lose any modifications you have made to these files.'
        )

        parser_start = subparsers.add_parser(
            'start',
            help='Start a cluster.',
            parents=[parent_parser1, parent_parser2]
        )
        parser_start.add_argument(
            '-n', '--number',
            type=int, dest='Global.n',
            default=NoConfigDefault,
            help='The number of engines to start.',
            metavar='Global.n'
        )
        # --clean-logs / --no-clean-logs write to the same destination.
        parser_start.add_argument('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log files before starting.',
            default=NoConfigDefault
        )
        parser_start.add_argument('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log files before starting.",
            default=NoConfigDefault
        )
        # --daemon / --no-daemon write to the same destination.
        parser_start.add_argument('--daemon',
            dest='Global.daemonize', action='store_true',
            help='Daemonize the ipcluster program. This implies --log-to-file',
            default=NoConfigDefault
        )
        parser_start.add_argument('--no-daemon',
            dest='Global.daemonize', action='store_false',
            help="Don't daemonize the ipcluster program.",
            default=NoConfigDefault
        )

        parser_stop = subparsers.add_parser(
            'stop',
            help='Stop a cluster.',
            parents=[parent_parser1, parent_parser2]
        )
        parser_stop.add_argument('--signal',
            dest='Global.signal', type=int,
            help="The signal number to use in stopping the cluster (default=2).",
            metavar="Global.signal",
            default=NoConfigDefault
        )
169 169
170 170
171 171 default_config_file_name = u'ipcluster_config.py'
172 172
173 173
class IPClusterApp(ApplicationWithClusterDir):
    """Application object behind the ``ipcluster`` command.

    The ``Global.subcommand`` value parsed from the command line selects
    the behavior: ``list`` prints the cluster directories that can be
    found, ``create`` copies the default config files into a cluster
    directory, ``start`` launches a controller and a set of engines under
    the Twisted reactor, and ``stop`` signals a running ipcluster process
    using the pid read from its pid file.
    """

    name = u'ipcluster'
    description = 'Start an IPython cluster (controller and engines).'
    config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = False

    def create_default_config(self):
        """Extend the inherited default config with ipcluster defaults."""
        super(IPClusterApp, self).create_default_config()
        self.default_config.Global.controller_launcher = \
            'IPython.kernel.launcher.LocalControllerLauncher'
        self.default_config.Global.engine_launcher = \
            'IPython.kernel.launcher.LocalEngineSetLauncher'
        self.default_config.Global.n = 2
        self.default_config.Global.reset_config = False
        self.default_config.Global.clean_logs = True
        self.default_config.Global.signal = 2
        self.default_config.Global.daemonize = False

    def create_command_line_config(self):
        """Create and return a command line config loader."""
        return IPClusterCLLoader(
            description=self.description,
            version=release.version
        )

    def find_resources(self):
        """Locate the cluster directory as required by the subcommand.

        ``list`` prints the known cluster dirs and exits. The other
        subcommands delegate to the parent class with
        ``auto_create_cluster_dir`` enabled.
        """
        subcommand = self.command_line_config.Global.subcommand
        if subcommand=='list':
            self.list_cluster_dirs()
            # Exit immediately because there is nothing left to do.
            self.exit()
        elif subcommand=='create':
            self.auto_create_cluster_dir = True
            super(IPClusterApp, self).find_resources()
        elif subcommand=='start' or subcommand=='stop':
            self.auto_create_cluster_dir = True
            try:
                super(IPClusterApp, self).find_resources()
            except ClusterDirError:
                raise ClusterDirError(
                    "Could not find a cluster directory. A cluster dir must "
                    "be created before running 'ipcluster start'. Do "
                    "'ipcluster create -h' or 'ipcluster list -h' for more "
                    "information about creating and listing cluster dirs."
                )

    def list_cluster_dirs(self):
        """Print an ``ipcluster start`` command for each cluster dir found.

        The current directory, the ipython dir and every entry of the
        ``IPCLUSTER_DIR_PATH`` environment variable are searched for
        sub-directories whose name starts with ``cluster_``.
        """
        # Find the search paths
        cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
        if cluster_dir_paths:
            # Use the platform separator: splitting on ':' would cut
            # Windows paths apart at the drive letter.
            cluster_dir_paths = cluster_dir_paths.split(os.pathsep)
        else:
            cluster_dir_paths = []
        try:
            ipython_dir = self.command_line_config.Global.ipython_dir
        except AttributeError:
            ipython_dir = self.default_config.Global.ipython_dir
        paths = [os.getcwd(), ipython_dir] + \
            cluster_dir_paths
        paths = list(set(paths))

        self.log.info('Searching for cluster dirs in paths: %r' % paths)
        prefix = 'cluster_'
        for path in paths:
            if not os.path.isdir(path):
                # A search path that does not exist cannot contain any
                # cluster dirs; skip it instead of failing in listdir.
                continue
            files = os.listdir(path)
            for f in files:
                full_path = os.path.join(path, f)
                if os.path.isdir(full_path) and f.startswith(prefix):
                    # Strip only the leading prefix so that profile names
                    # containing underscores are reported in full.
                    profile = f[len(prefix):]
                    start_cmd = 'ipcluster start -p %s -n 4' % profile
                    print(start_cmd + " ==> " + full_path)

    def pre_construct(self):
        """Run the inherited pre_construct, then force file logging for daemons."""
        # IPClusterApp.pre_construct() is where we cd to the working directory.
        super(IPClusterApp, self).pre_construct()
        config = self.master_config
        try:
            daemon = config.Global.daemonize
            if daemon:
                config.Global.log_to_file = True
        except AttributeError:
            pass

    def construct(self):
        """Do the per-subcommand setup that precedes start_app."""
        config = self.master_config
        subcmd = config.Global.subcommand
        reset = config.Global.reset_config
        if subcmd == 'list':
            return
        if subcmd == 'create':
            self.log.info('Copying default config files to cluster directory '
            '[overwrite=%r]' % (reset,))
            self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
        if subcmd =='start':
            self.cluster_dir_obj.copy_all_config_files(overwrite=False)
            self.start_logging()
            reactor.callWhenRunning(self.start_launchers)

    def start_launchers(self):
        """Create the launchers and start the controller, then the engines.

        Returns the deferred chaining the controller start, the engine
        start and the startup message.
        """
        config = self.master_config

        # Create the launchers. In both cases, we set the work_dir of
        # the launcher to the cluster_dir. This is where the launcher's
        # subprocesses will be launched. It is not where the controller
        # and engine will be launched.
        el_class = import_item(config.Global.engine_launcher)
        self.engine_launcher = el_class(
            work_dir=self.cluster_dir, config=config
        )
        cl_class = import_item(config.Global.controller_launcher)
        self.controller_launcher = cl_class(
            work_dir=self.cluster_dir, config=config
        )

        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

        # Setup the observing of stopping. If the controller dies, shut
        # everything down as that will be completely fatal for the engines.
        d1 = self.controller_launcher.observe_stop()
        d1.addCallback(self.stop_launchers)
        # But, we don't monitor the stopping of engines. An engine dying
        # is just fine and in principle a user could start a new engine.
        # Also, if we did monitor engine stopping, it is difficult to
        # know what to do when only some engines die. Currently, the
        # observing of engine stopping is inconsistent. Some launchers
        # might trigger on a single engine stopping, other wait until
        # all stop. TODO: think more about how to handle this.

        # Start the controller and engines
        self._stopping = False # Make sure stop_launchers is not called 2x.
        d = self.start_controller()
        d.addCallback(self.start_engines)
        d.addCallback(self.startup_message)
        # If the controller or engines fail to start, stop everything
        d.addErrback(self.stop_launchers)
        return d

    def startup_message(self, r=None):
        """Log that the cluster started; passes ``r`` through."""
        log.msg("IPython cluster: started")
        return r

    def start_controller(self, r=None):
        """Start the controller launcher and return its deferred."""
        # log.msg("In start_controller")
        config = self.master_config
        d = self.controller_launcher.start(
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def start_engines(self, r=None):
        """Start ``Global.n`` engines and return the launcher's deferred."""
        # log.msg("In start_engines")
        config = self.master_config
        d = self.engine_launcher.start(
            config.Global.n,
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def stop_controller(self, r=None):
        """Stop the controller launcher if it is running."""
        # log.msg("In stop_controller")
        if self.controller_launcher.running:
            d = self.controller_launcher.stop()
            d.addErrback(self.log_err)
            return d
        else:
            return defer.succeed(None)

    def stop_engines(self, r=None):
        """Stop the engine launcher if it is running."""
        # log.msg("In stop_engines")
        if self.engine_launcher.running:
            d = self.engine_launcher.stop()
            d.addErrback(self.log_err)
            return d
        else:
            return defer.succeed(None)

    def log_err(self, f):
        """Errback that logs the failure's traceback and returns None."""
        log.msg(f.getTraceback())
        return None

    def stop_launchers(self, r=None):
        """Stop engines and controller once, then stop the reactor.

        ``r`` may be a Failure when this runs as an errback; in that case
        its traceback is logged first.
        """
        if not self._stopping:
            self._stopping = True
            if isinstance(r, failure.Failure):
                log.msg('Unexpected error in ipcluster:')
                log.msg(r.getTraceback())
            log.msg("IPython cluster: stopping")
            # The returned deferreds are not waited on; the delayed
            # reactor.stop below gives the processes time to exit.
            self.stop_engines()
            self.stop_controller()
            # Wait a few seconds to let things shut down.
            reactor.callLater(4.0, reactor.stop)

    def sigint_handler(self, signum, frame):
        """SIGINT handler: shut the launchers down."""
        self.stop_launchers()

    def start_logging(self):
        """Optionally delete old engine/controller logs, then start logging."""
        # Remove old log files of the controller and engine
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            prefixes = ('ipengine' + '-', 'ipcontroller' + '-')
            suffixes = ('.log', '.out', '.err')
            for f in os.listdir(log_dir):
                if f.startswith(prefixes) and f.endswith(suffixes):
                    os.remove(os.path.join(log_dir, f))
        # This will remove old log files for ipcluster itself
        super(IPClusterApp, self).start_logging()

    def start_app(self):
        """Start the application, depending on what subcommand is used."""
        subcmd = self.master_config.Global.subcommand
        if subcmd=='create' or subcmd=='list':
            return
        elif subcmd=='start':
            self.start_app_start()
        elif subcmd=='stop':
            self.start_app_stop()

    def start_app_start(self):
        """Start the app for the start subcommand."""
        config = self.master_config
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            self.log.critical(
                'Cluster is already running with [pid=%s]. '
                'use "ipcluster stop" to stop the cluster.' % pid
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STARTED)

        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if config.Global.daemonize:
            if os.name=='posix':
                daemonize()

        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
        reactor.run()

    def start_app_stop(self):
        """Start the app for the stop subcommand."""
        config = self.master_config
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Problem reading pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)
        else:
            if os.name=='posix':
                import errno
                sig = config.Global.signal
                self.log.info(
                    "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
                )
                try:
                    os.kill(pid, sig)
                except OSError as err:
                    if err.errno != errno.ESRCH:
                        raise
                    # No such process: the pid file is stale, so clean it
                    # up instead of failing with a traceback.
                    self.log.warn(
                        "No process with [pid=%r] was found, removing the "
                        "stale pid file." % pid
                    )
                    self.remove_pid_file()
            elif os.name=='nt':
                # As of right now, we don't support daemonize on Windows, so
                # stop will not do anything. Minimally, it should clean up the
                # old .pid files.
                self.remove_pid_file()
446 450
def launch_new_instance():
    """Instantiate :class:`IPClusterApp` and run it."""
    IPClusterApp().start()
451 455
452 456
453 457 if __name__ == '__main__':
454 458 launch_new_instance()
455 459
@@ -1,867 +1,869 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21
22 22 from IPython.core.component import Component
23 23 from IPython.external import Itpl
24 24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 25 from IPython.utils.platutils import find_cmd
26 26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 27 from IPython.kernel.winhpcjob import (
28 28 WinHPCJob, WinHPCTask,
29 29 IPControllerTask, IPEngineTask,
30 30 IPControllerJob, IPEngineSetJob
31 31 )
32 32
33 33 from twisted.internet import reactor, defer
34 34 from twisted.internet.defer import inlineCallbacks
35 35 from twisted.internet.protocol import ProcessProtocol
36 36 from twisted.internet.utils import getProcessOutput
37 37 from twisted.internet.error import ProcessDone, ProcessTerminated
38 38 from twisted.python import log
39 39 from twisted.python.failure import Failure
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Utilities
43 43 #-----------------------------------------------------------------------------
44 44
45 45
def find_controller_cmd():
    """Return the command list used to run ipcontroller on this platform."""
    if sys.platform != 'win32':
        # ipcontroller has to be on the PATH in this case.
        return ['ipcontroller']
    # On Windows the ipcontroller script doesn't always get installed in
    # the same way or in the same location, so run the module file itself.
    from IPython.kernel import ipcontrollerapp
    script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
    # -u turns on unbuffered output, which is required on Win32 to prevent
    # weird conflicts and problems with Twisted. sys.executable makes sure
    # the right python exe is picked up.
    return [sys.executable, '-u', script_location]
62 62
63 63
def find_engine_cmd():
    """Return the command list used to run ipengine on this platform."""
    if sys.platform != 'win32':
        # ipengine has to be on the PATH in this case.
        return ['ipengine']
    # On Windows the ipengine script doesn't always get installed in the
    # same way or in the same location, so run the module file itself.
    from IPython.kernel import ipengineapp
    script_location = ipengineapp.__file__.replace('.pyc', '.py')
    # -u turns on unbuffered output, which is required on Win32 to prevent
    # weird conflicts and problems with Twisted. sys.executable makes sure
    # the right python exe is picked up.
    return [sys.executable, '-u', script_location]
80 80
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # Base launchers and errors
84 84 #-----------------------------------------------------------------------------
85 85
86 86
class LauncherError(Exception):
    """Base class for the launcher exceptions defined in this module."""


class ProcessStateError(LauncherError):
    """A :class:`LauncherError` concerning the state of a process."""


class UnknownStatus(LauncherError):
    """A :class:`LauncherError` for an exit status that is not recognized."""
97 97
98 98
class BaseLauncher(Component):
    """An abstraction for starting, stopping and signaling a process.

    The launcher moves through the states ``'before'``, ``'running'`` and
    ``'after'``. Subclasses provide :meth:`find_args`, :meth:`start`,
    :meth:`stop` and :meth:`signal`.
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the cluster_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the --work-dir option.
    work_dir = Unicode(u'')

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(BaseLauncher, self).__init__(parent, name, config)
        self.work_dir = work_dir
        # One of 'before', 'running' or 'after'.
        self.state = 'before'
        # Deferreds handed out by observe_stop that have not fired yet.
        self.stop_deferreds = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """The command and arguments used to start the process, as a list.

        This is what is passed to :func:`spawnProcess`; the first element
        is the process name.
        """
        return self.find_args()

    def find_args(self):
        """Build the list returned by the ``.args`` property.

        Subclasses must override this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The program arguments joined into a single string."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True when the launcher state is ``'running'``."""
        return self.state == 'running'

    def start(self):
        """Start the process.

        Subclasses must return a deferred that fires with information
        about the process starting (like a pid, job id, etc.). This base
        implementation returns an already failed deferred.
        """
        not_implemented = NotImplementedError(
            'start must be implemented in a subclass')
        return defer.fail(Failure(not_implemented))

    def stop(self):
        """Stop the process and notify observers of stopping.

        Subclasses must return a deferred that fires with information
        about the stopping itself, like errors that occur while the process
        is being shut down. That deferred does not fire when the process
        actually stops; use :func:`observe_stop` for that. This base
        implementation returns an already failed deferred.
        """
        not_implemented = NotImplementedError(
            'stop must be implemented in a subclass')
        return defer.fail(Failure(not_implemented))

    def observe_stop(self):
        """Get a deferred that will fire when the process stops.

        The deferred fires with the data that was passed to
        :func:`notify_stop`.
        """
        if self.state != 'after':
            waiter = defer.Deferred()
            self.stop_deferreds.append(waiter)
            return waiter
        # Already stopped: hand back the recorded stop data right away.
        return defer.succeed(self.stop_data)

    def notify_start(self, data):
        """Record that the process has started.

        Logs the startup, stores ``data`` and sets the state to
        ``'running'``. Returns ``data`` so it can be used as a callback.
        """
        log.msg('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Record that the process has stopped.

        Logs the stop, stores ``data``, sets the state to ``'after'`` and
        fires every deferred handed out by :func:`observe_stop`. Returns
        ``data`` so it can be used as a callback.
        """
        log.msg('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Fire exactly the observers that were registered at this point,
        # most recently registered first.
        pending = len(self.stop_deferreds)
        for _ in range(pending):
            observer = self.stop_deferreds.pop()
            observer.callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Subclasses return a semi-meaningless deferred after signaling the
        process. This base implementation returns an already failed
        deferred.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        not_implemented = NotImplementedError(
            'signal must be implemented in a subclass')
        return defer.fail(Failure(not_implemented))
229 229
230 230
231 231 #-----------------------------------------------------------------------------
232 232 # Local process launchers
233 233 #-----------------------------------------------------------------------------
234 234
235 235
class LocalProcessLauncherProtocol(ProcessProtocol):
    """A ProcessProtocol that reports process events to its launcher."""

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher
        self.pid = None

    def connectionMade(self):
        """Remember the transport's pid and tell the launcher it started."""
        self.pid = self.transport.pid
        self.process_launcher.notify_start(self.transport.pid)

    def processEnded(self, status):
        """Translate the exit status into a dict and notify the launcher.

        Raises :exc:`UnknownStatus` when ``status.value`` is neither a
        ``ProcessDone`` nor a ``ProcessTerminated``.
        """
        reason = status.value
        if isinstance(reason, ProcessDone):
            exit_code, sig, raw_status = 0, None, None
        elif isinstance(reason, ProcessTerminated):
            exit_code = reason.exitCode
            sig = reason.signal
            raw_status = reason.status
        else:
            raise UnknownStatus("Unknown exit status, this is probably a "
                                "bug in Twisted")
        stop_info = {
            'exit_code': exit_code,
            'signal': sig,
            'status': raw_status,
            'pid': self.pid,
        }
        self.process_launcher.notify_stop(stop_info)

    def outReceived(self, data):
        """Forward the child's stdout data to the Twisted log."""
        log.msg(data)

    def errReceived(self, data):
        """Forward the child's stderr data to the Twisted error log."""
        log.err(data)
274 274
275 275
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    The child is spawned through the Twisted reactor with
    ``self.work_dir`` as its working directory.
    """

    # The command and its arguments. find_args returns this list, which is
    # what ends up being passed to spawnProcess.
    cmd_and_args = List([])

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(LocalProcessLauncher, self).__init__(
            work_dir, parent, name, config
        )
        self.process_protocol = None
        self.start_deferred = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Spawn the process and return a deferred that fires on startup.

        If the launcher is not in the ``'before'`` state, a deferred that
        has already failed with :exc:`ProcessStateError` is returned.
        """
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LocalProcessLauncherProtocol(self)
        self.start_deferred = defer.Deferred()
        # twisted expects these to be str, not unicode
        argv = [str(a) for a in self.args]
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            argv[0],
            argv,
            env=os.environ,
            path=self.work_dir # start in the work_dir
        )
        return self.start_deferred

    def notify_start(self, data):
        """Run the base bookkeeping, then fire the start deferred."""
        super(LocalProcessLauncher, self).notify_start(data)
        self.start_deferred.callback(data)

    def stop(self):
        return self.interrupt_then_kill()

    @make_deferred
    def signal(self, sig):
        """Send ``sig`` to the process, but only while it is running."""
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    @inlineCallbacks
    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait ``delay`` and then send KILL."""
        yield self.signal('INT')
        yield sleep_deferred(delay)
        yield self.signal('KILL')
331 331
332 332
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    # The command used to run ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the controller command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, cluster_dir):
        """Start the controller for the given ``cluster_dir``.

        ``--cluster-dir <cluster_dir>`` is appended to ``controller_args``
        before the process is spawned.
        """
        cluster_dir_option = ['--cluster-dir', cluster_dir]
        self.controller_args.extend(cluster_dir_option)
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
349 349
350 350
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    # The command used to run ipengine.
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def find_args(self):
        """Return the engine command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, cluster_dir):
        """Start the engine for the given ``cluster_dir``.

        ``--cluster-dir <cluster_dir>`` is appended to ``engine_args``
        before the process is spawned.
        """
        cluster_dir_option = ['--cluster-dir', cluster_dir]
        self.engine_args.extend(cluster_dir_option)
        self.cluster_dir = unicode(cluster_dir)
        return super(LocalEngineLauncher, self).start()
368 368
369 369
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes.

    Each engine gets its own :class:`LocalEngineLauncher`; signaling and
    stopping are fanned out to all of them.
    """

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir, parent, name, config
        )
        # One LocalEngineLauncher per engine started.
        self.launchers = []

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        import copy
        self.cluster_dir = unicode(cluster_dir)
        start_deferreds = []
        for index in range(n):
            launcher = LocalEngineLauncher(self.work_dir, self)
            # Copy the engine args over to each engine launcher.
            launcher.engine_args = copy.deepcopy(self.engine_args)
            started = launcher.start(cluster_dir)
            if index == 0:
                # Log the command line once, using the first engine.
                log.msg("Starting LocalEngineSetLauncher: %r" % launcher.args)
            self.launchers.append(launcher)
            start_deferreds.append(started)
        # The consumeErrors here could be dangerous
        all_started = gatherBoth(start_deferreds, consumeErrors=True)
        all_started.addCallback(self.notify_start)
        return all_started

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine launcher."""
        signalled = [launcher.signal(sig) for launcher in self.launchers]
        return gatherBoth(signalled, consumeErrors=True)

    def interrupt_then_kill(self, delay=1.0):
        """Run ``interrupt_then_kill(delay)`` on every engine launcher."""
        stopping = [
            launcher.interrupt_then_kill(delay) for launcher in self.launchers
        ]
        return gatherBoth(stopping, consumeErrors=True)

    def stop(self):
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Return a deferred gathering ``observe_stop`` of every engine."""
        observers = [launcher.observe_stop() for launcher in self.launchers]
        all_stopped = gatherBoth(observers, consumeErrors=False)
        all_stopped.addCallback(self.notify_stop)
        return all_stopped
430 430
431 431
432 432 #-----------------------------------------------------------------------------
433 433 # MPIExec launchers
434 434 #-----------------------------------------------------------------------------
435 435
436 436
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build the full argument list from all the fields.

        The instance count is converted with ``str`` so that every element
        of the returned list is a string, as an argument vector requires.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        # Stored first so that find_args() sees the requested count.
        self.n = n
        return super(MPIExecLauncher, self).start()
460 460
461 461
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # The command (a list) used to invoke ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    # Declared with config=False here; start() always requests one instance.
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir.

        ``--cluster-dir <cluster_dir>`` is appended to ``controller_args``
        in place and the directory is recorded, as unicode, on
        ``self.cluster_dir`` before a single instance is started.
        """
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build the mpiexec argument list for the controller.

        The instance count is converted with ``str`` so that every element
        of the returned list is a string.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.controller_cmd + self.controller_args
480 480
481 481
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    # The command (a list) used to invoke ipengine.
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    # The number of engines to start.
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir.

        ``--cluster-dir <cluster_dir>`` is appended to ``engine_args`` in
        place and the directory is recorded, as unicode, on
        ``self.cluster_dir``.
        """
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        # Set before logging so that self.args reflects the requested count.
        self.n = n
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Build the mpiexec argument list for the engines.

        The engine count is converted with ``str`` so that every element of
        the returned list is a string.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.engine_cmd + self.engine_args
502 502
503 503
504 504 #-----------------------------------------------------------------------------
505 505 # SSH launchers
506 506 #-----------------------------------------------------------------------------
507 507
508 # TODO: Get SSH Launcher working again.
508 509
class SSHLauncher(BaseLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    # The ssh command (a list) and any extra arguments passed to it.
    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    # The remote program to run and its arguments.
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str('', config=True)
    # The ssh target, rebuilt by the change handlers below.
    location = Str('')

    def _format_location(self, user, hostname):
        """Return ``user@hostname``, or just ``hostname`` if user is empty.

        Without this check an unset user would produce the target
        ``@hostname``.
        """
        if user:
            return '%s@%s' % (user, hostname)
        return hostname

    # NOTE(review): the two handlers below are presumably invoked by the
    # traits machinery when ``hostname`` / ``user`` change -- confirm.
    def _hostname_changed(self, name, old, new):
        self.location = self._format_location(self.user, new)

    def _user_changed(self, name, old, new):
        self.location = self._format_location(new, self.hostname)

    def find_args(self):
        """Return the ssh command, its args, the target and the program."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
            self.program + self.program_args

    def start(self, n, hostname=None, user=None):
        """Start the program over ssh.

        ``hostname`` and ``user`` replace the configured values when given.
        ``n`` is accepted but is not used by this method.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()
541 542
542 543
class SSHControllerLauncher(SSHLauncher):
    """Placeholder subclass of SSHLauncher; it adds nothing of its own yet."""
    pass
545 546
546 547
class SSHEngineSetLauncher(BaseLauncher):
    """Placeholder subclass of BaseLauncher; it adds nothing of its own yet."""
    pass
549 550
550 551
551 552 #-----------------------------------------------------------------------------
552 553 # Windows HPC Server 2008 scheduler launchers
553 554 #-----------------------------------------------------------------------------
554 555
555 556
556 557 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to run the Windows HPC ``job`` program.

    On Windows (``os.name == 'nt'``) the result of ``find_cmd('job')`` is
    returned; on every other platform the bare name ``'job'`` is returned.
    """
    if os.name != 'nt':
        return 'job'
    return find_cmd('job')
562 563
563 564
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits jobs to the Windows HPC job scheduler.

    Subclasses must implement ``write_job_file``.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE: the ``job_file`` property defined further down rebinds this name,
    # so the property, not this trait, is what the class ends up with.
    job_file = Unicode(u'')
    # The hostname of the scheduler to submit the job to
    scheduler = Str('', config=True)
    job_cmd = Str(find_job_cmd(), config=True)

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(WindowsHPCLauncher, self).__init__(
            work_dir, parent, name, config
        )

    @property
    def job_file(self):
        """The full path of the job file: work_dir joined with job_file_name."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description file for n instances (subclass hook)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The first match of ``job_id_regexp`` anywhere in ``output`` is stored
        on ``self.job_id`` and returned.  Raises LauncherError if nothing
        matches.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, submits it with ``job_cmd``, parses the job id
        from the output, calls ``notify_start`` with it and returns it.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        # Twisted will raise DeprecationWarnings if we try to pass unicode to this
        output = yield getProcessOutput(str(self.job_cmd),
            [str(a) for a in args],
            env=dict((str(k),str(v)) for k,v in os.environ.items()),
            path=self.work_dir
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Cancel the job recorded in ``self.job_id``.

        If the cancel command fails, the job is assumed to have stopped
        already and a message saying so is used as the output.  The output
        is passed to ``notify_stop`` and returned.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            # Twisted will raise DeprecationWarnings if we try to pass unicode to this
            output = yield getProcessOutput(str(self.job_cmd),
                [str(a) for a in args],
                env=dict((str(k),str(v)) for k,v in os.environ.items()),
                path=self.work_dir
            )
        except Exception:
            # Deliberately best-effort, but limited to Exception so that
            # GeneratorExit, KeyboardInterrupt and SystemExit still propagate.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
643 644
644 645
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller through the Windows HPC job scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Extra controller arguments; filled in by start().
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a job description containing a single controller task.

        ``n`` is accepted for interface compatibility and is not used here.
        """
        controller_job = IPControllerJob(self)
        task = IPControllerTask(self)

        # The task's work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.cluster_dir
        # Append the --cluster-dir arguments recorded by self.start().
        task.controller_args.extend(self.extra_args)
        controller_job.add_task(task)

        destination = self.job_file
        log.msg("Writing job description file: %s" % destination)
        controller_job.write(self.job_file)

    @property
    def job_file(self):
        """The job file path: cluster_dir joined with job_file_name."""
        directory = self.cluster_dir
        return os.path.join(directory, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        parent = super(WindowsHPCControllerLauncher, self)
        return parent.start(1)
674 675
675 676
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of engines through the Windows HPC job scheduler."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True)
    # Extra engine arguments; filled in by start().
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a job description containing n engine tasks."""
        engine_job = IPEngineSetJob(self)

        for _ in range(n):
            task = IPEngineTask(self)
            # The task's work directory is *not* the actual work directory
            # of the engine. It is used as the base path for the
            # stdout/stderr files that the scheduler redirects to.
            task.work_directory = self.cluster_dir
            # Append the --cluster-dir arguments recorded by self.start().
            task.engine_args.extend(self.extra_args)
            engine_job.add_task(task)

        destination = self.job_file
        log.msg("Writing job description file: %s" % destination)
        engine_job.write(self.job_file)

    @property
    def job_file(self):
        """The job file path: cluster_dir joined with job_file_name."""
        directory = self.cluster_dir
        return os.path.join(directory, self.job_file_name)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        parent = super(WindowsHPCEngineSetLauncher, self)
        return parent.start(n)
706 707
707 708
708 709 #-----------------------------------------------------------------------------
709 710 # Batch (PBS) system launchers
710 711 #-----------------------------------------------------------------------------
711 712
713 # TODO: Get PBS launcher working again.
712 714
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl.  Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            work_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
        # Variables made available to the batch script template.
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The first match of ``job_id_regexp`` anywhere in ``output`` is stored
        on ``self.job_id`` and returned.  ``re.search`` is used so that an id
        preceded by other text is still found; output that matches at the
        very start yields the same result as before.  Raises LauncherError
        if nothing matches.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            # Close the file even if the write fails.
            f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system.

        Writes the batch script, runs ``submit_command`` on it, parses the
        job id from the output, calls ``notify_start`` with it and returns it.
        """
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Run ``delete_command`` on the recorded job id.

        The command output is passed to ``notify_stop`` and returned.
        """
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
786 788
787 789
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    # The PBS programs used to submit and delete jobs.
    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # Raw string so that the backslash in \d reaches the regexp engine as
    # written instead of relying on an unrecognised string escape.
    job_id_regexp = Str(r'\d+', config=True)
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')
797 799
798 800
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir.

        A single instance is requested from the parent ``start()``.
        """
        # Saving cluster_dir in the context makes it available to the batch
        # script template as ${cluster_dir}.
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        message = "Starting PBSControllerLauncher: %r" % self.args
        log.msg(message)
        parent = super(PBSControllerLauncher, self)
        return parent.start(1)
813 815
814 816
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        # The batch script is rendered from self.context only, so the
        # cluster directory is stored there, where the template can use it
        # as ${cluster_dir}.  The PBS launcher classes declare no
        # ``program_args`` list to extend.
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
825 827
826 828
827 829 #-----------------------------------------------------------------------------
828 830 # A launcher for ipcluster itself!
829 831 #-----------------------------------------------------------------------------
830 832
831 833
def find_ipcluster_cmd():
    """Find the command line ipcluster program in a cross platform way.

    Returns a list holding the leading elements of the command line: the
    Python executable, ``-u`` and the script path on win32, or just
    ``['ipcluster']`` elsewhere.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcluster script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipclusterapp
        script_location = ipclusterapp.__file__
        # Map a compiled module file back to its source file.  Only the
        # extension is rewritten, so a '.pyc' elsewhere in the path is left
        # alone, and '.pyo' files are handled as well.
        root, ext = os.path.splitext(script_location)
        if ext in ('.pyc', '.pyo'):
            script_location = root + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcluster has to be on the PATH in this case.
        cmd = ['ipcluster']
    return cmd
848 850
849 851
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # The command (a list) used to invoke ipcluster.
    ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'],
        config=True
    )
    # The ipcluster subcommand to run and the engine count passed via -n.
    ipcluster_subcommand = Str('start')
    ipcluster_n = Int(2)

    def find_args(self):
        """Return the command, subcommand, ``-n <count>`` and extra args."""
        argv = self.ipcluster_cmd + [self.ipcluster_subcommand]
        argv = argv + ['-n', repr(self.ipcluster_n)]
        argv = argv + self.ipcluster_args
        return argv

    def start(self):
        """Log the command line and start ipcluster via the parent class."""
        message = "Starting ipcluster: %r" % self.args
        log.msg(message)
        return super(IPClusterLauncher, self).start()
867 869
General Comments 0
You need to be logged in to leave comments. Login now