##// END OF EJS Templates
untwist PBS, WinHPC Launchers in newparallel
MinRK -
Show More
@@ -1,536 +1,538 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 import re
20 21 import os
21 22 import shutil
22 23 import sys
23 24 import logging
24 25 import warnings
25 26
26 27 from IPython.config.loader import PyFileConfigLoader
27 28 from IPython.core.application import Application, BaseAppConfigLoader
28 29 from IPython.config.configurable import Configurable
29 30 from IPython.core.crashhandler import CrashHandler
30 31 from IPython.core import release
31 32 from IPython.utils.path import (
32 33 get_ipython_package_dir,
33 34 expand_path
34 35 )
35 36 from IPython.utils.traitlets import Unicode
36 37
37 38 #-----------------------------------------------------------------------------
38 39 # Module errors
39 40 #-----------------------------------------------------------------------------
40 41
41 42 class ClusterDirError(Exception):
42 43 pass
43 44
44 45
45 46 class PIDFileError(Exception):
46 47 pass
47 48
48 49
49 50 #-----------------------------------------------------------------------------
50 51 # Class for managing cluster directories
51 52 #-----------------------------------------------------------------------------
52 53
53 54 class ClusterDir(Configurable):
54 55 """An object to manage the cluster directory and its resources.
55 56
56 57 The cluster directory is used by :command:`ipengine`,
57 58 :command:`ipcontroller` and :command:`ipcluster` to manage the
58 59 configuration, logging and security of these applications.
59 60
60 61 This object knows how to find, create and manage these directories. This
61 62 should be used by any code that wants to handle cluster directories.
62 63 """
63 64
64 65 security_dir_name = Unicode('security')
65 66 log_dir_name = Unicode('log')
66 67 pid_dir_name = Unicode('pid')
67 68 security_dir = Unicode(u'')
68 69 log_dir = Unicode(u'')
69 70 pid_dir = Unicode(u'')
70 71 location = Unicode(u'')
71 72
72 73 def __init__(self, location=u''):
73 74 super(ClusterDir, self).__init__(location=location)
74 75
75 76 def _location_changed(self, name, old, new):
76 77 if not os.path.isdir(new):
77 78 os.makedirs(new)
78 79 self.security_dir = os.path.join(new, self.security_dir_name)
79 80 self.log_dir = os.path.join(new, self.log_dir_name)
80 81 self.pid_dir = os.path.join(new, self.pid_dir_name)
81 82 self.check_dirs()
82 83
83 84 def _log_dir_changed(self, name, old, new):
84 85 self.check_log_dir()
85 86
86 87 def check_log_dir(self):
87 88 if not os.path.isdir(self.log_dir):
88 89 os.mkdir(self.log_dir)
89 90
90 91 def _security_dir_changed(self, name, old, new):
91 92 self.check_security_dir()
92 93
93 94 def check_security_dir(self):
94 95 if not os.path.isdir(self.security_dir):
95 96 os.mkdir(self.security_dir, 0700)
96 97 os.chmod(self.security_dir, 0700)
97 98
98 99 def _pid_dir_changed(self, name, old, new):
99 100 self.check_pid_dir()
100 101
101 102 def check_pid_dir(self):
102 103 if not os.path.isdir(self.pid_dir):
103 104 os.mkdir(self.pid_dir, 0700)
104 105 os.chmod(self.pid_dir, 0700)
105 106
106 107 def check_dirs(self):
107 108 self.check_security_dir()
108 109 self.check_log_dir()
109 110 self.check_pid_dir()
110 111
111 112 def load_config_file(self, filename):
112 113 """Load a config file from the top level of the cluster dir.
113 114
114 115 Parameters
115 116 ----------
116 117 filename : unicode or str
117 118 The filename only of the config file that must be located in
118 119 the top-level of the cluster directory.
119 120 """
120 121 loader = PyFileConfigLoader(filename, self.location)
121 122 return loader.load_config()
122 123
123 124 def copy_config_file(self, config_file, path=None, overwrite=False):
124 125 """Copy a default config file into the active cluster directory.
125 126
126 127 Default configuration files are kept in :mod:`IPython.config.default`.
127 128 This function moves these from that location to the working cluster
128 129 directory.
129 130 """
130 131 if path is None:
131 132 import IPython.config.default
132 133 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
133 134 path = os.path.sep.join(path)
134 135 src = os.path.join(path, config_file)
135 136 dst = os.path.join(self.location, config_file)
136 137 if not os.path.isfile(dst) or overwrite:
137 138 shutil.copy(src, dst)
138 139
139 140 def copy_all_config_files(self, path=None, overwrite=False):
140 141 """Copy all config files into the active cluster directory."""
141 142 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
142 143 u'ipcluster_config.py']:
143 144 self.copy_config_file(f, path=path, overwrite=overwrite)
144 145
145 146 @classmethod
146 147 def create_cluster_dir(csl, cluster_dir):
147 148 """Create a new cluster directory given a full path.
148 149
149 150 Parameters
150 151 ----------
151 152 cluster_dir : str
152 153 The full path to the cluster directory. If it does exist, it will
153 154 be used. If not, it will be created.
154 155 """
155 156 return ClusterDir(location=cluster_dir)
156 157
157 158 @classmethod
158 159 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
159 160 """Create a cluster dir by profile name and path.
160 161
161 162 Parameters
162 163 ----------
163 164 path : str
164 165 The path (directory) to put the cluster directory in.
165 166 profile : str
166 167 The name of the profile. The name of the cluster directory will
167 168 be "clusterz_<profile>".
168 169 """
169 170 if not os.path.isdir(path):
170 171 raise ClusterDirError('Directory not found: %s' % path)
171 172 cluster_dir = os.path.join(path, u'clusterz_' + profile)
172 173 return ClusterDir(location=cluster_dir)
173 174
174 175 @classmethod
175 176 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
176 177 """Find an existing cluster dir by profile name, return its ClusterDir.
177 178
178 179 This searches through a sequence of paths for a cluster dir. If it
179 180 is not found, a :class:`ClusterDirError` exception will be raised.
180 181
181 182 The search path algorithm is:
182 183 1. ``os.getcwd()``
183 184 2. ``ipython_dir``
184 185 3. The directories found in the ":" separated
185 186 :env:`IPCLUSTER_DIR_PATH` environment variable.
186 187
187 188 Parameters
188 189 ----------
189 190 ipython_dir : unicode or str
190 191 The IPython directory to use.
191 192 profile : unicode or str
192 193 The name of the profile. The name of the cluster directory
193 194 will be "clusterz_<profile>".
194 195 """
195 196 dirname = u'clusterz_' + profile
196 197 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
197 198 if cluster_dir_paths:
198 199 cluster_dir_paths = cluster_dir_paths.split(':')
199 200 else:
200 201 cluster_dir_paths = []
201 202 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
202 203 for p in paths:
203 204 cluster_dir = os.path.join(p, dirname)
204 205 if os.path.isdir(cluster_dir):
205 206 return ClusterDir(location=cluster_dir)
206 207 else:
207 208 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
208 209
209 210 @classmethod
210 211 def find_cluster_dir(cls, cluster_dir):
211 212 """Find/create a cluster dir and return its ClusterDir.
212 213
213 214 This will create the cluster directory if it doesn't exist.
214 215
215 216 Parameters
216 217 ----------
217 218 cluster_dir : unicode or str
218 219 The path of the cluster directory. This is expanded using
219 220 :func:`IPython.utils.genutils.expand_path`.
220 221 """
221 222 cluster_dir = expand_path(cluster_dir)
222 223 if not os.path.isdir(cluster_dir):
223 224 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
224 225 return ClusterDir(location=cluster_dir)
225 226
226 227
227 228 #-----------------------------------------------------------------------------
228 229 # Command line options
229 230 #-----------------------------------------------------------------------------
230 231
231 232 class ClusterDirConfigLoader(BaseAppConfigLoader):
232 233
233 234 def _add_cluster_profile(self, parser):
234 235 paa = parser.add_argument
235 236 paa('-p', '--profile',
236 237 dest='Global.profile',type=unicode,
237 238 help=
238 239 """The string name of the profile to be used. This determines the name
239 240 of the cluster dir as: cluster_<profile>. The default profile is named
240 241 'default'. The cluster directory is resolved this way if the
241 242 --cluster-dir option is not used.""",
242 243 metavar='Global.profile')
243 244
244 245 def _add_cluster_dir(self, parser):
245 246 paa = parser.add_argument
246 247 paa('--cluster-dir',
247 248 dest='Global.cluster_dir',type=unicode,
248 249 help="""Set the cluster dir. This overrides the logic used by the
249 250 --profile option.""",
250 251 metavar='Global.cluster_dir')
251 252
252 253 def _add_work_dir(self, parser):
253 254 paa = parser.add_argument
254 255 paa('--work-dir',
255 256 dest='Global.work_dir',type=unicode,
256 257 help='Set the working dir for the process.',
257 258 metavar='Global.work_dir')
258 259
259 260 def _add_clean_logs(self, parser):
260 261 paa = parser.add_argument
261 262 paa('--clean-logs',
262 263 dest='Global.clean_logs', action='store_true',
263 264 help='Delete old log files before starting.')
264 265
265 266 def _add_no_clean_logs(self, parser):
266 267 paa = parser.add_argument
267 268 paa('--no-clean-logs',
268 269 dest='Global.clean_logs', action='store_false',
269 270 help="Don't delete old log files before starting.")
270 271
271 272 def _add_arguments(self):
272 273 super(ClusterDirConfigLoader, self)._add_arguments()
273 274 self._add_cluster_profile(self.parser)
274 275 self._add_cluster_dir(self.parser)
275 276 self._add_work_dir(self.parser)
276 277 self._add_clean_logs(self.parser)
277 278 self._add_no_clean_logs(self.parser)
278 279
279 280
280 281 #-----------------------------------------------------------------------------
281 282 # Crash handler for this application
282 283 #-----------------------------------------------------------------------------
283 284
284 285
285 286 _message_template = """\
286 287 Oops, $self.app_name crashed. We do our best to make it stable, but...
287 288
288 289 A crash report was automatically generated with the following information:
289 290 - A verbatim copy of the crash traceback.
290 291 - Data on your current $self.app_name configuration.
291 292
292 293 It was left in the file named:
293 294 \t'$self.crash_report_fname'
294 295 If you can email this file to the developers, the information in it will help
295 296 them in understanding and correcting the problem.
296 297
297 298 You can mail it to: $self.contact_name at $self.contact_email
298 299 with the subject '$self.app_name Crash Report'.
299 300
300 301 If you want to do it now, the following command will work (under Unix):
301 302 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
302 303
303 304 To ensure accurate tracking of this issue, please file a report about it at:
304 305 $self.bug_tracker
305 306 """
306 307
307 308 class ClusterDirCrashHandler(CrashHandler):
308 309 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
309 310
310 311 message_template = _message_template
311 312
312 313 def __init__(self, app):
313 314 contact_name = release.authors['Brian'][0]
314 315 contact_email = release.authors['Brian'][1]
315 316 bug_tracker = 'http://github.com/ipython/ipython/issues'
316 317 super(ClusterDirCrashHandler,self).__init__(
317 318 app, contact_name, contact_email, bug_tracker
318 319 )
319 320
320 321
321 322 #-----------------------------------------------------------------------------
322 323 # Main application
323 324 #-----------------------------------------------------------------------------
324 325
325 326 class ApplicationWithClusterDir(Application):
326 327 """An application that puts everything into a cluster directory.
327 328
328 329 Instead of looking for things in the ipython_dir, this type of application
329 330 will use its own private directory called the "cluster directory"
330 331 for things like config files, log files, etc.
331 332
332 333 The cluster directory is resolved as follows:
333 334
334 335 * If the ``--cluster-dir`` option is given, it is used.
335 336 * If ``--cluster-dir`` is not given, the application directory is
336 337 resolved using the profile name as ``cluster_<profile>``. The search
337 338 path for this directory is then i) cwd if it is found there
338 339 and ii) in ipython_dir otherwise.
339 340
340 341 The config file for the application is to be put in the cluster
341 342 dir and named the value of the ``config_file_name`` class attribute.
342 343 """
343 344
344 345 command_line_loader = ClusterDirConfigLoader
345 346 crash_handler_class = ClusterDirCrashHandler
346 347 auto_create_cluster_dir = True
347 348 # temporarily override default_log_level to DEBUG
348 349 default_log_level = logging.DEBUG
349 350
350 351 def create_default_config(self):
351 352 super(ApplicationWithClusterDir, self).create_default_config()
352 353 self.default_config.Global.profile = u'default'
353 354 self.default_config.Global.cluster_dir = u''
354 355 self.default_config.Global.work_dir = os.getcwd()
355 356 self.default_config.Global.log_to_file = False
356 357 self.default_config.Global.log_url = None
357 358 self.default_config.Global.clean_logs = False
358 359
359 360 def find_resources(self):
360 361 """This resolves the cluster directory.
361 362
362 363 This tries to find the cluster directory and if successful, it will
363 364 have done:
364 365 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
365 366 the application.
366 367 * Sets ``self.cluster_dir`` attribute of the application and config
367 368 objects.
368 369
369 370 The algorithm used for this is as follows:
370 371 1. Try ``Global.cluster_dir``.
371 372 2. Try using ``Global.profile``.
372 373 3. If both of these fail and ``self.auto_create_cluster_dir`` is
373 374 ``True``, then create the new cluster dir in the IPython directory.
374 375 4. If all fails, then raise :class:`ClusterDirError`.
375 376 """
376 377
377 378 try:
378 379 cluster_dir = self.command_line_config.Global.cluster_dir
379 380 except AttributeError:
380 381 cluster_dir = self.default_config.Global.cluster_dir
381 382 cluster_dir = expand_path(cluster_dir)
382 383 try:
383 384 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
384 385 except ClusterDirError:
385 386 pass
386 387 else:
387 388 self.log.info('Using existing cluster dir: %s' % \
388 389 self.cluster_dir_obj.location
389 390 )
390 391 self.finish_cluster_dir()
391 392 return
392 393
393 394 try:
394 395 self.profile = self.command_line_config.Global.profile
395 396 except AttributeError:
396 397 self.profile = self.default_config.Global.profile
397 398 try:
398 399 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
399 400 self.ipython_dir, self.profile)
400 401 except ClusterDirError:
401 402 pass
402 403 else:
403 404 self.log.info('Using existing cluster dir: %s' % \
404 405 self.cluster_dir_obj.location
405 406 )
406 407 self.finish_cluster_dir()
407 408 return
408 409
409 410 if self.auto_create_cluster_dir:
410 411 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
411 412 self.ipython_dir, self.profile
412 413 )
413 414 self.log.info('Creating new cluster dir: %s' % \
414 415 self.cluster_dir_obj.location
415 416 )
416 417 self.finish_cluster_dir()
417 418 else:
418 419 raise ClusterDirError('Could not find a valid cluster directory.')
419 420
420 421 def finish_cluster_dir(self):
421 422 # Set the cluster directory
422 423 self.cluster_dir = self.cluster_dir_obj.location
423 424
424 425 # These have to be set because they could be different from the one
425 426 # that we just computed. Because command line has the highest
426 427 # priority, this will always end up in the master_config.
427 428 self.default_config.Global.cluster_dir = self.cluster_dir
428 429 self.command_line_config.Global.cluster_dir = self.cluster_dir
429 430
430 431 def find_config_file_name(self):
431 432 """Find the config file name for this application."""
432 433 # For this type of Application it should be set as a class attribute.
433 434 if not hasattr(self, 'default_config_file_name'):
434 435 self.log.critical("No config filename found")
435 436 else:
436 437 self.config_file_name = self.default_config_file_name
437 438
438 439 def find_config_file_paths(self):
439 440 # Set the search path to the cluster directory. We should NOT
440 441 # include IPython.config.default here as the default config files
441 442 # are ALWAYS automatically moved to the cluster directory.
442 443 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
443 444 self.config_file_paths = (self.cluster_dir,)
444 445
445 446 def pre_construct(self):
446 447 # The log and security dirs were set earlier, but here we put them
447 448 # into the config and log them.
448 449 config = self.master_config
449 450 sdir = self.cluster_dir_obj.security_dir
450 451 self.security_dir = config.Global.security_dir = sdir
451 452 ldir = self.cluster_dir_obj.log_dir
452 453 self.log_dir = config.Global.log_dir = ldir
453 454 pdir = self.cluster_dir_obj.pid_dir
454 455 self.pid_dir = config.Global.pid_dir = pdir
455 456 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
456 457 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
457 458 # Change to the working directory. We do this just before construct
458 459 # is called so all the components there have the right working dir.
459 460 self.to_work_dir()
460 461
461 462 def to_work_dir(self):
462 463 wd = self.master_config.Global.work_dir
463 464 if unicode(wd) != unicode(os.getcwd()):
464 465 os.chdir(wd)
465 466 self.log.info("Changing to working dir: %s" % wd)
466 467
467 468 def start_logging(self):
468 469 # Remove old log files
469 470 if self.master_config.Global.clean_logs:
470 471 log_dir = self.master_config.Global.log_dir
471 472 for f in os.listdir(log_dir):
472 if f.startswith(self.name + u'-') and f.endswith('.log'):
473 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
474 # if f.startswith(self.name + u'-') and f.endswith('.log'):
473 475 os.remove(os.path.join(log_dir, f))
474 476 # Start logging to the new log file
475 477 if self.master_config.Global.log_to_file:
476 478 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
477 479 logfile = os.path.join(self.log_dir, log_filename)
478 480 open_log_file = open(logfile, 'w')
479 481 elif self.master_config.Global.log_url:
480 482 open_log_file = None
481 483 else:
482 484 open_log_file = sys.stdout
483 485 if open_log_file is not None:
484 486 self.log.removeHandler(self._log_handler)
485 487 self._log_handler = logging.StreamHandler(open_log_file)
486 488 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
487 489 self._log_handler.setFormatter(self._log_formatter)
488 490 self.log.addHandler(self._log_handler)
489 491 # log.startLogging(open_log_file)
490 492
491 493 def write_pid_file(self, overwrite=False):
492 494 """Create a .pid file in the pid_dir with my pid.
493 495
494 496 This must be called after pre_construct, which sets `self.pid_dir`.
495 497 This raises :exc:`PIDFileError` if the pid file exists already.
496 498 """
497 499 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
498 500 if os.path.isfile(pid_file):
499 501 pid = self.get_pid_from_file()
500 502 if not overwrite:
501 503 raise PIDFileError(
502 504 'The pid file [%s] already exists. \nThis could mean that this '
503 505 'server is already running with [pid=%s].' % (pid_file, pid)
504 506 )
505 507 with open(pid_file, 'w') as f:
506 508 self.log.info("Creating pid file: %s" % pid_file)
507 509 f.write(repr(os.getpid())+'\n')
508 510
509 511 def remove_pid_file(self):
510 512 """Remove the pid file.
511 513
512 514 This should be called at shutdown by registering a callback with
513 515 :func:`reactor.addSystemEventTrigger`. This needs to return
514 516 ``None``.
515 517 """
516 518 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
517 519 if os.path.isfile(pid_file):
518 520 try:
519 521 self.log.info("Removing pid file: %s" % pid_file)
520 522 os.remove(pid_file)
521 523 except:
522 524 self.log.warn("Error removing the pid file: %s" % pid_file)
523 525
524 526 def get_pid_from_file(self):
525 527 """Get the pid from the pid file.
526 528
527 529 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
528 530 """
529 531 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
530 532 if os.path.isfile(pid_file):
531 533 with open(pid_file, 'r') as f:
532 534 pid = int(f.read().strip())
533 535 return pid
534 536 else:
535 537 raise PIDFileError('pid file not found: %s' % pid_file)
536 538
@@ -1,503 +1,500 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 import re
18 19 import logging
19 20 import os
20 21 import signal
21 22 import logging
22 23
23 24 from zmq.eventloop import ioloop
24 25
25 26 from IPython.external.argparse import ArgumentParser, SUPPRESS
26 27 from IPython.utils.importstring import import_item
27 28 from IPython.zmq.parallel.clusterdir import (
28 29 ApplicationWithClusterDir, ClusterDirConfigLoader,
29 30 ClusterDirError, PIDFileError
30 31 )
31 32
32 33
33 34 #-----------------------------------------------------------------------------
34 35 # Module level variables
35 36 #-----------------------------------------------------------------------------
36 37
37 38
38 39 default_config_file_name = u'ipcluster_config.py'
39 40
40 41
41 42 _description = """\
42 43 Start an IPython cluster for parallel computing.\n\n
43 44
44 45 An IPython cluster consists of 1 controller and 1 or more engines.
45 46 This command automates the startup of these processes using a wide
46 47 range of startup methods (SSH, local processes, PBS, mpiexec,
47 48 Windows HPC Server 2008). To start a cluster with 4 engines on your
48 49 local host simply do 'ipcluster start -n 4'. For more complex usage
49 50 you will typically do 'ipcluster create -p mycluster', then edit
50 51 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
51 52 """
52 53
53 54
54 55 # Exit codes for ipcluster
55 56
56 57 # This will be the exit code if the ipcluster appears to be running because
57 58 # a .pid file exists
58 59 ALREADY_STARTED = 10
59 60
60 61
61 62 # This will be the exit code if ipcluster stop is run, but there is no .pid
62 63 # file to be found.
63 64 ALREADY_STOPPED = 11
64 65
65 66
66 67 #-----------------------------------------------------------------------------
67 68 # Command line options
68 69 #-----------------------------------------------------------------------------
69 70
70 71
71 72 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
72 73
73 74 def _add_arguments(self):
74 75 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
75 76 # its defaults on self.parser. Instead, we will put those on
76 77 # default options on our subparsers.
77 78
78 79 # This has all the common options that all subcommands use
79 80 parent_parser1 = ArgumentParser(
80 81 add_help=False,
81 82 argument_default=SUPPRESS
82 83 )
83 84 self._add_ipython_dir(parent_parser1)
84 85 self._add_log_level(parent_parser1)
85 86
86 87 # This has all the common options that other subcommands use
87 88 parent_parser2 = ArgumentParser(
88 89 add_help=False,
89 90 argument_default=SUPPRESS
90 91 )
91 92 self._add_cluster_profile(parent_parser2)
92 93 self._add_cluster_dir(parent_parser2)
93 94 self._add_work_dir(parent_parser2)
94 95 paa = parent_parser2.add_argument
95 96 paa('--log-to-file',
96 97 action='store_true', dest='Global.log_to_file',
97 98 help='Log to a file in the log directory (default is stdout)')
98 99
99 100 # Create the object used to create the subparsers.
100 101 subparsers = self.parser.add_subparsers(
101 102 dest='Global.subcommand',
102 103 title='ipcluster subcommands',
103 104 description=
104 105 """ipcluster has a variety of subcommands. The general way of
105 106 running ipcluster is 'ipcluster <cmd> [options]'. To get help
106 107 on a particular subcommand do 'ipcluster <cmd> -h'."""
107 108 # help="For more help, type 'ipcluster <cmd> -h'",
108 109 )
109 110
110 111 # The "list" subcommand parser
111 112 parser_list = subparsers.add_parser(
112 113 'list',
113 114 parents=[parent_parser1],
114 115 argument_default=SUPPRESS,
115 116 help="List all clusters in cwd and ipython_dir.",
116 117 description=
117 118 """List all available clusters, by cluster directory, that can
118 119 be found in the current working directory or in the ipython
119 120 directory. Cluster directories are named using the convention
120 121 'cluster_<profile>'."""
121 122 )
122 123
123 124 # The "create" subcommand parser
124 125 parser_create = subparsers.add_parser(
125 126 'create',
126 127 parents=[parent_parser1, parent_parser2],
127 128 argument_default=SUPPRESS,
128 129 help="Create a new cluster directory.",
129 130 description=
130 131 """Create an ipython cluster directory by its profile name or
131 132 cluster directory path. Cluster directories contain
132 133 configuration, log and security related files and are named
133 134 using the convention 'cluster_<profile>'. By default they are
134 135 located in your ipython directory. Once created, you will
135 136 probably need to edit the configuration files in the cluster
136 137 directory to configure your cluster. Most users will create a
137 138 cluster directory by profile name,
138 139 'ipcluster create -p mycluster', which will put the directory
139 140 in '<ipython_dir>/cluster_mycluster'.
140 141 """
141 142 )
142 143 paa = parser_create.add_argument
143 144 paa('--reset-config',
144 145 dest='Global.reset_config', action='store_true',
145 146 help=
146 147 """Recopy the default config files to the cluster directory.
147 148 You will lose any modifications you have made to these files.""")
148 149
149 150 # The "start" subcommand parser
150 151 parser_start = subparsers.add_parser(
151 152 'start',
152 153 parents=[parent_parser1, parent_parser2],
153 154 argument_default=SUPPRESS,
154 155 help="Start a cluster.",
155 156 description=
156 157 """Start an ipython cluster by its profile name or cluster
157 158 directory. Cluster directories contain configuration, log and
158 159 security related files and are named using the convention
159 160 'cluster_<profile>' and should be created using the 'create'
160 161 subcommand of 'ipcluster'. If your cluster directory is in
161 162 the cwd or the ipython directory, you can simply refer to it
162 163 using its profile name, 'ipcluster start -n 4 -p <profile>`,
163 164 otherwise use the '--cluster-dir' option.
164 165 """
165 166 )
166 167 paa = parser_start.add_argument
167 168 paa('-n', '--number',
168 169 type=int, dest='Global.n',
169 170 help='The number of engines to start.',
170 171 metavar='Global.n')
171 172 paa('--clean-logs',
172 173 dest='Global.clean_logs', action='store_true',
173 174 help='Delete old log flies before starting.')
174 175 paa('--no-clean-logs',
175 176 dest='Global.clean_logs', action='store_false',
176 177 help="Don't delete old log files before starting.")
177 178 paa('--daemon',
178 179 dest='Global.daemonize', action='store_true',
179 180 help='Daemonize the ipcluster program. This implies --log-to-file')
180 181 paa('--no-daemon',
181 182 dest='Global.daemonize', action='store_false',
182 183 help="Don't daemonize the ipcluster program.")
183 184
184 185 # The "stop" subcommand parser
185 186 parser_stop = subparsers.add_parser(
186 187 'stop',
187 188 parents=[parent_parser1, parent_parser2],
188 189 argument_default=SUPPRESS,
189 190 help="Stop a running cluster.",
190 191 description=
191 192 """Stop a running ipython cluster by its profile name or cluster
192 193 directory. Cluster directories are named using the convention
193 194 'cluster_<profile>'. If your cluster directory is in
194 195 the cwd or the ipython directory, you can simply refer to it
195 196 using its profile name, 'ipcluster stop -p <profile>`, otherwise
196 197 use the '--cluster-dir' option.
197 198 """
198 199 )
199 200 paa = parser_stop.add_argument
200 201 paa('--signal',
201 202 dest='Global.signal', type=int,
202 203 help="The signal number to use in stopping the cluster (default=2).",
203 204 metavar="Global.signal")
204 205
205 206
206 207 #-----------------------------------------------------------------------------
207 208 # Main application
208 209 #-----------------------------------------------------------------------------
209 210
210 211
211 212 class IPClusterApp(ApplicationWithClusterDir):
212 213
213 214 name = u'ipclusterz'
214 215 description = _description
215 216 usage = None
216 217 command_line_loader = IPClusterAppConfigLoader
217 218 default_config_file_name = default_config_file_name
218 219 default_log_level = logging.INFO
219 220 auto_create_cluster_dir = False
220 221
221 222 def create_default_config(self):
222 223 super(IPClusterApp, self).create_default_config()
223 224 self.default_config.Global.controller_launcher = \
224 225 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
225 226 self.default_config.Global.engine_launcher = \
226 227 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
227 228 self.default_config.Global.n = 2
228 229 self.default_config.Global.reset_config = False
229 230 self.default_config.Global.clean_logs = True
230 231 self.default_config.Global.signal = 2
231 232 self.default_config.Global.daemonize = False
232 233
233 234 def find_resources(self):
234 235 subcommand = self.command_line_config.Global.subcommand
235 236 if subcommand=='list':
236 237 self.list_cluster_dirs()
237 238 # Exit immediately because there is nothing left to do.
238 239 self.exit()
239 240 elif subcommand=='create':
240 241 self.auto_create_cluster_dir = True
241 242 super(IPClusterApp, self).find_resources()
242 243 elif subcommand=='start' or subcommand=='stop':
243 244 self.auto_create_cluster_dir = True
244 245 try:
245 246 super(IPClusterApp, self).find_resources()
246 247 except ClusterDirError:
247 248 raise ClusterDirError(
248 249 "Could not find a cluster directory. A cluster dir must "
249 250 "be created before running 'ipcluster start'. Do "
250 251 "'ipcluster create -h' or 'ipcluster list -h' for more "
251 252 "information about creating and listing cluster dirs."
252 253 )
253 254
254 255 def list_cluster_dirs(self):
255 256 # Find the search paths
256 257 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
257 258 if cluster_dir_paths:
258 259 cluster_dir_paths = cluster_dir_paths.split(':')
259 260 else:
260 261 cluster_dir_paths = []
261 262 try:
262 263 ipython_dir = self.command_line_config.Global.ipython_dir
263 264 except AttributeError:
264 265 ipython_dir = self.default_config.Global.ipython_dir
265 266 paths = [os.getcwd(), ipython_dir] + \
266 267 cluster_dir_paths
267 268 paths = list(set(paths))
268 269
269 270 self.log.info('Searching for cluster dirs in paths: %r' % paths)
270 271 for path in paths:
271 272 files = os.listdir(path)
272 273 for f in files:
273 274 full_path = os.path.join(path, f)
274 275 if os.path.isdir(full_path) and f.startswith('cluster_'):
275 276 profile = full_path.split('_')[-1]
276 277 start_cmd = 'ipcluster start -p %s -n 4' % profile
277 278 print start_cmd + " ==> " + full_path
278 279
279 280 def pre_construct(self):
280 281 # IPClusterApp.pre_construct() is where we cd to the working directory.
281 282 super(IPClusterApp, self).pre_construct()
282 283 config = self.master_config
283 284 try:
284 285 daemon = config.Global.daemonize
285 286 if daemon:
286 287 config.Global.log_to_file = True
287 288 except AttributeError:
288 289 pass
289 290
290 291 def construct(self):
291 292 config = self.master_config
292 293 subcmd = config.Global.subcommand
293 294 reset = config.Global.reset_config
294 295 if subcmd == 'list':
295 296 return
296 297 if subcmd == 'create':
297 298 self.log.info('Copying default config files to cluster directory '
298 299 '[overwrite=%r]' % (reset,))
299 300 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
300 301 if subcmd =='start':
301 302 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
302 303 self.start_logging()
303 304 self.loop = ioloop.IOLoop.instance()
304 305 # reactor.callWhenRunning(self.start_launchers)
305 306 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
306 307 dc.start()
307 308
308 309 def start_launchers(self):
309 310 config = self.master_config
310 311
311 312 # Create the launchers. In both cases, we set the work_dir of
312 313 # the launcher to the cluster_dir. This is where the launcher's
313 314 # subprocesses will be launched. It is not where the controller
314 315 # and engine will be launched.
315 316 el_class = import_item(config.Global.engine_launcher)
316 317 self.engine_launcher = el_class(
317 318 work_dir=self.cluster_dir, config=config, logname=self.log.name
318 319 )
319 320 cl_class = import_item(config.Global.controller_launcher)
320 321 self.controller_launcher = cl_class(
321 322 work_dir=self.cluster_dir, config=config,
322 323 logname=self.log.name
323 324 )
324 325
325 326 # Setup signals
326 327 signal.signal(signal.SIGINT, self.sigint_handler)
327 328
328 329 # Setup the observing of stopping. If the controller dies, shut
329 330 # everything down as that will be completely fatal for the engines.
330 331 self.controller_launcher.on_stop(self.stop_launchers)
331 332 # d1.addCallback(self.stop_launchers)
332 333 # But, we don't monitor the stopping of engines. An engine dying
333 334 # is just fine and in principle a user could start a new engine.
334 335 # Also, if we did monitor engine stopping, it is difficult to
335 336 # know what to do when only some engines die. Currently, the
336 337 # observing of engine stopping is inconsistent. Some launchers
337 338 # might trigger on a single engine stopping, other wait until
338 339 # all stop. TODO: think more about how to handle this.
339 340
340 341 # Start the controller and engines
341 342 self._stopping = False # Make sure stop_launchers is not called 2x.
342 343 d = self.start_controller()
343 344 self.start_engines()
344 345 self.startup_message()
345 346 # d.addCallback(self.start_engines)
346 347 # d.addCallback(self.startup_message)
347 348 # If the controller or engines fail to start, stop everything
348 349 # d.addErrback(self.stop_launchers)
349 350 return d
350 351
351 352 def startup_message(self, r=None):
352 353 self.log.info("IPython cluster: started")
353 354 return r
354 355
355 356 def start_controller(self, r=None):
356 357 # self.log.info("In start_controller")
357 358 config = self.master_config
358 359 d = self.controller_launcher.start(
359 360 cluster_dir=config.Global.cluster_dir
360 361 )
361 362 return d
362 363
363 364 def start_engines(self, r=None):
364 365 # self.log.info("In start_engines")
365 366 config = self.master_config
366 367 d = self.engine_launcher.start(
367 368 config.Global.n,
368 369 cluster_dir=config.Global.cluster_dir
369 370 )
370 371 return d
371 372
372 373 def stop_controller(self, r=None):
373 374 # self.log.info("In stop_controller")
374 375 if self.controller_launcher.running:
375 376 return self.controller_launcher.stop()
376 377
377 378 def stop_engines(self, r=None):
378 379 # self.log.info("In stop_engines")
379 380 if self.engine_launcher.running:
380 381 d = self.engine_launcher.stop()
381 382 # d.addErrback(self.log_err)
382 383 return d
383 384 else:
384 385 return None
385 386
386 387 def log_err(self, f):
387 388 self.log.error(f.getTraceback())
388 389 return None
389 390
390 391 def stop_launchers(self, r=None):
391 392 if not self._stopping:
392 393 self._stopping = True
393 394 # if isinstance(r, failure.Failure):
394 395 # self.log.error('Unexpected error in ipcluster:')
395 396 # self.log.info(r.getTraceback())
396 397 self.log.error("IPython cluster: stopping")
397 398 # These return deferreds. We are not doing anything with them
398 399 # but we are holding refs to them as a reminder that they
399 400 # do return deferreds.
400 401 d1 = self.stop_engines()
401 402 d2 = self.stop_controller()
402 403 # Wait a few seconds to let things shut down.
403 404 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
404 405 dc.start()
405 406 # reactor.callLater(4.0, reactor.stop)
406 407
407 408 def sigint_handler(self, signum, frame):
408 409 self.stop_launchers()
409 410
410 411 def start_logging(self):
411 412 # Remove old log files of the controller and engine
412 413 if self.master_config.Global.clean_logs:
413 414 log_dir = self.master_config.Global.log_dir
414 415 for f in os.listdir(log_dir):
415 if f.startswith('ipengine' + '-'):
416 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
417 os.remove(os.path.join(log_dir, f))
418 if f.startswith('ipcontroller' + '-'):
419 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
420 os.remove(os.path.join(log_dir, f))
421 # This will remote old log files for ipcluster itself
416 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
417 os.remove(os.path.join(log_dir, f))
418 # This will remove old log files for ipcluster itself
422 419 super(IPClusterApp, self).start_logging()
423 420
424 421 def start_app(self):
425 422 """Start the application, depending on what subcommand is used."""
426 423 subcmd = self.master_config.Global.subcommand
427 424 if subcmd=='create' or subcmd=='list':
428 425 return
429 426 elif subcmd=='start':
430 427 self.start_app_start()
431 428 elif subcmd=='stop':
432 429 self.start_app_stop()
433 430
434 431 def start_app_start(self):
435 432 """Start the app for the start subcommand."""
436 433 config = self.master_config
437 434 # First see if the cluster is already running
438 435 try:
439 436 pid = self.get_pid_from_file()
440 437 except PIDFileError:
441 438 pass
442 439 else:
443 440 self.log.critical(
444 441 'Cluster is already running with [pid=%s]. '
445 442 'use "ipcluster stop" to stop the cluster.' % pid
446 443 )
447 444 # Here I exit with an unusual exit status that other processes
448 445 # can watch for to learn how I exited.
449 446 self.exit(ALREADY_STARTED)
450 447
451 448 # Now log and daemonize
452 449 self.log.info(
453 450 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
454 451 )
455 452 # TODO: Get daemonize working on Windows or as a Windows Server.
456 453 if config.Global.daemonize:
457 454 if os.name=='posix':
458 455 from twisted.scripts._twistd_unix import daemonize
459 456 daemonize()
460 457
461 458 # Now write the new pid file AFTER our new forked pid is active.
462 459 self.write_pid_file()
463 460 try:
464 461 self.loop.start()
465 462 except:
466 463 self.log.info("stopping...")
467 464 self.remove_pid_file()
468 465
469 466 def start_app_stop(self):
470 467 """Start the app for the stop subcommand."""
471 468 config = self.master_config
472 469 try:
473 470 pid = self.get_pid_from_file()
474 471 except PIDFileError:
475 472 self.log.critical(
476 473 'Problem reading pid file, cluster is probably not running.'
477 474 )
478 475 # Here I exit with an unusual exit status that other processes
479 476 # can watch for to learn how I exited.
480 477 self.exit(ALREADY_STOPPED)
481 478 else:
482 479 if os.name=='posix':
483 480 sig = config.Global.signal
484 481 self.log.info(
485 482 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
486 483 )
487 484 os.kill(pid, sig)
488 485 elif os.name=='nt':
489 486 # As of right now, we don't support daemonize on Windows, so
490 487 # stop will not do anything. Minimally, it should clean up the
491 488 # old .pid files.
492 489 self.remove_pid_file()
493 490
494 491
495 492 def launch_new_instance():
496 493 """Create and run the IPython cluster."""
497 494 app = IPClusterApp()
498 495 app.start()
499 496
500 497
501 498 if __name__ == '__main__':
502 499 launch_new_instance()
503 500
This diff has been collapsed as it changes many lines, (584 lines changed) Show them Hide them
@@ -1,825 +1,835 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21 import logging
22 22
23 from signal import SIGINT
23 from signal import SIGINT, SIGTERM
24 24 try:
25 25 from signal import SIGKILL
26 26 except ImportError:
27 27 SIGKILL=SIGTERM
28 28
29 from subprocess import Popen, PIPE
29 from subprocess import Popen, PIPE, STDOUT
30 try:
31 from subprocess import check_open
32 except ImportError:
33 # pre-2.7:
34 from StringIO import StringIO
35
36 def check_open(*args, **kwargs):
37 sio = StringIO()
38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
39 p = Popen(*args, **kwargs)
40 out,err = p.communicate()
41 return out
30 42
31 43 from zmq.eventloop import ioloop
32 44
45 from IPython.external import Itpl
33 46 # from IPython.config.configurable import Configurable
34 47 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
35 48 from IPython.utils.path import get_ipython_module_path
36 49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
37 50
38 51 from factory import LoggingFactory
39 # from IPython.kernel.winhpcjob import (
40 # IPControllerTask, IPEngineTask,
41 # IPControllerJob, IPEngineSetJob
42 # )
52
53 # load winhpcjob from IPython.kernel
54 try:
55 from IPython.kernel.winhpcjob import (
56 IPControllerTask, IPEngineTask,
57 IPControllerJob, IPEngineSetJob
58 )
59 except ImportError:
60 pass
43 61
44 62
45 63 #-----------------------------------------------------------------------------
46 64 # Paths to the kernel apps
47 65 #-----------------------------------------------------------------------------
48 66
49 67
50 68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
51 69 'IPython.zmq.parallel.ipclusterapp'
52 70 ))
53 71
54 72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
55 73 'IPython.zmq.parallel.ipengineapp'
56 74 ))
57 75
58 76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
59 77 'IPython.zmq.parallel.ipcontrollerapp'
60 78 ))
61 79
62 80 #-----------------------------------------------------------------------------
63 81 # Base launchers and errors
64 82 #-----------------------------------------------------------------------------
65 83
66 84
67 85 class LauncherError(Exception):
68 86 pass
69 87
70 88
71 89 class ProcessStateError(LauncherError):
72 90 pass
73 91
74 92
75 93 class UnknownStatus(LauncherError):
76 94 pass
77 95
78 96
79 97 class BaseLauncher(LoggingFactory):
80 98 """An abstraction for starting, stopping and signaling a process."""
81 99
82 100 # In all of the launchers, the work_dir is where child processes will be
83 101 # run. This will usually be the cluster_dir, but may not be. any work_dir
84 102 # passed into the __init__ method will override the config value.
85 103 # This should not be used to set the work_dir for the actual engine
86 104 # and controller. Instead, use their own config files or the
87 105 # controller_args, engine_args attributes of the launchers to add
88 106 # the --work-dir option.
89 107 work_dir = Unicode(u'.')
90 108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
91 109 def _loop_default(self):
92 110 return ioloop.IOLoop.instance()
93 111
94 112 def __init__(self, work_dir=u'.', config=None, **kwargs):
95 113 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
96 114 self.state = 'before' # can be before, running, after
97 115 self.stop_callbacks = []
98 116 self.start_data = None
99 117 self.stop_data = None
100 118
101 119 @property
102 120 def args(self):
103 121 """A list of cmd and args that will be used to start the process.
104 122
105 123 This is what is passed to :func:`spawnProcess` and the first element
106 124 will be the process name.
107 125 """
108 126 return self.find_args()
109 127
110 128 def find_args(self):
111 129 """The ``.args`` property calls this to find the args list.
112 130
113 131 Subcommand should implement this to construct the cmd and args.
114 132 """
115 133 raise NotImplementedError('find_args must be implemented in a subclass')
116 134
117 135 @property
118 136 def arg_str(self):
119 137 """The string form of the program arguments."""
120 138 return ' '.join(self.args)
121 139
122 140 @property
123 141 def running(self):
124 142 """Am I running."""
125 143 if self.state == 'running':
126 144 return True
127 145 else:
128 146 return False
129 147
130 148 def start(self):
131 149 """Start the process.
132 150
133 151 This must return a deferred that fires with information about the
134 152 process starting (like a pid, job id, etc.).
135 153 """
136 154 raise NotImplementedError('start must be implemented in a subclass')
137 155
138 156 def stop(self):
139 157 """Stop the process and notify observers of stopping.
140 158
141 159 This must return a deferred that fires with information about the
142 160 processing stopping, like errors that occur while the process is
143 161 attempting to be shut down. This deferred won't fire when the process
144 162 actually stops. To observe the actual process stopping, see
145 163 :func:`observe_stop`.
146 164 """
147 165 raise NotImplementedError('stop must be implemented in a subclass')
148 166
149 167 def on_stop(self, f):
150 168 """Get a deferred that will fire when the process stops.
151 169
152 170 The deferred will fire with data that contains information about
153 171 the exit status of the process.
154 172 """
155 173 if self.state=='after':
156 174 return f(self.stop_data)
157 175 else:
158 176 self.stop_callbacks.append(f)
159 177
160 178 def notify_start(self, data):
161 179 """Call this to trigger startup actions.
162 180
163 181 This logs the process startup and sets the state to 'running'. It is
164 182 a pass-through so it can be used as a callback.
165 183 """
166 184
167 185 self.log.info('Process %r started: %r' % (self.args[0], data))
168 186 self.start_data = data
169 187 self.state = 'running'
170 188 return data
171 189
172 190 def notify_stop(self, data):
173 191 """Call this to trigger process stop actions.
174 192
175 193 This logs the process stopping and sets the state to 'after'. Call
176 194 this to trigger all the deferreds from :func:`observe_stop`."""
177 195
178 196 self.log.info('Process %r stopped: %r' % (self.args[0], data))
179 197 self.stop_data = data
180 198 self.state = 'after'
181 199 for i in range(len(self.stop_callbacks)):
182 200 d = self.stop_callbacks.pop()
183 201 d(data)
184 202 return data
185 203
186 204 def signal(self, sig):
187 205 """Signal the process.
188 206
189 207 Return a semi-meaningless deferred after signaling the process.
190 208
191 209 Parameters
192 210 ----------
193 211 sig : str or int
194 212 'KILL', 'INT', etc., or any signal number
195 213 """
196 214 raise NotImplementedError('signal must be implemented in a subclass')
197 215
198 216
199 217 #-----------------------------------------------------------------------------
200 218 # Local process launchers
201 219 #-----------------------------------------------------------------------------
202 220
203 221
204 222 class LocalProcessLauncher(BaseLauncher):
205 223 """Start and stop an external process in an asynchronous manner.
206 224
207 225 This will launch the external process with a working directory of
208 226 ``self.work_dir``.
209 227 """
210 228
211 229 # This is used to to construct self.args, which is passed to
212 230 # spawnProcess.
213 231 cmd_and_args = List([])
214 232 poll_frequency = Int(100) # in ms
215 233
216 234 def __init__(self, work_dir=u'.', config=None, **kwargs):
217 235 super(LocalProcessLauncher, self).__init__(
218 236 work_dir=work_dir, config=config, **kwargs
219 237 )
220 238 self.process = None
221 239 self.start_deferred = None
222 240 self.poller = None
223 241
224 242 def find_args(self):
225 243 return self.cmd_and_args
226 244
227 245 def start(self):
228 246 if self.state == 'before':
229 247 self.process = Popen(self.args,
230 248 stdout=PIPE,stderr=PIPE,stdin=PIPE,
231 249 env=os.environ,
232 250 cwd=self.work_dir
233 251 )
234 252
235 253 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
236 254 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
237 255 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
238 256 self.poller.start()
239 257 self.notify_start(self.process.pid)
240 258 else:
241 259 s = 'The process was already started and has state: %r' % self.state
242 260 raise ProcessStateError(s)
243 261
244 262 def stop(self):
245 263 return self.interrupt_then_kill()
246 264
247 265 def signal(self, sig):
248 266 if self.state == 'running':
249 267 self.process.send_signal(sig)
250 268
251 269 def interrupt_then_kill(self, delay=2.0):
252 270 """Send INT, wait a delay and then send KILL."""
253 271 self.signal(SIGINT)
254 272 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
255 273 self.killer.start()
256 274
257 275 # callbacks, etc:
258 276
259 277 def handle_stdout(self, fd, events):
260 278 line = self.process.stdout.readline()
261 279 # a stopped process will be readable but return empty strings
262 280 if line:
263 281 self.log.info(line[:-1])
264 282 else:
265 283 self.poll()
266 284
267 285 def handle_stderr(self, fd, events):
268 286 line = self.process.stderr.readline()
269 287 # a stopped process will be readable but return empty strings
270 288 if line:
271 289 self.log.error(line[:-1])
272 290 else:
273 291 self.poll()
274 292
275 293 def poll(self):
276 294 status = self.process.poll()
277 295 if status is not None:
278 296 self.poller.stop()
279 297 self.loop.remove_handler(self.process.stdout.fileno())
280 298 self.loop.remove_handler(self.process.stderr.fileno())
281 299 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
282 300 return status
283 301
284 302 class LocalControllerLauncher(LocalProcessLauncher):
285 303 """Launch a controller as a regular external process."""
286 304
287 305 controller_cmd = List(ipcontroller_cmd_argv, config=True)
288 306 # Command line arguments to ipcontroller.
289 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
307 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
290 308
291 309 def find_args(self):
292 310 return self.controller_cmd + self.controller_args
293 311
294 312 def start(self, cluster_dir):
295 313 """Start the controller by cluster_dir."""
296 314 self.controller_args.extend(['--cluster-dir', cluster_dir])
297 315 self.cluster_dir = unicode(cluster_dir)
298 316 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
299 317 return super(LocalControllerLauncher, self).start()
300 318
301 319
302 320 class LocalEngineLauncher(LocalProcessLauncher):
303 321 """Launch a single engine as a regular external process."""
304 322
305 323 engine_cmd = List(ipengine_cmd_argv, config=True)
306 324 # Command line arguments for ipengine.
307 325 engine_args = List(
308 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
326 ['--log-to-file','--log-level', str(logging.INFO)], config=True
309 327 )
310 328
311 329 def find_args(self):
312 330 return self.engine_cmd + self.engine_args
313 331
314 332 def start(self, cluster_dir):
315 333 """Start the engine by cluster_dir."""
316 334 self.engine_args.extend(['--cluster-dir', cluster_dir])
317 335 self.cluster_dir = unicode(cluster_dir)
318 336 return super(LocalEngineLauncher, self).start()
319 337
320 338
321 339 class LocalEngineSetLauncher(BaseLauncher):
322 340 """Launch a set of engines as regular external processes."""
323 341
324 342 # Command line arguments for ipengine.
325 343 engine_args = List(
326 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
344 ['--log-to-file','--log-level', str(logging.INFO)], config=True
327 345 )
328 346 # launcher class
329 347 launcher_class = LocalEngineLauncher
330 348
331 349 def __init__(self, work_dir=u'.', config=None, **kwargs):
332 350 super(LocalEngineSetLauncher, self).__init__(
333 351 work_dir=work_dir, config=config, **kwargs
334 352 )
335 353 self.launchers = {}
336 354 self.stop_data = {}
337 355
338 356 def start(self, n, cluster_dir):
339 357 """Start n engines by profile or cluster_dir."""
340 358 self.cluster_dir = unicode(cluster_dir)
341 359 dlist = []
342 360 for i in range(n):
343 361 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
344 362 # Copy the engine args over to each engine launcher.
345 363 import copy
346 364 el.engine_args = copy.deepcopy(self.engine_args)
347 365 el.on_stop(self._notice_engine_stopped)
348 366 d = el.start(cluster_dir)
349 367 if i==0:
350 368 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
351 369 self.launchers[i] = el
352 370 dlist.append(d)
353 371 self.notify_start(dlist)
354 372 # The consumeErrors here could be dangerous
355 373 # dfinal = gatherBoth(dlist, consumeErrors=True)
356 374 # dfinal.addCallback(self.notify_start)
357 375 return dlist
358 376
359 377 def find_args(self):
360 378 return ['engine set']
361 379
362 380 def signal(self, sig):
363 381 dlist = []
364 382 for el in self.launchers.itervalues():
365 383 d = el.signal(sig)
366 384 dlist.append(d)
367 385 # dfinal = gatherBoth(dlist, consumeErrors=True)
368 386 return dlist
369 387
370 388 def interrupt_then_kill(self, delay=1.0):
371 389 dlist = []
372 390 for el in self.launchers.itervalues():
373 391 d = el.interrupt_then_kill(delay)
374 392 dlist.append(d)
375 393 # dfinal = gatherBoth(dlist, consumeErrors=True)
376 394 return dlist
377 395
378 396 def stop(self):
379 397 return self.interrupt_then_kill()
380 398
381 399 def _notice_engine_stopped(self, data):
382 400 print "notice", data
383 401 pid = data['pid']
384 402 for idx,el in self.launchers.iteritems():
385 403 if el.process.pid == pid:
386 404 break
387 405 self.launchers.pop(idx)
388 406 self.stop_data[idx] = data
389 407 if not self.launchers:
390 408 self.notify_stop(self.stop_data)
391 409
392 410
393 411 #-----------------------------------------------------------------------------
394 412 # MPIExec launchers
395 413 #-----------------------------------------------------------------------------
396 414
397 415
398 416 class MPIExecLauncher(LocalProcessLauncher):
399 417 """Launch an external process using mpiexec."""
400 418
401 419 # The mpiexec command to use in starting the process.
402 420 mpi_cmd = List(['mpiexec'], config=True)
403 421 # The command line arguments to pass to mpiexec.
404 422 mpi_args = List([], config=True)
405 423 # The program to start using mpiexec.
406 424 program = List(['date'], config=True)
407 425 # The command line argument to the program.
408 426 program_args = List([], config=True)
409 427 # The number of instances of the program to start.
410 428 n = Int(1, config=True)
411 429
412 430 def find_args(self):
413 431 """Build self.args using all the fields."""
414 432 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
415 433 self.program + self.program_args
416 434
417 435 def start(self, n):
418 436 """Start n instances of the program using mpiexec."""
419 437 self.n = n
420 438 return super(MPIExecLauncher, self).start()
421 439
422 440
423 441 class MPIExecControllerLauncher(MPIExecLauncher):
424 442 """Launch a controller using mpiexec."""
425 443
426 444 controller_cmd = List(ipcontroller_cmd_argv, config=True)
427 445 # Command line arguments to ipcontroller.
428 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
446 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
429 447 n = Int(1, config=False)
430 448
431 449 def start(self, cluster_dir):
432 450 """Start the controller by cluster_dir."""
433 451 self.controller_args.extend(['--cluster-dir', cluster_dir])
434 452 self.cluster_dir = unicode(cluster_dir)
435 453 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
436 454 return super(MPIExecControllerLauncher, self).start(1)
437 455
438 456 def find_args(self):
439 457 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
440 458 self.controller_cmd + self.controller_args
441 459
442 460
443 461 class MPIExecEngineSetLauncher(MPIExecLauncher):
444 462
445 463 engine_cmd = List(ipengine_cmd_argv, config=True)
446 464 # Command line arguments for ipengine.
447 465 engine_args = List(
448 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
466 ['--log-to-file','--log-level', str(logging.INFO)], config=True
449 467 )
450 468 n = Int(1, config=True)
451 469
452 470 def start(self, n, cluster_dir):
453 471 """Start n engines by profile or cluster_dir."""
454 472 self.engine_args.extend(['--cluster-dir', cluster_dir])
455 473 self.cluster_dir = unicode(cluster_dir)
456 474 self.n = n
457 475 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
458 476 return super(MPIExecEngineSetLauncher, self).start(n)
459 477
460 478 def find_args(self):
461 479 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
462 480 self.engine_cmd + self.engine_args
463 481
464 482
465 483 #-----------------------------------------------------------------------------
466 484 # SSH launchers
467 485 #-----------------------------------------------------------------------------
468 486
469 487 # TODO: Get SSH Launcher working again.
470 488
471 489 class SSHLauncher(LocalProcessLauncher):
472 490 """A minimal launcher for ssh.
473 491
474 492 To be useful this will probably have to be extended to use the ``sshx``
475 493 idea for environment variables. There could be other things this needs
476 494 as well.
477 495 """
478 496
479 497 ssh_cmd = List(['ssh'], config=True)
480 498 ssh_args = List([], config=True)
481 499 program = List(['date'], config=True)
482 500 program_args = List([], config=True)
483 501 hostname = Str('', config=True)
484 502 user = Str(os.environ.get('USER','username'), config=True)
485 503 location = Str('')
486 504
487 505 def _hostname_changed(self, name, old, new):
488 506 self.location = '%s@%s' % (self.user, new)
489 507
490 508 def _user_changed(self, name, old, new):
491 509 self.location = '%s@%s' % (new, self.hostname)
492 510
493 511 def find_args(self):
494 512 return self.ssh_cmd + self.ssh_args + [self.location] + \
495 513 self.program + self.program_args
496 514
497 515 def start(self, cluster_dir, hostname=None, user=None):
498 516 if hostname is not None:
499 517 self.hostname = hostname
500 518 if user is not None:
501 519 self.user = user
502 520 return super(SSHLauncher, self).start()
503 521
504 522
505 523 class SSHControllerLauncher(SSHLauncher):
506 524
507 525 program = List(ipcontroller_cmd_argv, config=True)
508 526 # Command line arguments to ipcontroller.
509 program_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
527 program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
510 528
511 529
512 530 class SSHEngineLauncher(SSHLauncher):
513 531 program = List(ipengine_cmd_argv, config=True)
514 532 # Command line arguments for ipengine.
515 533 program_args = List(
516 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
534 ['--log-to-file','--log-level', str(logging.INFO)], config=True
517 535 )
518 536
519 537 class SSHEngineSetLauncher(LocalEngineSetLauncher):
520 538 launcher_class = SSHEngineLauncher
521 539
522 540
523 541 #-----------------------------------------------------------------------------
524 542 # Windows HPC Server 2008 scheduler launchers
525 543 #-----------------------------------------------------------------------------
526 544
527 545
528 # # This is only used on Windows.
529 # def find_job_cmd():
530 # if os.name=='nt':
531 # try:
532 # return find_cmd('job')
533 # except FindCmdError:
534 # return 'job'
535 # else:
536 # return 'job'
537 #
538 #
539 # class WindowsHPCLauncher(BaseLauncher):
540 #
541 # # A regular expression used to get the job id from the output of the
542 # # submit_command.
543 # job_id_regexp = Str(r'\d+', config=True)
544 # # The filename of the instantiated job script.
545 # job_file_name = Unicode(u'ipython_job.xml', config=True)
546 # # The full path to the instantiated job script. This gets made dynamically
547 # # by combining the work_dir with the job_file_name.
548 # job_file = Unicode(u'')
549 # # The hostname of the scheduler to submit the job to
550 # scheduler = Str('', config=True)
551 # job_cmd = Str(find_job_cmd(), config=True)
552 #
553 # def __init__(self, work_dir=u'.', config=None):
554 # super(WindowsHPCLauncher, self).__init__(
555 # work_dir=work_dir, config=config
556 # )
557 #
558 # @property
559 # def job_file(self):
560 # return os.path.join(self.work_dir, self.job_file_name)
561 #
562 # def write_job_file(self, n):
563 # raise NotImplementedError("Implement write_job_file in a subclass.")
564 #
565 # def find_args(self):
566 # return ['job.exe']
567 #
568 # def parse_job_id(self, output):
569 # """Take the output of the submit command and return the job id."""
570 # m = re.search(self.job_id_regexp, output)
571 # if m is not None:
572 # job_id = m.group()
573 # else:
574 # raise LauncherError("Job id couldn't be determined: %s" % output)
575 # self.job_id = job_id
576 # self.log.info('Job started with job id: %r' % job_id)
577 # return job_id
578 #
579 # @inlineCallbacks
580 # def start(self, n):
581 # """Start n copies of the process using the Win HPC job scheduler."""
582 # self.write_job_file(n)
583 # args = [
584 # 'submit',
585 # '/jobfile:%s' % self.job_file,
586 # '/scheduler:%s' % self.scheduler
587 # ]
588 # self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
589 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
590 # output = yield getProcessOutput(str(self.job_cmd),
591 # [str(a) for a in args],
592 # env=dict((str(k),str(v)) for k,v in os.environ.items()),
593 # path=self.work_dir
594 # )
595 # job_id = self.parse_job_id(output)
596 # self.notify_start(job_id)
597 # defer.returnValue(job_id)
598 #
599 # @inlineCallbacks
600 # def stop(self):
601 # args = [
602 # 'cancel',
603 # self.job_id,
604 # '/scheduler:%s' % self.scheduler
605 # ]
606 # self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
607 # try:
608 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
609 # output = yield getProcessOutput(str(self.job_cmd),
610 # [str(a) for a in args],
611 # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()),
612 # path=self.work_dir
613 # )
614 # except:
615 # output = 'The job already appears to be stoppped: %r' % self.job_id
616 # self.notify_stop(output) # Pass the output of the kill cmd
617 # defer.returnValue(output)
618 #
619 #
620 # class WindowsHPCControllerLauncher(WindowsHPCLauncher):
621 #
622 # job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
623 # extra_args = List([], config=False)
624 #
625 # def write_job_file(self, n):
626 # job = IPControllerJob(config=self.config)
627 #
628 # t = IPControllerTask(config=self.config)
629 # # The tasks work directory is *not* the actual work directory of
630 # # the controller. It is used as the base path for the stdout/stderr
631 # # files that the scheduler redirects to.
632 # t.work_directory = self.cluster_dir
633 # # Add the --cluster-dir and from self.start().
634 # t.controller_args.extend(self.extra_args)
635 # job.add_task(t)
636 #
637 # self.log.info("Writing job description file: %s" % self.job_file)
638 # job.write(self.job_file)
639 #
640 # @property
641 # def job_file(self):
642 # return os.path.join(self.cluster_dir, self.job_file_name)
643 #
644 # def start(self, cluster_dir):
645 # """Start the controller by cluster_dir."""
646 # self.extra_args = ['--cluster-dir', cluster_dir]
647 # self.cluster_dir = unicode(cluster_dir)
648 # return super(WindowsHPCControllerLauncher, self).start(1)
649 #
650 #
651 # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
652 #
653 # job_file_name = Unicode(u'ipengineset_job.xml', config=True)
654 # extra_args = List([], config=False)
655 #
656 # def write_job_file(self, n):
657 # job = IPEngineSetJob(config=self.config)
658 #
659 # for i in range(n):
660 # t = IPEngineTask(config=self.config)
661 # # The tasks work directory is *not* the actual work directory of
662 # # the engine. It is used as the base path for the stdout/stderr
663 # # files that the scheduler redirects to.
664 # t.work_directory = self.cluster_dir
665 # # Add the --cluster-dir and from self.start().
666 # t.engine_args.extend(self.extra_args)
667 # job.add_task(t)
668 #
669 # self.log.info("Writing job description file: %s" % self.job_file)
670 # job.write(self.job_file)
671 #
672 # @property
673 # def job_file(self):
674 # return os.path.join(self.cluster_dir, self.job_file_name)
675 #
676 # def start(self, n, cluster_dir):
677 # """Start the controller by cluster_dir."""
678 # self.extra_args = ['--cluster-dir', cluster_dir]
679 # self.cluster_dir = unicode(cluster_dir)
680 # return super(WindowsHPCEngineSetLauncher, self).start(n)
681 #
682 #
683 # #-----------------------------------------------------------------------------
684 # # Batch (PBS) system launchers
685 # #-----------------------------------------------------------------------------
686 #
687 # # TODO: Get PBS launcher working again.
688 #
689 # class BatchSystemLauncher(BaseLauncher):
690 # """Launch an external process using a batch system.
691 #
692 # This class is designed to work with UNIX batch systems like PBS, LSF,
693 # GridEngine, etc. The overall model is that there are different commands
694 # like qsub, qdel, etc. that handle the starting and stopping of the process.
695 #
696 # This class also has the notion of a batch script. The ``batch_template``
697 # attribute can be set to a string that is a template for the batch script.
698 # This template is instantiated using Itpl. Thus the template can use
699 # ${n} fot the number of instances. Subclasses can add additional variables
700 # to the template dict.
701 # """
702 #
703 # # Subclasses must fill these in. See PBSEngineSet
704 # # The name of the command line program used to submit jobs.
705 # submit_command = Str('', config=True)
706 # # The name of the command line program used to delete jobs.
707 # delete_command = Str('', config=True)
708 # # A regular expression used to get the job id from the output of the
709 # # submit_command.
710 # job_id_regexp = Str('', config=True)
711 # # The string that is the batch script template itself.
712 # batch_template = Str('', config=True)
713 # # The filename of the instantiated batch script.
714 # batch_file_name = Unicode(u'batch_script', config=True)
715 # # The full path to the instantiated batch script.
716 # batch_file = Unicode(u'')
717 #
718 # def __init__(self, work_dir=u'.', config=None):
719 # super(BatchSystemLauncher, self).__init__(
720 # work_dir=work_dir, config=config
721 # )
722 # self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
723 # self.context = {}
724 #
725 # def parse_job_id(self, output):
726 # """Take the output of the submit command and return the job id."""
727 # m = re.match(self.job_id_regexp, output)
728 # if m is not None:
729 # job_id = m.group()
730 # else:
731 # raise LauncherError("Job id couldn't be determined: %s" % output)
732 # self.job_id = job_id
733 # self.log.info('Job started with job id: %r' % job_id)
734 # return job_id
735 #
736 # def write_batch_script(self, n):
737 # """Instantiate and write the batch script to the work_dir."""
738 # self.context['n'] = n
739 # script_as_string = Itpl.itplns(self.batch_template, self.context)
740 # self.log.info('Writing instantiated batch script: %s' % self.batch_file)
741 # f = open(self.batch_file, 'w')
742 # f.write(script_as_string)
743 # f.close()
744 #
745 # @inlineCallbacks
746 # def start(self, n):
747 # """Start n copies of the process using a batch system."""
748 # self.write_batch_script(n)
749 # output = yield getProcessOutput(self.submit_command,
750 # [self.batch_file], env=os.environ)
751 # job_id = self.parse_job_id(output)
752 # self.notify_start(job_id)
753 # defer.returnValue(job_id)
754 #
755 # @inlineCallbacks
756 # def stop(self):
757 # output = yield getProcessOutput(self.delete_command,
758 # [self.job_id], env=os.environ
759 # )
760 # self.notify_stop(output) # Pass the output of the kill cmd
761 # defer.returnValue(output)
762 #
763 #
764 # class PBSLauncher(BatchSystemLauncher):
765 # """A BatchSystemLauncher subclass for PBS."""
766 #
767 # submit_command = Str('qsub', config=True)
768 # delete_command = Str('qdel', config=True)
769 # job_id_regexp = Str(r'\d+', config=True)
770 # batch_template = Str('', config=True)
771 # batch_file_name = Unicode(u'pbs_batch_script', config=True)
772 # batch_file = Unicode(u'')
773 #
774 #
775 # class PBSControllerLauncher(PBSLauncher):
776 # """Launch a controller using PBS."""
777 #
778 # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
779 #
780 # def start(self, cluster_dir):
781 # """Start the controller by profile or cluster_dir."""
782 # # Here we save profile and cluster_dir in the context so they
783 # # can be used in the batch script template as ${profile} and
784 # # ${cluster_dir}
785 # self.context['cluster_dir'] = cluster_dir
786 # self.cluster_dir = unicode(cluster_dir)
787 # self.log.info("Starting PBSControllerLauncher: %r" % self.args)
788 # return super(PBSControllerLauncher, self).start(1)
789 #
790 #
791 # class PBSEngineSetLauncher(PBSLauncher):
792 #
793 # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
794 #
795 # def start(self, n, cluster_dir):
796 # """Start n engines by profile or cluster_dir."""
797 # self.program_args.extend(['--cluster-dir', cluster_dir])
798 # self.cluster_dir = unicode(cluster_dir)
799 # self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
800 # return super(PBSEngineSetLauncher, self).start(n)
# This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC ``job`` command-line client.

    On Windows, try to resolve the full path via ``find_cmd``; on any
    failure — or on non-Windows platforms — fall back to the bare
    string ``'job'``.
    """
    if os.name != 'nt':
        return 'job'
    try:
        return find_cmd('job')
    except FindCmdError:
        return 'job'
555
556
class WindowsHPCLauncher(BaseLauncher):
    """Base class for launchers that submit jobs to the Windows HPC scheduler.

    Subclasses must implement :meth:`write_job_file` to emit the XML job
    description that is handed to the ``job submit`` command.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the ``job_file`` property
    # defined below, so it is inert; kept only for config compatibility.
    job_file = Unicode(u'')
    # The hostname of the scheduler to submit the job to
    scheduler = Str('', config=True)
    # The Win HPC command line client (usually 'job'; see find_job_cmd()).
    job_cmd = Str(find_job_cmd(), config=True)

    def __init__(self, work_dir=u'.', config=None):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config
        )

    @property
    def job_file(self):
        """Full path to the job description file (work_dir/job_file_name)."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the XML job description; must be implemented by subclasses."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        # Only used for logging/repr purposes; the real command is built
        # in start()/stop().
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        # Capture stderr into the same stream so parse_job_id sees any
        # scheduler error text as well.
        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        # self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job.  Best-effort: failures are reported,
        not raised."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # was a bare ``except:``, which would also swallow
            # KeyboardInterrupt/SystemExit; also fixes 'stoppped' typo.
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        return output
633
634
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a single IPython controller via the Windows HPC scheduler."""

    # The filename of the instantiated controller job description.
    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Extra controller arguments injected by start() (--cluster-dir ...).
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write the WinHPC XML job description with one controller task."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.cluster_dir
        # Add the --cluster-dir arguments stashed by self.start().
        t.controller_args.extend(self.extra_args)
        job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # Override the base property: the job file lives in the cluster
        # directory rather than work_dir.
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
664
665
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of IPython engines via the Windows HPC scheduler."""

    # The filename of the instantiated engine-set job description.
    job_file_name = Unicode(u'ipengineset_job.xml', config=True)
    # Extra engine arguments injected by start() (--cluster-dir ...).
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write the WinHPC XML job description with ``n`` engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.cluster_dir
            # Add the --cluster-dir arguments stashed by self.start().
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # Override the base property: the job file lives in the cluster
        # directory rather than work_dir.
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
696
697
698 #-----------------------------------------------------------------------------
699 # Batch (PBS) system launchers
700 #-----------------------------------------------------------------------------
701
702 # TODO: Get PBS launcher working again.
703
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script.  The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl.  Thus the template can use
    ${n} for the number of instances.  Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, work_dir=u'.', config=None):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
        # Template context for write_batch_script; subclasses add variables.
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        f.write(script_as_string)
        f.close()

    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        # check_output captures stdout itself; passing stdout= raises
        # ValueError.  Redirect stderr into the captured stream instead so
        # parse_job_id also sees any error text.
        output = check_output([self.submit_command, self.batch_file],
                              env=os.environ, stderr=STDOUT)
        job_id = self.parse_job_id(output)
        # self.notify_start(job_id)
        return job_id

    def stop(self):
        """Delete the job.  Best-effort: notify_stop gets the command output."""
        # Popen() alone would return the process object, not its output;
        # use check_output so notify_stop receives the delete command's text.
        try:
            output = check_output([self.delete_command, self.job_id],
                                  env=os.environ, stderr=STDOUT)
        except Exception:
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        return output
772
773
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    # The PBS submission command.
    submit_command = Str('qsub', config=True)
    # The PBS job deletion command.
    delete_command = Str('qdel', config=True)
    # qsub prints the job id at the start of its output; grab the digits.
    job_id_regexp = Str(r'\d+', config=True)
    # The batch script template; must be supplied via config.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    # The full path to the batch script (set in BatchSystemLauncher.__init__).
    batch_file = Unicode(u'')
783
784
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    # The filename of the instantiated controller batch script.
    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, cluster_dir):
        """Start the controller by profile or cluster_dir."""
        # Save cluster_dir in the context so it can be used in the batch
        # script template as ${cluster_dir}.
        # NOTE(review): an older version also exposed ${profile}; only
        # cluster_dir is set here — confirm templates don't rely on it.
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)
799
800
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    # The filename of the instantiated engine-set batch script.
    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        # BatchSystemLauncher has no ``program_args`` attribute, so the old
        # ``self.program_args.extend(...)`` raised AttributeError.  Pass
        # cluster_dir through the template context instead (usable as
        # ${cluster_dir}), matching PBSControllerLauncher.
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
801 811
802 812
803 813 #-----------------------------------------------------------------------------
804 814 # A launcher for ipcluster itself!
805 815 #-----------------------------------------------------------------------------
806 816
807 817
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # The command used to invoke ipcluster.
    ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
    # The ipcluster subcommand to run.
    ipcluster_subcommand = Str('start')
    # The number of engines to request.
    ipcluster_n = Int(2)

    def find_args(self):
        """Assemble the full argv for the ipcluster subprocess."""
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.extend(['-n', repr(self.ipcluster_n)])
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Log the command line, then launch the ipcluster process."""
        self.log.info("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
825 835
General Comments 0
You need to be logged in to leave comments. Login now