##// END OF EJS Templates
add check_pid, and handle stale PID info in ipcluster....
MinRK -
Show More
@@ -1,537 +1,566 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import logging
22 22 import re
23 23 import shutil
24 24 import sys
25 25
26 from subprocess import Popen, PIPE
27
26 28 from IPython.config.loader import PyFileConfigLoader
27 29 from IPython.config.configurable import Configurable
28 30 from IPython.core.application import Application, BaseAppConfigLoader
29 31 from IPython.core.crashhandler import CrashHandler
30 32 from IPython.core import release
31 33 from IPython.utils.path import (
32 34 get_ipython_package_dir,
33 35 expand_path
34 36 )
35 37 from IPython.utils.traitlets import Unicode
36 38
37 39 #-----------------------------------------------------------------------------
38 40 # Module errors
39 41 #-----------------------------------------------------------------------------
40 42
41 43 class ClusterDirError(Exception):
42 44 pass
43 45
44 46
45 47 class PIDFileError(Exception):
46 48 pass
47 49
48 50
49 51 #-----------------------------------------------------------------------------
50 52 # Class for managing cluster directories
51 53 #-----------------------------------------------------------------------------
52 54
53 55 class ClusterDir(Configurable):
54 56 """An object to manage the cluster directory and its resources.
55 57
56 58 The cluster directory is used by :command:`ipengine`,
57 59 :command:`ipcontroller` and :command:`ipclsuter` to manage the
58 60 configuration, logging and security of these applications.
59 61
60 62 This object knows how to find, create and manage these directories. This
61 63 should be used by any code that want's to handle cluster directories.
62 64 """
63 65
64 66 security_dir_name = Unicode('security')
65 67 log_dir_name = Unicode('log')
66 68 pid_dir_name = Unicode('pid')
67 69 security_dir = Unicode(u'')
68 70 log_dir = Unicode(u'')
69 71 pid_dir = Unicode(u'')
70 72 location = Unicode(u'')
71 73
72 74 def __init__(self, location=u''):
73 75 super(ClusterDir, self).__init__(location=location)
74 76
75 77 def _location_changed(self, name, old, new):
76 78 if not os.path.isdir(new):
77 79 os.makedirs(new)
78 80 self.security_dir = os.path.join(new, self.security_dir_name)
79 81 self.log_dir = os.path.join(new, self.log_dir_name)
80 82 self.pid_dir = os.path.join(new, self.pid_dir_name)
81 83 self.check_dirs()
82 84
83 85 def _log_dir_changed(self, name, old, new):
84 86 self.check_log_dir()
85 87
86 88 def check_log_dir(self):
87 89 if not os.path.isdir(self.log_dir):
88 90 os.mkdir(self.log_dir)
89 91
90 92 def _security_dir_changed(self, name, old, new):
91 93 self.check_security_dir()
92 94
93 95 def check_security_dir(self):
94 96 if not os.path.isdir(self.security_dir):
95 97 os.mkdir(self.security_dir, 0700)
96 98 os.chmod(self.security_dir, 0700)
97 99
98 100 def _pid_dir_changed(self, name, old, new):
99 101 self.check_pid_dir()
100 102
101 103 def check_pid_dir(self):
102 104 if not os.path.isdir(self.pid_dir):
103 105 os.mkdir(self.pid_dir, 0700)
104 106 os.chmod(self.pid_dir, 0700)
105 107
106 108 def check_dirs(self):
107 109 self.check_security_dir()
108 110 self.check_log_dir()
109 111 self.check_pid_dir()
110 112
111 113 def load_config_file(self, filename):
112 114 """Load a config file from the top level of the cluster dir.
113 115
114 116 Parameters
115 117 ----------
116 118 filename : unicode or str
117 119 The filename only of the config file that must be located in
118 120 the top-level of the cluster directory.
119 121 """
120 122 loader = PyFileConfigLoader(filename, self.location)
121 123 return loader.load_config()
122 124
123 125 def copy_config_file(self, config_file, path=None, overwrite=False):
124 126 """Copy a default config file into the active cluster directory.
125 127
126 128 Default configuration files are kept in :mod:`IPython.config.default`.
127 129 This function moves these from that location to the working cluster
128 130 directory.
129 131 """
130 132 if path is None:
131 133 import IPython.config.default
132 134 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
133 135 path = os.path.sep.join(path)
134 136 src = os.path.join(path, config_file)
135 137 dst = os.path.join(self.location, config_file)
136 138 if not os.path.isfile(dst) or overwrite:
137 139 shutil.copy(src, dst)
138 140
139 141 def copy_all_config_files(self, path=None, overwrite=False):
140 142 """Copy all config files into the active cluster directory."""
141 143 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
142 144 u'ipcluster_config.py']:
143 145 self.copy_config_file(f, path=path, overwrite=overwrite)
144 146
145 147 @classmethod
146 148 def create_cluster_dir(csl, cluster_dir):
147 149 """Create a new cluster directory given a full path.
148 150
149 151 Parameters
150 152 ----------
151 153 cluster_dir : str
152 154 The full path to the cluster directory. If it does exist, it will
153 155 be used. If not, it will be created.
154 156 """
155 157 return ClusterDir(location=cluster_dir)
156 158
157 159 @classmethod
158 160 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
159 161 """Create a cluster dir by profile name and path.
160 162
161 163 Parameters
162 164 ----------
163 165 path : str
164 166 The path (directory) to put the cluster directory in.
165 167 profile : str
166 168 The name of the profile. The name of the cluster directory will
167 169 be "cluster_<profile>".
168 170 """
169 171 if not os.path.isdir(path):
170 172 raise ClusterDirError('Directory not found: %s' % path)
171 173 cluster_dir = os.path.join(path, u'cluster_' + profile)
172 174 return ClusterDir(location=cluster_dir)
173 175
174 176 @classmethod
175 177 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
176 178 """Find an existing cluster dir by profile name, return its ClusterDir.
177 179
178 180 This searches through a sequence of paths for a cluster dir. If it
179 181 is not found, a :class:`ClusterDirError` exception will be raised.
180 182
181 183 The search path algorithm is:
182 184 1. ``os.getcwd()``
183 185 2. ``ipython_dir``
184 186 3. The directories found in the ":" separated
185 187 :env:`IPCLUSTER_DIR_PATH` environment variable.
186 188
187 189 Parameters
188 190 ----------
189 191 ipython_dir : unicode or str
190 192 The IPython directory to use.
191 193 profile : unicode or str
192 194 The name of the profile. The name of the cluster directory
193 195 will be "cluster_<profile>".
194 196 """
195 197 dirname = u'cluster_' + profile
196 198 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
197 199 if cluster_dir_paths:
198 200 cluster_dir_paths = cluster_dir_paths.split(':')
199 201 else:
200 202 cluster_dir_paths = []
201 203 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
202 204 for p in paths:
203 205 cluster_dir = os.path.join(p, dirname)
204 206 if os.path.isdir(cluster_dir):
205 207 return ClusterDir(location=cluster_dir)
206 208 else:
207 209 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
208 210
209 211 @classmethod
210 212 def find_cluster_dir(cls, cluster_dir):
211 213 """Find/create a cluster dir and return its ClusterDir.
212 214
213 215 This will create the cluster directory if it doesn't exist.
214 216
215 217 Parameters
216 218 ----------
217 219 cluster_dir : unicode or str
218 220 The path of the cluster directory. This is expanded using
219 221 :func:`IPython.utils.genutils.expand_path`.
220 222 """
221 223 cluster_dir = expand_path(cluster_dir)
222 224 if not os.path.isdir(cluster_dir):
223 225 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
224 226 return ClusterDir(location=cluster_dir)
225 227
226 228
227 229 #-----------------------------------------------------------------------------
228 230 # Command line options
229 231 #-----------------------------------------------------------------------------
230 232
231 233 class ClusterDirConfigLoader(BaseAppConfigLoader):
232 234
233 235 def _add_cluster_profile(self, parser):
234 236 paa = parser.add_argument
235 237 paa('-p', '--profile',
236 238 dest='Global.profile',type=unicode,
237 239 help=
238 240 """The string name of the profile to be used. This determines the name
239 241 of the cluster dir as: cluster_<profile>. The default profile is named
240 242 'default'. The cluster directory is resolve this way if the
241 243 --cluster-dir option is not used.""",
242 244 metavar='Global.profile')
243 245
244 246 def _add_cluster_dir(self, parser):
245 247 paa = parser.add_argument
246 248 paa('--cluster-dir',
247 249 dest='Global.cluster_dir',type=unicode,
248 250 help="""Set the cluster dir. This overrides the logic used by the
249 251 --profile option.""",
250 252 metavar='Global.cluster_dir')
251 253
252 254 def _add_work_dir(self, parser):
253 255 paa = parser.add_argument
254 256 paa('--work-dir',
255 257 dest='Global.work_dir',type=unicode,
256 258 help='Set the working dir for the process.',
257 259 metavar='Global.work_dir')
258 260
259 261 def _add_clean_logs(self, parser):
260 262 paa = parser.add_argument
261 263 paa('--clean-logs',
262 264 dest='Global.clean_logs', action='store_true',
263 265 help='Delete old log flies before starting.')
264 266
265 267 def _add_no_clean_logs(self, parser):
266 268 paa = parser.add_argument
267 269 paa('--no-clean-logs',
268 270 dest='Global.clean_logs', action='store_false',
269 271 help="Don't Delete old log flies before starting.")
270 272
271 273 def _add_arguments(self):
272 274 super(ClusterDirConfigLoader, self)._add_arguments()
273 275 self._add_cluster_profile(self.parser)
274 276 self._add_cluster_dir(self.parser)
275 277 self._add_work_dir(self.parser)
276 278 self._add_clean_logs(self.parser)
277 279 self._add_no_clean_logs(self.parser)
278 280
279 281
280 282 #-----------------------------------------------------------------------------
281 283 # Crash handler for this application
282 284 #-----------------------------------------------------------------------------
283 285
284 286
285 287 _message_template = """\
286 288 Oops, $self.app_name crashed. We do our best to make it stable, but...
287 289
288 290 A crash report was automatically generated with the following information:
289 291 - A verbatim copy of the crash traceback.
290 292 - Data on your current $self.app_name configuration.
291 293
292 294 It was left in the file named:
293 295 \t'$self.crash_report_fname'
294 296 If you can email this file to the developers, the information in it will help
295 297 them in understanding and correcting the problem.
296 298
297 299 You can mail it to: $self.contact_name at $self.contact_email
298 300 with the subject '$self.app_name Crash Report'.
299 301
300 302 If you want to do it now, the following command will work (under Unix):
301 303 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
302 304
303 305 To ensure accurate tracking of this issue, please file a report about it at:
304 306 $self.bug_tracker
305 307 """
306 308
307 309 class ClusterDirCrashHandler(CrashHandler):
308 310 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
309 311
310 312 message_template = _message_template
311 313
312 314 def __init__(self, app):
313 315 contact_name = release.authors['Brian'][0]
314 316 contact_email = release.authors['Brian'][1]
315 317 bug_tracker = 'http://github.com/ipython/ipython/issues'
316 318 super(ClusterDirCrashHandler,self).__init__(
317 319 app, contact_name, contact_email, bug_tracker
318 320 )
319 321
320 322
321 323 #-----------------------------------------------------------------------------
322 324 # Main application
323 325 #-----------------------------------------------------------------------------
324 326
325 327 class ApplicationWithClusterDir(Application):
326 328 """An application that puts everything into a cluster directory.
327 329
328 330 Instead of looking for things in the ipython_dir, this type of application
329 331 will use its own private directory called the "cluster directory"
330 332 for things like config files, log files, etc.
331 333
332 334 The cluster directory is resolved as follows:
333 335
334 336 * If the ``--cluster-dir`` option is given, it is used.
335 337 * If ``--cluster-dir`` is not given, the application directory is
336 338 resolve using the profile name as ``cluster_<profile>``. The search
337 339 path for this directory is then i) cwd if it is found there
338 340 and ii) in ipython_dir otherwise.
339 341
340 342 The config file for the application is to be put in the cluster
341 343 dir and named the value of the ``config_file_name`` class attribute.
342 344 """
343 345
344 346 command_line_loader = ClusterDirConfigLoader
345 347 crash_handler_class = ClusterDirCrashHandler
346 348 auto_create_cluster_dir = True
347 349 # temporarily override default_log_level to INFO
348 350 default_log_level = logging.INFO
349 351
350 352 def create_default_config(self):
351 353 super(ApplicationWithClusterDir, self).create_default_config()
352 354 self.default_config.Global.profile = u'default'
353 355 self.default_config.Global.cluster_dir = u''
354 356 self.default_config.Global.work_dir = os.getcwd()
355 357 self.default_config.Global.log_to_file = False
356 358 self.default_config.Global.log_url = None
357 359 self.default_config.Global.clean_logs = False
358 360
359 361 def find_resources(self):
360 362 """This resolves the cluster directory.
361 363
362 364 This tries to find the cluster directory and if successful, it will
363 365 have done:
364 366 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
365 367 the application.
366 368 * Sets ``self.cluster_dir`` attribute of the application and config
367 369 objects.
368 370
369 371 The algorithm used for this is as follows:
370 372 1. Try ``Global.cluster_dir``.
371 373 2. Try using ``Global.profile``.
372 374 3. If both of these fail and ``self.auto_create_cluster_dir`` is
373 375 ``True``, then create the new cluster dir in the IPython directory.
374 376 4. If all fails, then raise :class:`ClusterDirError`.
375 377 """
376 378
377 379 try:
378 380 cluster_dir = self.command_line_config.Global.cluster_dir
379 381 except AttributeError:
380 382 cluster_dir = self.default_config.Global.cluster_dir
381 383 cluster_dir = expand_path(cluster_dir)
382 384 try:
383 385 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
384 386 except ClusterDirError:
385 387 pass
386 388 else:
387 389 self.log.info('Using existing cluster dir: %s' % \
388 390 self.cluster_dir_obj.location
389 391 )
390 392 self.finish_cluster_dir()
391 393 return
392 394
393 395 try:
394 396 self.profile = self.command_line_config.Global.profile
395 397 except AttributeError:
396 398 self.profile = self.default_config.Global.profile
397 399 try:
398 400 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
399 401 self.ipython_dir, self.profile)
400 402 except ClusterDirError:
401 403 pass
402 404 else:
403 405 self.log.info('Using existing cluster dir: %s' % \
404 406 self.cluster_dir_obj.location
405 407 )
406 408 self.finish_cluster_dir()
407 409 return
408 410
409 411 if self.auto_create_cluster_dir:
410 412 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
411 413 self.ipython_dir, self.profile
412 414 )
413 415 self.log.info('Creating new cluster dir: %s' % \
414 416 self.cluster_dir_obj.location
415 417 )
416 418 self.finish_cluster_dir()
417 419 else:
418 420 raise ClusterDirError('Could not find a valid cluster directory.')
419 421
420 422 def finish_cluster_dir(self):
421 423 # Set the cluster directory
422 424 self.cluster_dir = self.cluster_dir_obj.location
423 425
424 426 # These have to be set because they could be different from the one
425 427 # that we just computed. Because command line has the highest
426 428 # priority, this will always end up in the master_config.
427 429 self.default_config.Global.cluster_dir = self.cluster_dir
428 430 self.command_line_config.Global.cluster_dir = self.cluster_dir
429 431
430 432 def find_config_file_name(self):
431 433 """Find the config file name for this application."""
432 434 # For this type of Application it should be set as a class attribute.
433 435 if not hasattr(self, 'default_config_file_name'):
434 436 self.log.critical("No config filename found")
435 437 else:
436 438 self.config_file_name = self.default_config_file_name
437 439
438 440 def find_config_file_paths(self):
439 441 # Set the search path to to the cluster directory. We should NOT
440 442 # include IPython.config.default here as the default config files
441 443 # are ALWAYS automatically moved to the cluster directory.
442 444 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
443 445 self.config_file_paths = (self.cluster_dir,)
444 446
445 447 def pre_construct(self):
446 448 # The log and security dirs were set earlier, but here we put them
447 449 # into the config and log them.
448 450 config = self.master_config
449 451 sdir = self.cluster_dir_obj.security_dir
450 452 self.security_dir = config.Global.security_dir = sdir
451 453 ldir = self.cluster_dir_obj.log_dir
452 454 self.log_dir = config.Global.log_dir = ldir
453 455 pdir = self.cluster_dir_obj.pid_dir
454 456 self.pid_dir = config.Global.pid_dir = pdir
455 457 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
456 458 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
457 459 # Change to the working directory. We do this just before construct
458 460 # is called so all the components there have the right working dir.
459 461 self.to_work_dir()
460 462
461 463 def to_work_dir(self):
462 464 wd = self.master_config.Global.work_dir
463 465 if unicode(wd) != unicode(os.getcwd()):
464 466 os.chdir(wd)
465 467 self.log.info("Changing to working dir: %s" % wd)
466 468
467 469 def start_logging(self):
468 470 # Remove old log files
469 471 if self.master_config.Global.clean_logs:
470 472 log_dir = self.master_config.Global.log_dir
471 473 for f in os.listdir(log_dir):
472 474 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
473 475 # if f.startswith(self.name + u'-') and f.endswith('.log'):
474 476 os.remove(os.path.join(log_dir, f))
475 477 # Start logging to the new log file
476 478 if self.master_config.Global.log_to_file:
477 479 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
478 480 logfile = os.path.join(self.log_dir, log_filename)
479 481 open_log_file = open(logfile, 'w')
480 482 elif self.master_config.Global.log_url:
481 483 open_log_file = None
482 484 else:
483 485 open_log_file = sys.stdout
484 486 if open_log_file is not None:
485 487 self.log.removeHandler(self._log_handler)
486 488 self._log_handler = logging.StreamHandler(open_log_file)
487 489 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
488 490 self._log_handler.setFormatter(self._log_formatter)
489 491 self.log.addHandler(self._log_handler)
490 492 # log.startLogging(open_log_file)
491 493
492 494 def write_pid_file(self, overwrite=False):
493 495 """Create a .pid file in the pid_dir with my pid.
494 496
495 497 This must be called after pre_construct, which sets `self.pid_dir`.
496 498 This raises :exc:`PIDFileError` if the pid file exists already.
497 499 """
498 500 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
499 501 if os.path.isfile(pid_file):
500 502 pid = self.get_pid_from_file()
501 503 if not overwrite:
502 504 raise PIDFileError(
503 505 'The pid file [%s] already exists. \nThis could mean that this '
504 506 'server is already running with [pid=%s].' % (pid_file, pid)
505 507 )
506 508 with open(pid_file, 'w') as f:
507 509 self.log.info("Creating pid file: %s" % pid_file)
508 510 f.write(repr(os.getpid())+'\n')
509 511
510 512 def remove_pid_file(self):
511 513 """Remove the pid file.
512 514
513 515 This should be called at shutdown by registering a callback with
514 516 :func:`reactor.addSystemEventTrigger`. This needs to return
515 517 ``None``.
516 518 """
517 519 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
518 520 if os.path.isfile(pid_file):
519 521 try:
520 522 self.log.info("Removing pid file: %s" % pid_file)
521 523 os.remove(pid_file)
522 524 except:
523 525 self.log.warn("Error removing the pid file: %s" % pid_file)
524 526
525 527 def get_pid_from_file(self):
526 528 """Get the pid from the pid file.
527 529
528 530 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
529 531 """
530 532 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
531 533 if os.path.isfile(pid_file):
532 534 with open(pid_file, 'r') as f:
533 535 pid = int(f.read().strip())
534 536 return pid
535 537 else:
536 538 raise PIDFileError('pid file not found: %s' % pid_file)
537
539
540 def check_pid(self, pid):
541 if os.name == 'nt':
542 try:
543 import ctypes
544 # returns 0 if no such process (of ours) exists
545 # positive int otherwise
546 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
547 except Exception:
548 self.log.warn(
549 "Could not determine whether pid %i is running via `OpenProcess`. "
550 " Making the likely assumption that it is."%pid
551 )
552 return True
553 return bool(p)
554 else:
555 try:
556 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
557 output,_ = p.communicate()
558 except OSError:
559 self.log.warn(
560 "Could not determine whether pid %i is running via `ps x`. "
561 " Making the likely assumption that it is."%pid
562 )
563 return True
564 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
565 return pid in pids
566 No newline at end of file
@@ -1,593 +1,617 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import errno
19 19 import logging
20 20 import os
21 21 import re
22 22 import signal
23 23
24 from subprocess import check_call, CalledProcessError, PIPE
24 25 import zmq
25 26 from zmq.eventloop import ioloop
26 27
27 28 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 29 from IPython.utils.importstring import import_item
29 30
30 31 from IPython.parallel.apps.clusterdir import (
31 32 ApplicationWithClusterDir, ClusterDirConfigLoader,
32 33 ClusterDirError, PIDFileError
33 34 )
34 35
35 36
36 37 #-----------------------------------------------------------------------------
37 38 # Module level variables
38 39 #-----------------------------------------------------------------------------
39 40
40 41
41 42 default_config_file_name = u'ipcluster_config.py'
42 43
43 44
44 45 _description = """\
45 46 Start an IPython cluster for parallel computing.\n\n
46 47
47 48 An IPython cluster consists of 1 controller and 1 or more engines.
48 49 This command automates the startup of these processes using a wide
49 50 range of startup methods (SSH, local processes, PBS, mpiexec,
50 51 Windows HPC Server 2008). To start a cluster with 4 engines on your
51 52 local host simply do 'ipcluster start -n 4'. For more complex usage
52 53 you will typically do 'ipcluster create -p mycluster', then edit
53 54 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
54 55 """
55 56
56 57
57 58 # Exit codes for ipcluster
58 59
59 60 # This will be the exit code if the ipcluster appears to be running because
60 61 # a .pid file exists
61 62 ALREADY_STARTED = 10
62 63
63 64
64 65 # This will be the exit code if ipcluster stop is run, but there is not .pid
65 66 # file to be found.
66 67 ALREADY_STOPPED = 11
67 68
68 69 # This will be the exit code if ipcluster engines is run, but there is not .pid
69 70 # file to be found.
70 71 NO_CLUSTER = 12
71 72
72 73
73 74 #-----------------------------------------------------------------------------
74 75 # Command line options
75 76 #-----------------------------------------------------------------------------
76 77
77 78
78 79 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
79 80
80 81 def _add_arguments(self):
81 82 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
82 83 # its defaults on self.parser. Instead, we will put those on
83 84 # default options on our subparsers.
84 85
85 86 # This has all the common options that all subcommands use
86 87 parent_parser1 = ArgumentParser(
87 88 add_help=False,
88 89 argument_default=SUPPRESS
89 90 )
90 91 self._add_ipython_dir(parent_parser1)
91 92 self._add_log_level(parent_parser1)
92 93
93 94 # This has all the common options that other subcommands use
94 95 parent_parser2 = ArgumentParser(
95 96 add_help=False,
96 97 argument_default=SUPPRESS
97 98 )
98 99 self._add_cluster_profile(parent_parser2)
99 100 self._add_cluster_dir(parent_parser2)
100 101 self._add_work_dir(parent_parser2)
101 102 paa = parent_parser2.add_argument
102 103 paa('--log-to-file',
103 104 action='store_true', dest='Global.log_to_file',
104 105 help='Log to a file in the log directory (default is stdout)')
105 106
106 107 # Create the object used to create the subparsers.
107 108 subparsers = self.parser.add_subparsers(
108 109 dest='Global.subcommand',
109 110 title='ipcluster subcommands',
110 111 description=
111 112 """ipcluster has a variety of subcommands. The general way of
112 113 running ipcluster is 'ipcluster <cmd> [options]'. To get help
113 114 on a particular subcommand do 'ipcluster <cmd> -h'."""
114 115 # help="For more help, type 'ipcluster <cmd> -h'",
115 116 )
116 117
117 118 # The "list" subcommand parser
118 119 parser_list = subparsers.add_parser(
119 120 'list',
120 121 parents=[parent_parser1],
121 122 argument_default=SUPPRESS,
122 123 help="List all clusters in cwd and ipython_dir.",
123 124 description=
124 125 """List all available clusters, by cluster directory, that can
125 126 be found in the current working directly or in the ipython
126 127 directory. Cluster directories are named using the convention
127 128 'cluster_<profile>'."""
128 129 )
129 130
130 131 # The "create" subcommand parser
131 132 parser_create = subparsers.add_parser(
132 133 'create',
133 134 parents=[parent_parser1, parent_parser2],
134 135 argument_default=SUPPRESS,
135 136 help="Create a new cluster directory.",
136 137 description=
137 138 """Create an ipython cluster directory by its profile name or
138 139 cluster directory path. Cluster directories contain
139 140 configuration, log and security related files and are named
140 141 using the convention 'cluster_<profile>'. By default they are
141 142 located in your ipython directory. Once created, you will
142 143 probably need to edit the configuration files in the cluster
143 144 directory to configure your cluster. Most users will create a
144 145 cluster directory by profile name,
145 146 'ipcluster create -p mycluster', which will put the directory
146 147 in '<ipython_dir>/cluster_mycluster'.
147 148 """
148 149 )
149 150 paa = parser_create.add_argument
150 151 paa('--reset-config',
151 152 dest='Global.reset_config', action='store_true',
152 153 help=
153 154 """Recopy the default config files to the cluster directory.
154 155 You will loose any modifications you have made to these files.""")
155 156
156 157 # The "start" subcommand parser
157 158 parser_start = subparsers.add_parser(
158 159 'start',
159 160 parents=[parent_parser1, parent_parser2],
160 161 argument_default=SUPPRESS,
161 162 help="Start a cluster.",
162 163 description=
163 164 """Start an ipython cluster by its profile name or cluster
164 165 directory. Cluster directories contain configuration, log and
165 166 security related files and are named using the convention
166 167 'cluster_<profile>' and should be creating using the 'start'
167 168 subcommand of 'ipcluster'. If your cluster directory is in
168 169 the cwd or the ipython directory, you can simply refer to it
169 170 using its profile name, 'ipcluster start -n 4 -p <profile>`,
170 171 otherwise use the '--cluster-dir' option.
171 172 """
172 173 )
173 174
174 175 paa = parser_start.add_argument
175 176 paa('-n', '--number',
176 177 type=int, dest='Global.n',
177 178 help='The number of engines to start.',
178 179 metavar='Global.n')
179 180 paa('--clean-logs',
180 181 dest='Global.clean_logs', action='store_true',
181 182 help='Delete old log flies before starting.')
182 183 paa('--no-clean-logs',
183 184 dest='Global.clean_logs', action='store_false',
184 185 help="Don't delete old log flies before starting.")
185 186 paa('--daemon',
186 187 dest='Global.daemonize', action='store_true',
187 188 help='Daemonize the ipcluster program. This implies --log-to-file')
188 189 paa('--no-daemon',
189 190 dest='Global.daemonize', action='store_false',
190 191 help="Dont't daemonize the ipcluster program.")
191 192 paa('--delay',
192 193 type=float, dest='Global.delay',
193 194 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
194 195
195 196 # The "stop" subcommand parser
196 197 parser_stop = subparsers.add_parser(
197 198 'stop',
198 199 parents=[parent_parser1, parent_parser2],
199 200 argument_default=SUPPRESS,
200 201 help="Stop a running cluster.",
201 202 description=
202 203 """Stop a running ipython cluster by its profile name or cluster
203 204 directory. Cluster directories are named using the convention
204 205 'cluster_<profile>'. If your cluster directory is in
205 206 the cwd or the ipython directory, you can simply refer to it
206 207 using its profile name, 'ipcluster stop -p <profile>`, otherwise
207 208 use the '--cluster-dir' option.
208 209 """
209 210 )
210 211 paa = parser_stop.add_argument
211 212 paa('--signal',
212 213 dest='Global.signal', type=int,
213 214 help="The signal number to use in stopping the cluster (default=2).",
214 215 metavar="Global.signal")
215 216
216 217 # the "engines" subcommand parser
217 218 parser_engines = subparsers.add_parser(
218 219 'engines',
219 220 parents=[parent_parser1, parent_parser2],
220 221 argument_default=SUPPRESS,
221 222 help="Attach some engines to an existing controller or cluster.",
222 223 description=
223 224 """Start one or more engines to connect to an existing Cluster
224 225 by profile name or cluster directory.
225 226 Cluster directories contain configuration, log and
226 227 security related files and are named using the convention
227 228 'cluster_<profile>' and should be creating using the 'start'
228 229 subcommand of 'ipcluster'. If your cluster directory is in
229 230 the cwd or the ipython directory, you can simply refer to it
230 231 using its profile name, 'ipcluster engines -n 4 -p <profile>`,
231 232 otherwise use the '--cluster-dir' option.
232 233 """
233 234 )
234 235 paa = parser_engines.add_argument
235 236 paa('-n', '--number',
236 237 type=int, dest='Global.n',
237 238 help='The number of engines to start.',
238 239 metavar='Global.n')
239 240 paa('--daemon',
240 241 dest='Global.daemonize', action='store_true',
241 242 help='Daemonize the ipcluster program. This implies --log-to-file')
242 243 paa('--no-daemon',
243 244 dest='Global.daemonize', action='store_false',
244 245 help="Dont't daemonize the ipcluster program.")
245 246
246 247 #-----------------------------------------------------------------------------
247 248 # Main application
248 249 #-----------------------------------------------------------------------------
249 250
250 251
251 252 class IPClusterApp(ApplicationWithClusterDir):
252 253
253 254 name = u'ipcluster'
254 255 description = _description
255 256 usage = None
256 257 command_line_loader = IPClusterAppConfigLoader
257 258 default_config_file_name = default_config_file_name
258 259 default_log_level = logging.INFO
259 260 auto_create_cluster_dir = False
260 261
261 262 def create_default_config(self):
262 263 super(IPClusterApp, self).create_default_config()
263 264 self.default_config.Global.controller_launcher = \
264 265 'IPython.parallel.apps.launcher.LocalControllerLauncher'
265 266 self.default_config.Global.engine_launcher = \
266 267 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
267 268 self.default_config.Global.n = 2
268 269 self.default_config.Global.delay = 2
269 270 self.default_config.Global.reset_config = False
270 271 self.default_config.Global.clean_logs = True
271 272 self.default_config.Global.signal = signal.SIGINT
272 273 self.default_config.Global.daemonize = False
273 274
274 275 def find_resources(self):
275 276 subcommand = self.command_line_config.Global.subcommand
276 277 if subcommand=='list':
277 278 self.list_cluster_dirs()
278 279 # Exit immediately because there is nothing left to do.
279 280 self.exit()
280 281 elif subcommand=='create':
281 282 self.auto_create_cluster_dir = True
282 283 super(IPClusterApp, self).find_resources()
283 284 elif subcommand=='start' or subcommand=='stop':
284 285 self.auto_create_cluster_dir = True
285 286 try:
286 287 super(IPClusterApp, self).find_resources()
287 288 except ClusterDirError:
288 289 raise ClusterDirError(
289 290 "Could not find a cluster directory. A cluster dir must "
290 291 "be created before running 'ipcluster start'. Do "
291 292 "'ipcluster create -h' or 'ipcluster list -h' for more "
292 293 "information about creating and listing cluster dirs."
293 294 )
294 295 elif subcommand=='engines':
295 296 self.auto_create_cluster_dir = False
296 297 try:
297 298 super(IPClusterApp, self).find_resources()
298 299 except ClusterDirError:
299 300 raise ClusterDirError(
300 301 "Could not find a cluster directory. A cluster dir must "
301 302 "be created before running 'ipcluster start'. Do "
302 303 "'ipcluster create -h' or 'ipcluster list -h' for more "
303 304 "information about creating and listing cluster dirs."
304 305 )
305 306
306 307 def list_cluster_dirs(self):
307 308 # Find the search paths
308 309 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
309 310 if cluster_dir_paths:
310 311 cluster_dir_paths = cluster_dir_paths.split(':')
311 312 else:
312 313 cluster_dir_paths = []
313 314 try:
314 315 ipython_dir = self.command_line_config.Global.ipython_dir
315 316 except AttributeError:
316 317 ipython_dir = self.default_config.Global.ipython_dir
317 318 paths = [os.getcwd(), ipython_dir] + \
318 319 cluster_dir_paths
319 320 paths = list(set(paths))
320 321
321 322 self.log.info('Searching for cluster dirs in paths: %r' % paths)
322 323 for path in paths:
323 324 files = os.listdir(path)
324 325 for f in files:
325 326 full_path = os.path.join(path, f)
326 327 if os.path.isdir(full_path) and f.startswith('cluster_'):
327 328 profile = full_path.split('_')[-1]
328 329 start_cmd = 'ipcluster start -p %s -n 4' % profile
329 330 print start_cmd + " ==> " + full_path
330 331
331 332 def pre_construct(self):
332 333 # IPClusterApp.pre_construct() is where we cd to the working directory.
333 334 super(IPClusterApp, self).pre_construct()
334 335 config = self.master_config
335 336 try:
336 337 daemon = config.Global.daemonize
337 338 if daemon:
338 339 config.Global.log_to_file = True
339 340 except AttributeError:
340 341 pass
341 342
342 343 def construct(self):
343 344 config = self.master_config
344 345 subcmd = config.Global.subcommand
345 346 reset = config.Global.reset_config
346 347 if subcmd == 'list':
347 348 return
348 349 if subcmd == 'create':
349 350 self.log.info('Copying default config files to cluster directory '
350 351 '[overwrite=%r]' % (reset,))
351 352 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
352 353 if subcmd =='start':
353 354 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
354 355 self.start_logging()
355 356 self.loop = ioloop.IOLoop.instance()
356 357 # reactor.callWhenRunning(self.start_launchers)
357 358 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
358 359 dc.start()
359 360 if subcmd == 'engines':
360 361 self.start_logging()
361 362 self.loop = ioloop.IOLoop.instance()
362 363 # reactor.callWhenRunning(self.start_launchers)
363 364 engine_only = lambda : self.start_launchers(controller=False)
364 365 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
365 366 dc.start()
366 367
367 368 def start_launchers(self, controller=True):
368 369 config = self.master_config
369 370
370 371 # Create the launchers. In both bases, we set the work_dir of
371 372 # the launcher to the cluster_dir. This is where the launcher's
372 373 # subprocesses will be launched. It is not where the controller
373 374 # and engine will be launched.
374 375 if controller:
375 376 cl_class = import_item(config.Global.controller_launcher)
376 377 self.controller_launcher = cl_class(
377 378 work_dir=self.cluster_dir, config=config,
378 379 logname=self.log.name
379 380 )
380 381 # Setup the observing of stopping. If the controller dies, shut
381 382 # everything down as that will be completely fatal for the engines.
382 383 self.controller_launcher.on_stop(self.stop_launchers)
383 384 # But, we don't monitor the stopping of engines. An engine dying
384 385 # is just fine and in principle a user could start a new engine.
385 386 # Also, if we did monitor engine stopping, it is difficult to
386 387 # know what to do when only some engines die. Currently, the
387 388 # observing of engine stopping is inconsistent. Some launchers
388 389 # might trigger on a single engine stopping, other wait until
389 390 # all stop. TODO: think more about how to handle this.
390 391 else:
391 392 self.controller_launcher = None
392 393
393 394 el_class = import_item(config.Global.engine_launcher)
394 395 self.engine_launcher = el_class(
395 396 work_dir=self.cluster_dir, config=config, logname=self.log.name
396 397 )
397 398
398 399 # Setup signals
399 400 signal.signal(signal.SIGINT, self.sigint_handler)
400 401
401 402 # Start the controller and engines
402 403 self._stopping = False # Make sure stop_launchers is not called 2x.
403 404 if controller:
404 405 self.start_controller()
405 406 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
406 407 dc.start()
407 408 self.startup_message()
408 409
409 410 def startup_message(self, r=None):
410 411 self.log.info("IPython cluster: started")
411 412 return r
412 413
413 414 def start_controller(self, r=None):
414 415 # self.log.info("In start_controller")
415 416 config = self.master_config
416 417 d = self.controller_launcher.start(
417 418 cluster_dir=config.Global.cluster_dir
418 419 )
419 420 return d
420 421
421 422 def start_engines(self, r=None):
422 423 # self.log.info("In start_engines")
423 424 config = self.master_config
424 425
425 426 d = self.engine_launcher.start(
426 427 config.Global.n,
427 428 cluster_dir=config.Global.cluster_dir
428 429 )
429 430 return d
430 431
431 432 def stop_controller(self, r=None):
432 433 # self.log.info("In stop_controller")
433 434 if self.controller_launcher and self.controller_launcher.running:
434 435 return self.controller_launcher.stop()
435 436
436 437 def stop_engines(self, r=None):
437 438 # self.log.info("In stop_engines")
438 439 if self.engine_launcher.running:
439 440 d = self.engine_launcher.stop()
440 441 # d.addErrback(self.log_err)
441 442 return d
442 443 else:
443 444 return None
444 445
445 446 def log_err(self, f):
446 447 self.log.error(f.getTraceback())
447 448 return None
448 449
449 450 def stop_launchers(self, r=None):
450 451 if not self._stopping:
451 452 self._stopping = True
452 453 # if isinstance(r, failure.Failure):
453 454 # self.log.error('Unexpected error in ipcluster:')
454 455 # self.log.info(r.getTraceback())
455 456 self.log.error("IPython cluster: stopping")
456 457 # These return deferreds. We are not doing anything with them
457 458 # but we are holding refs to them as a reminder that they
458 459 # do return deferreds.
459 460 d1 = self.stop_engines()
460 461 d2 = self.stop_controller()
461 462 # Wait a few seconds to let things shut down.
462 463 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
463 464 dc.start()
464 465 # reactor.callLater(4.0, reactor.stop)
465 466
466 467 def sigint_handler(self, signum, frame):
467 468 self.stop_launchers()
468 469
469 470 def start_logging(self):
470 471 # Remove old log files of the controller and engine
471 472 if self.master_config.Global.clean_logs:
472 473 log_dir = self.master_config.Global.log_dir
473 474 for f in os.listdir(log_dir):
474 475 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
475 476 os.remove(os.path.join(log_dir, f))
476 477 # This will remove old log files for ipcluster itself
477 478 super(IPClusterApp, self).start_logging()
478 479
479 480 def start_app(self):
480 481 """Start the application, depending on what subcommand is used."""
481 482 subcmd = self.master_config.Global.subcommand
482 483 if subcmd=='create' or subcmd=='list':
483 484 return
484 485 elif subcmd=='start':
485 486 self.start_app_start()
486 487 elif subcmd=='stop':
487 488 self.start_app_stop()
488 489 elif subcmd=='engines':
489 490 self.start_app_engines()
490 491
491 492 def start_app_start(self):
492 493 """Start the app for the start subcommand."""
493 494 config = self.master_config
494 495 # First see if the cluster is already running
495 496 try:
496 497 pid = self.get_pid_from_file()
497 498 except PIDFileError:
498 499 pass
499 500 else:
500 self.log.critical(
501 'Cluster is already running with [pid=%s]. '
502 'use "ipcluster stop" to stop the cluster.' % pid
503 )
504 # Here I exit with a unusual exit status that other processes
505 # can watch for to learn how I existed.
506 self.exit(ALREADY_STARTED)
501 if self.check_pid(pid):
502 self.log.critical(
503 'Cluster is already running with [pid=%s]. '
504 'use "ipcluster stop" to stop the cluster.' % pid
505 )
506 # Here I exit with a unusual exit status that other processes
507 # can watch for to learn how I existed.
508 self.exit(ALREADY_STARTED)
509 else:
510 self.remove_pid_file()
511
507 512
508 513 # Now log and daemonize
509 514 self.log.info(
510 515 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
511 516 )
512 517 # TODO: Get daemonize working on Windows or as a Windows Server.
513 518 if config.Global.daemonize:
514 519 if os.name=='posix':
515 520 from twisted.scripts._twistd_unix import daemonize
516 521 daemonize()
517 522
518 523 # Now write the new pid file AFTER our new forked pid is active.
519 524 self.write_pid_file()
520 525 try:
521 526 self.loop.start()
522 527 except KeyboardInterrupt:
523 528 pass
524 529 except zmq.ZMQError as e:
525 530 if e.errno == errno.EINTR:
526 531 pass
527 532 else:
528 533 raise
529 self.remove_pid_file()
534 finally:
535 self.remove_pid_file()
530 536
531 537 def start_app_engines(self):
532 538 """Start the app for the start subcommand."""
533 539 config = self.master_config
534 540 # First see if the cluster is already running
535 541
536 542 # Now log and daemonize
537 543 self.log.info(
538 544 'Starting engines with [daemon=%r]' % config.Global.daemonize
539 545 )
540 546 # TODO: Get daemonize working on Windows or as a Windows Server.
541 547 if config.Global.daemonize:
542 548 if os.name=='posix':
543 549 from twisted.scripts._twistd_unix import daemonize
544 550 daemonize()
545 551
546 552 # Now write the new pid file AFTER our new forked pid is active.
547 553 # self.write_pid_file()
548 554 try:
549 555 self.loop.start()
550 556 except KeyboardInterrupt:
551 557 pass
552 558 except zmq.ZMQError as e:
553 559 if e.errno == errno.EINTR:
554 560 pass
555 561 else:
556 562 raise
557 563 # self.remove_pid_file()
558 564
559 565 def start_app_stop(self):
560 566 """Start the app for the stop subcommand."""
561 567 config = self.master_config
562 568 try:
563 569 pid = self.get_pid_from_file()
564 570 except PIDFileError:
565 571 self.log.critical(
566 'Problem reading pid file, cluster is probably not running.'
572 'Could not read pid file, cluster is probably not running.'
567 573 )
568 574 # Here I exit with a unusual exit status that other processes
569 575 # can watch for to learn how I existed.
576 self.remove_pid_file()
570 577 self.exit(ALREADY_STOPPED)
571 else:
572 if os.name=='posix':
573 sig = config.Global.signal
574 self.log.info(
575 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
576 )
578
579 if not self.check_pid(pid):
580 self.log.critical(
581 'Cluster [pid=%r] is not running.' % pid
582 )
583 self.remove_pid_file()
584 # Here I exit with a unusual exit status that other processes
585 # can watch for to learn how I existed.
586 self.exit(ALREADY_STOPPED)
587
588 elif os.name=='posix':
589 sig = config.Global.signal
590 self.log.info(
591 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
592 )
593 try:
577 594 os.kill(pid, sig)
578 elif os.name=='nt':
579 # As of right now, we don't support daemonize on Windows, so
580 # stop will not do anything. Minimally, it should clean up the
581 # old .pid files.
595 except OSError:
596 self.log.error("Stopping cluster failed, assuming already dead.",
597 exc_info=True)
582 598 self.remove_pid_file()
599 elif os.name=='nt':
600 try:
601 # kill the whole tree
602 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
603 except (CalledProcessError, OSError):
604 self.log.error("Stopping cluster failed, assuming already dead.",
605 exc_info=True)
606 self.remove_pid_file()
583 607
584 608
585 609 def launch_new_instance():
586 610 """Create and run the IPython cluster."""
587 611 app = IPClusterApp()
588 612 app.start()
589 613
590 614
591 615 if __name__ == '__main__':
592 616 launch_new_instance()
593 617
General Comments 0
You need to be logged in to leave comments. Login now