##// END OF EJS Templates
fix regex for cleaning old logs with ipcluster...
MinRK -
Show More
@@ -1,612 +1,610 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12 from __future__ import print_function
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Copyright (C) 2008-2011 The IPython Development Team
16 16 #
17 17 # Distributed under the terms of the BSD License. The full license is in
18 18 # the file COPYING, distributed as part of this software.
19 19 #-----------------------------------------------------------------------------
20 20
21 21 #-----------------------------------------------------------------------------
22 22 # Imports
23 23 #-----------------------------------------------------------------------------
24 24
25 25 import errno
26 26 import logging
27 27 import os
28 28 import re
29 29 import signal
30 30
31 31 from subprocess import check_call, CalledProcessError, PIPE
32 32 import zmq
33 33 from zmq.eventloop import ioloop
34 34
35 35 from IPython.config.application import Application, boolean_flag, catch_config_error
36 36 from IPython.config.loader import Config
37 37 from IPython.core.application import BaseIPythonApplication
38 38 from IPython.core.profiledir import ProfileDir
39 39 from IPython.utils.daemonize import daemonize
40 40 from IPython.utils.importstring import import_item
41 41 from IPython.utils.py3compat import string_types
42 42 from IPython.utils.sysinfo import num_cpus
43 43 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
44 44 DottedObjectName)
45 45
46 46 from IPython.parallel.apps.baseapp import (
47 47 BaseParallelApplication,
48 48 PIDFileError,
49 49 base_flags, base_aliases
50 50 )
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Module level variables
55 55 #-----------------------------------------------------------------------------
56 56
57 57
58 58 _description = """Start an IPython cluster for parallel computing.
59 59
60 60 An IPython cluster consists of 1 controller and 1 or more engines.
61 61 This command automates the startup of these processes using a wide range of
62 62 startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, HTCondor,
63 63 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 64 local host simply do 'ipcluster start --n=4'. For more complex usage
65 65 you will typically do 'ipython profile create mycluster --parallel', then edit
66 66 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 67 """
68 68
69 69 _main_examples = """
70 70 ipcluster start --n=4 # start a 4 node cluster on localhost
71 71 ipcluster start -h # show the help string for the start subcmd
72 72
73 73 ipcluster stop -h # show the help string for the stop subcmd
74 74 ipcluster engines -h # show the help string for the engines subcmd
75 75 """
76 76
77 77 _start_examples = """
78 78 ipython profile create mycluster --parallel # create mycluster profile
79 79 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 80 """
81 81
82 82 _stop_examples = """
83 83 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 84 """
85 85
86 86 _engines_examples = """
87 87 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 88 """
89 89
90 90
91 91 # Exit codes for ipcluster
92 92
93 93 # This will be the exit code if the ipcluster appears to be running because
94 94 # a .pid file exists
95 95 ALREADY_STARTED = 10
96 96
97 97
98 98 # This will be the exit code if ipcluster stop is run, but there is not .pid
99 99 # file to be found.
100 100 ALREADY_STOPPED = 11
101 101
102 102 # This will be the exit code if ipcluster engines is run, but there is not .pid
103 103 # file to be found.
104 104 NO_CLUSTER = 12
105 105
106 106
107 107 #-----------------------------------------------------------------------------
108 108 # Utilities
109 109 #-----------------------------------------------------------------------------
110 110
111 111 def find_launcher_class(clsname, kind):
112 112 """Return a launcher for a given clsname and kind.
113 113
114 114 Parameters
115 115 ==========
116 116 clsname : str
117 117 The full name of the launcher class, either with or without the
118 118 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor
119 119 WindowsHPC).
120 120 kind : str
121 121 Either 'EngineSet' or 'Controller'.
122 122 """
123 123 if '.' not in clsname:
124 124 # not a module, presume it's the raw name in apps.launcher
125 125 if kind and kind not in clsname:
126 126 # doesn't match necessary full class name, assume it's
127 127 # just 'PBS' or 'MPI' etc prefix:
128 128 clsname = clsname + kind + 'Launcher'
129 129 clsname = 'IPython.parallel.apps.launcher.'+clsname
130 130 klass = import_item(clsname)
131 131 return klass
132 132
133 133 #-----------------------------------------------------------------------------
134 134 # Main application
135 135 #-----------------------------------------------------------------------------
136 136
137 137 start_help = """Start an IPython cluster for parallel computing
138 138
139 139 Start an ipython cluster by its profile name or cluster
140 140 directory. Cluster directories contain configuration, log and
141 141 security related files and are named using the convention
142 142 'profile_<name>' and should be creating using the 'start'
143 143 subcommand of 'ipcluster'. If your cluster directory is in
144 144 the cwd or the ipython directory, you can simply refer to it
145 145 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
146 146 otherwise use the 'profile-dir' option.
147 147 """
148 148 stop_help = """Stop a running IPython cluster
149 149
150 150 Stop a running ipython cluster by its profile name or cluster
151 151 directory. Cluster directories are named using the convention
152 152 'profile_<name>'. If your cluster directory is in
153 153 the cwd or the ipython directory, you can simply refer to it
154 154 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
155 155 use the '--profile-dir' option.
156 156 """
157 157 engines_help = """Start engines connected to an existing IPython cluster
158 158
159 159 Start one or more engines to connect to an existing Cluster
160 160 by profile name or cluster directory.
161 161 Cluster directories contain configuration, log and
162 162 security related files and are named using the convention
163 163 'profile_<name>' and should be creating using the 'start'
164 164 subcommand of 'ipcluster'. If your cluster directory is in
165 165 the cwd or the ipython directory, you can simply refer to it
166 166 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
167 167 otherwise use the 'profile-dir' option.
168 168 """
169 169 stop_aliases = dict(
170 170 signal='IPClusterStop.signal',
171 171 )
172 172 stop_aliases.update(base_aliases)
173 173
174 174 class IPClusterStop(BaseParallelApplication):
175 175 name = u'ipcluster'
176 176 description = stop_help
177 177 examples = _stop_examples
178 178
179 179 signal = Integer(signal.SIGINT, config=True,
180 180 help="signal to use for stopping processes.")
181 181
182 182 aliases = Dict(stop_aliases)
183 183
184 184 def start(self):
185 185 """Start the app for the stop subcommand."""
186 186 try:
187 187 pid = self.get_pid_from_file()
188 188 except PIDFileError:
189 189 self.log.critical(
190 190 'Could not read pid file, cluster is probably not running.'
191 191 )
192 192 # Here I exit with a unusual exit status that other processes
193 193 # can watch for to learn how I existed.
194 194 self.remove_pid_file()
195 195 self.exit(ALREADY_STOPPED)
196 196
197 197 if not self.check_pid(pid):
198 198 self.log.critical(
199 199 'Cluster [pid=%r] is not running.' % pid
200 200 )
201 201 self.remove_pid_file()
202 202 # Here I exit with a unusual exit status that other processes
203 203 # can watch for to learn how I existed.
204 204 self.exit(ALREADY_STOPPED)
205 205
206 206 elif os.name=='posix':
207 207 sig = self.signal
208 208 self.log.info(
209 209 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
210 210 )
211 211 try:
212 212 os.kill(pid, sig)
213 213 except OSError:
214 214 self.log.error("Stopping cluster failed, assuming already dead.",
215 215 exc_info=True)
216 216 self.remove_pid_file()
217 217 elif os.name=='nt':
218 218 try:
219 219 # kill the whole tree
220 220 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
221 221 except (CalledProcessError, OSError):
222 222 self.log.error("Stopping cluster failed, assuming already dead.",
223 223 exc_info=True)
224 224 self.remove_pid_file()
225 225
226 226 engine_aliases = {}
227 227 engine_aliases.update(base_aliases)
228 228 engine_aliases.update(dict(
229 229 n='IPClusterEngines.n',
230 230 engines = 'IPClusterEngines.engine_launcher_class',
231 231 daemonize = 'IPClusterEngines.daemonize',
232 232 ))
233 233 engine_flags = {}
234 234 engine_flags.update(base_flags)
235 235
236 236 engine_flags.update(dict(
237 237 daemonize=(
238 238 {'IPClusterEngines' : {'daemonize' : True}},
239 239 """run the cluster into the background (not available on Windows)""",
240 240 )
241 241 ))
242 242 class IPClusterEngines(BaseParallelApplication):
243 243
244 244 name = u'ipcluster'
245 245 description = engines_help
246 246 examples = _engines_examples
247 247 usage = None
248 248 default_log_level = logging.INFO
249 249 classes = List()
250 250 def _classes_default(self):
251 251 from IPython.parallel.apps import launcher
252 252 launchers = launcher.all_launchers
253 253 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
254 254 return [ProfileDir]+eslaunchers
255 255
256 256 n = Integer(num_cpus(), config=True,
257 257 help="""The number of engines to start. The default is to use one for each
258 258 CPU on your machine""")
259 259
260 260 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
261 261 def _engine_launcher_changed(self, name, old, new):
262 262 if isinstance(new, string_types):
263 263 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
264 264 " use engine_launcher_class" % self.__class__.__name__)
265 265 self.engine_launcher_class = new
266 266 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
267 267 config=True,
268 268 help="""The class for launching a set of Engines. Change this value
269 269 to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
270 270 Each launcher class has its own set of configuration options, for making sure
271 271 it will work in your environment.
272 272
273 273 You can also write your own launcher, and specify it's absolute import path,
274 274 as in 'mymodule.launcher.FTLEnginesLauncher`.
275 275
276 276 IPython's bundled examples include:
277 277
278 278 Local : start engines locally as subprocesses [default]
279 279 MPI : use mpiexec to launch engines in an MPI environment
280 280 PBS : use PBS (qsub) to submit engines to a batch queue
281 281 SGE : use SGE (qsub) to submit engines to a batch queue
282 282 LSF : use LSF (bsub) to submit engines to a batch queue
283 283 SSH : use SSH to start the controller
284 284 Note that SSH does *not* move the connection files
285 285 around, so you will likely have to do this manually
286 286 unless the machines are on a shared file system.
287 287 HTCondor : use HTCondor to submit engines to a batch queue
288 288 WindowsHPC : use Windows HPC
289 289
290 290 If you are using one of IPython's builtin launchers, you can specify just the
291 291 prefix, e.g:
292 292
293 293 c.IPClusterEngines.engine_launcher_class = 'SSH'
294 294
295 295 or:
296 296
297 297 ipcluster start --engines=MPI
298 298
299 299 """
300 300 )
301 301 daemonize = Bool(False, config=True,
302 302 help="""Daemonize the ipcluster program. This implies --log-to-file.
303 303 Not available on Windows.
304 304 """)
305 305
306 306 def _daemonize_changed(self, name, old, new):
307 307 if new:
308 308 self.log_to_file = True
309 309
310 310 early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
311 311 _stopping = False
312 312
313 313 aliases = Dict(engine_aliases)
314 314 flags = Dict(engine_flags)
315 315
316 316 @catch_config_error
317 317 def initialize(self, argv=None):
318 318 super(IPClusterEngines, self).initialize(argv)
319 319 self.init_signal()
320 320 self.init_launchers()
321 321
322 322 def init_launchers(self):
323 323 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
324 324
325 325 def init_signal(self):
326 326 # Setup signals
327 327 signal.signal(signal.SIGINT, self.sigint_handler)
328 328
329 329 def build_launcher(self, clsname, kind=None):
330 330 """import and instantiate a Launcher based on importstring"""
331 331 try:
332 332 klass = find_launcher_class(clsname, kind)
333 333 except (ImportError, KeyError):
334 334 self.log.fatal("Could not import launcher class: %r"%clsname)
335 335 self.exit(1)
336 336
337 337 launcher = klass(
338 338 work_dir=u'.', parent=self, log=self.log,
339 339 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
340 340 )
341 341 return launcher
342 342
343 343 def engines_started_ok(self):
344 344 self.log.info("Engines appear to have started successfully")
345 345 self.early_shutdown = 0
346 346
347 347 def start_engines(self):
348 348 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
349 349 n = getattr(self.engine_launcher, 'engine_count', self.n)
350 350 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
351 351 self.engine_launcher.start(self.n)
352 352 self.engine_launcher.on_stop(self.engines_stopped_early)
353 353 if self.early_shutdown:
354 354 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
355 355
356 356 def engines_stopped_early(self, r):
357 357 if self.early_shutdown and not self._stopping:
358 358 self.log.error("""
359 359 Engines shutdown early, they probably failed to connect.
360 360
361 361 Check the engine log files for output.
362 362
363 363 If your controller and engines are not on the same machine, you probably
364 364 have to instruct the controller to listen on an interface other than localhost.
365 365
366 366 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
367 367
368 368 Be sure to read our security docs before instructing your controller to listen on
369 369 a public interface.
370 370 """)
371 371 self.stop_launchers()
372 372
373 373 return self.engines_stopped(r)
374 374
375 375 def engines_stopped(self, r):
376 376 return self.loop.stop()
377 377
378 378 def stop_engines(self):
379 379 if self.engine_launcher.running:
380 380 self.log.info("Stopping Engines...")
381 381 d = self.engine_launcher.stop()
382 382 return d
383 383 else:
384 384 return None
385 385
386 386 def stop_launchers(self, r=None):
387 387 if not self._stopping:
388 388 self._stopping = True
389 389 self.log.error("IPython cluster: stopping")
390 390 self.stop_engines()
391 391 # Wait a few seconds to let things shut down.
392 392 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
393 393 dc.start()
394 394
395 395 def sigint_handler(self, signum, frame):
396 396 self.log.debug("SIGINT received, stopping launchers...")
397 397 self.stop_launchers()
398 398
399 399 def start_logging(self):
400 400 # Remove old log files of the controller and engine
401 401 if self.clean_logs:
402 402 log_dir = self.profile_dir.log_dir
403 403 for f in os.listdir(log_dir):
404 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
404 if re.match(r'ip(engine|controller)-.+\.(log|err|out)',f):
405 405 os.remove(os.path.join(log_dir, f))
406 # This will remove old log files for ipcluster itself
407 # super(IPBaseParallelApplication, self).start_logging()
408 406
409 407 def start(self):
410 408 """Start the app for the engines subcommand."""
411 409 self.log.info("IPython cluster: started")
412 410 # First see if the cluster is already running
413 411
414 412 # Now log and daemonize
415 413 self.log.info(
416 414 'Starting engines with [daemon=%r]' % self.daemonize
417 415 )
418 416 # TODO: Get daemonize working on Windows or as a Windows Server.
419 417 if self.daemonize:
420 418 if os.name=='posix':
421 419 daemonize()
422 420
423 421 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
424 422 dc.start()
425 423 # Now write the new pid file AFTER our new forked pid is active.
426 424 # self.write_pid_file()
427 425 try:
428 426 self.loop.start()
429 427 except KeyboardInterrupt:
430 428 pass
431 429 except zmq.ZMQError as e:
432 430 if e.errno == errno.EINTR:
433 431 pass
434 432 else:
435 433 raise
436 434
437 435 start_aliases = {}
438 436 start_aliases.update(engine_aliases)
439 437 start_aliases.update(dict(
440 438 delay='IPClusterStart.delay',
441 439 controller = 'IPClusterStart.controller_launcher_class',
442 440 ))
443 441 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
444 442
445 443 class IPClusterStart(IPClusterEngines):
446 444
447 445 name = u'ipcluster'
448 446 description = start_help
449 447 examples = _start_examples
450 448 default_log_level = logging.INFO
451 449 auto_create = Bool(True, config=True,
452 450 help="whether to create the profile_dir if it doesn't exist")
453 451 classes = List()
454 452 def _classes_default(self,):
455 453 from IPython.parallel.apps import launcher
456 454 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
457 455
458 456 clean_logs = Bool(True, config=True,
459 457 help="whether to cleanup old logs before starting")
460 458
461 459 delay = CFloat(1., config=True,
462 460 help="delay (in s) between starting the controller and the engines")
463 461
464 462 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
465 463 def _controller_launcher_changed(self, name, old, new):
466 464 if isinstance(new, string_types):
467 465 # old 0.11-style config
468 466 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
469 467 " use controller_launcher_class" % self.__class__.__name__)
470 468 self.controller_launcher_class = new
471 469 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
472 470 config=True,
473 471 help="""The class for launching a Controller. Change this value if you want
474 472 your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.
475 473
476 474 Each launcher class has its own set of configuration options, for making sure
477 475 it will work in your environment.
478 476
479 477 Note that using a batch launcher for the controller *does not* put it
480 478 in the same batch job as the engines, so they will still start separately.
481 479
482 480 IPython's bundled examples include:
483 481
484 482 Local : start engines locally as subprocesses
485 483 MPI : use mpiexec to launch the controller in an MPI universe
486 484 PBS : use PBS (qsub) to submit the controller to a batch queue
487 485 SGE : use SGE (qsub) to submit the controller to a batch queue
488 486 LSF : use LSF (bsub) to submit the controller to a batch queue
489 487 HTCondor : use HTCondor to submit the controller to a batch queue
490 488 SSH : use SSH to start the controller
491 489 WindowsHPC : use Windows HPC
492 490
493 491 If you are using one of IPython's builtin launchers, you can specify just the
494 492 prefix, e.g:
495 493
496 494 c.IPClusterStart.controller_launcher_class = 'SSH'
497 495
498 496 or:
499 497
500 498 ipcluster start --controller=MPI
501 499
502 500 """
503 501 )
504 502 reset = Bool(False, config=True,
505 503 help="Whether to reset config files as part of '--create'."
506 504 )
507 505
508 506 # flags = Dict(flags)
509 507 aliases = Dict(start_aliases)
510 508
511 509 def init_launchers(self):
512 510 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
513 511 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
514 512
515 513 def engines_stopped(self, r):
516 514 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
517 515 pass
518 516
519 517 def start_controller(self):
520 518 self.log.info("Starting Controller with %s", self.controller_launcher_class)
521 519 self.controller_launcher.on_stop(self.stop_launchers)
522 520 self.controller_launcher.start()
523 521
524 522 def stop_controller(self):
525 523 # self.log.info("In stop_controller")
526 524 if self.controller_launcher and self.controller_launcher.running:
527 525 return self.controller_launcher.stop()
528 526
529 527 def stop_launchers(self, r=None):
530 528 if not self._stopping:
531 529 self.stop_controller()
532 530 super(IPClusterStart, self).stop_launchers()
533 531
534 532 def start(self):
535 533 """Start the app for the start subcommand."""
536 534 # First see if the cluster is already running
537 535 try:
538 536 pid = self.get_pid_from_file()
539 537 except PIDFileError:
540 538 pass
541 539 else:
542 540 if self.check_pid(pid):
543 541 self.log.critical(
544 542 'Cluster is already running with [pid=%s]. '
545 543 'use "ipcluster stop" to stop the cluster.' % pid
546 544 )
547 545 # Here I exit with a unusual exit status that other processes
548 546 # can watch for to learn how I existed.
549 547 self.exit(ALREADY_STARTED)
550 548 else:
551 549 self.remove_pid_file()
552 550
553 551
554 552 # Now log and daemonize
555 553 self.log.info(
556 554 'Starting ipcluster with [daemon=%r]' % self.daemonize
557 555 )
558 556 # TODO: Get daemonize working on Windows or as a Windows Server.
559 557 if self.daemonize:
560 558 if os.name=='posix':
561 559 daemonize()
562 560
563 561 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
564 562 dc.start()
565 563 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
566 564 dc.start()
567 565 # Now write the new pid file AFTER our new forked pid is active.
568 566 self.write_pid_file()
569 567 try:
570 568 self.loop.start()
571 569 except KeyboardInterrupt:
572 570 pass
573 571 except zmq.ZMQError as e:
574 572 if e.errno == errno.EINTR:
575 573 pass
576 574 else:
577 575 raise
578 576 finally:
579 577 self.remove_pid_file()
580 578
581 579 base='IPython.parallel.apps.ipclusterapp.IPCluster'
582 580
583 581 class IPClusterApp(BaseIPythonApplication):
584 582 name = u'ipcluster'
585 583 description = _description
586 584 examples = _main_examples
587 585
588 586 subcommands = {
589 587 'start' : (base+'Start', start_help),
590 588 'stop' : (base+'Stop', stop_help),
591 589 'engines' : (base+'Engines', engines_help),
592 590 }
593 591
594 592 # no aliases or flags for parent App
595 593 aliases = Dict()
596 594 flags = Dict()
597 595
598 596 def start(self):
599 597 if self.subapp is None:
600 598 print("No subcommand specified. Must specify one of: %s"%(self.subcommands.keys()))
601 599 print()
602 600 self.print_description()
603 601 self.print_subcommands()
604 602 self.exit(1)
605 603 else:
606 604 return self.subapp.start()
607 605
608 606 launch_new_instance = IPClusterApp.launch_instance
609 607
610 608 if __name__ == '__main__':
611 609 launch_new_instance()
612 610
General Comments 0
You need to be logged in to leave comments. Login now