Show More
@@ -63,6 +63,10 b' ALREADY_STARTED = 10' | |||
|
63 | 63 | # file to be found. |
|
64 | 64 | ALREADY_STOPPED = 11 |
|
65 | 65 | |
|
66 | # This will be the exit code if ipcluster engines is run, but there is no .pid | |
|
67 | # file to be found. | |
|
68 | NO_CLUSTER = 12 | |
|
69 | ||
|
66 | 70 | |
|
67 | 71 | #----------------------------------------------------------------------------- |
|
68 | 72 | # Command line options |
@@ -164,6 +168,7 b' class IPClusterAppConfigLoader(ClusterDirConfigLoader):' | |||
|
164 | 168 | otherwise use the '--cluster-dir' option. |
|
165 | 169 | """ |
|
166 | 170 | ) |
|
171 | ||
|
167 | 172 | paa = parser_start.add_argument |
|
168 | 173 | paa('-n', '--number', |
|
169 | 174 | type=int, dest='Global.n', |
@@ -206,6 +211,35 b' class IPClusterAppConfigLoader(ClusterDirConfigLoader):' | |||
|
206 | 211 | help="The signal number to use in stopping the cluster (default=2).", |
|
207 | 212 | metavar="Global.signal") |
|
208 | 213 | |
|
214 | # the "engines" subcommand parser | |
|
215 | parser_engines = subparsers.add_parser( | |
|
216 | 'engines', | |
|
217 | parents=[parent_parser1, parent_parser2], | |
|
218 | argument_default=SUPPRESS, | |
|
219 | help="Attach some engines to an existing controller or cluster.", | |
|
220 | description= | |
|
221 | """Start one or more engines to connect to an existing Cluster | |
|
222 | by profile name or cluster directory. | |
|
223 | Cluster directories contain configuration, log and | |
|
224 | security related files and are named using the convention | |
|
225 | 'cluster_<profile>' and should be created using the 'start' | |
|
226 | subcommand of 'ipcluster'. If your cluster directory is in | |
|
227 | the cwd or the ipython directory, you can simply refer to it | |
|
228 | using its profile name, 'ipclusterz engines -n 4 -p <profile>', | |
|
229 | otherwise use the '--cluster-dir' option. | |
|
230 | """ | |
|
231 | ) | |
|
232 | paa = parser_engines.add_argument | |
|
233 | paa('-n', '--number', | |
|
234 | type=int, dest='Global.n', | |
|
235 | help='The number of engines to start.', | |
|
236 | metavar='Global.n') | |
|
237 | paa('--daemon', | |
|
238 | dest='Global.daemonize', action='store_true', | |
|
239 | help='Daemonize the ipcluster program. This implies --log-to-file') | |
|
240 | paa('--no-daemon', | |
|
241 | dest='Global.daemonize', action='store_false', | |
|
242 | help="Don't daemonize the ipcluster program.") | |
|
209 | 243 | |
|
210 | 244 | #----------------------------------------------------------------------------- |
|
211 | 245 | # Main application |
@@ -232,7 +266,7 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
232 | 266 | self.default_config.Global.delay = 1 |
|
233 | 267 | self.default_config.Global.reset_config = False |
|
234 | 268 | self.default_config.Global.clean_logs = True |
|
235 |
self.default_config.Global.signal = |
|
|
269 | self.default_config.Global.signal = signal.SIGINT | |
|
236 | 270 | self.default_config.Global.daemonize = False |
|
237 | 271 | |
|
238 | 272 | def find_resources(self): |
@@ -255,6 +289,17 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
255 | 289 | "'ipclusterz create -h' or 'ipclusterz list -h' for more " |
|
256 | 290 | "information about creating and listing cluster dirs." |
|
257 | 291 | ) |
|
292 | elif subcommand=='engines': | |
|
293 | self.auto_create_cluster_dir = False | |
|
294 | try: | |
|
295 | super(IPClusterApp, self).find_resources() | |
|
296 | except ClusterDirError: | |
|
297 | raise ClusterDirError( | |
|
298 | "Could not find a cluster directory. A cluster dir must " | |
|
299 | "be created before running 'ipclusterz engines'. Do " | |
|
300 | "'ipclusterz create -h' or 'ipclusterz list -h' for more " | |
|
301 | "information about creating and listing cluster dirs." | |
|
302 | ) | |
|
258 | 303 | |
|
259 | 304 | def list_cluster_dirs(self): |
|
260 | 305 | # Find the search paths |
@@ -309,31 +354,30 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
309 | 354 | # reactor.callWhenRunning(self.start_launchers) |
|
310 | 355 | dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop) |
|
311 | 356 | dc.start() |
|
357 | if subcmd == 'engines': | |
|
358 | self.start_logging() | |
|
359 | self.loop = ioloop.IOLoop.instance() | |
|
360 | # reactor.callWhenRunning(self.start_launchers) | |
|
361 | engine_only = lambda : self.start_launchers(controller=False) | |
|
362 | dc = ioloop.DelayedCallback(engine_only, 0, self.loop) | |
|
363 | dc.start() | |
|
312 | 364 | |
|
313 | def start_launchers(self): | |
|
365 | def start_launchers(self, controller=True): | |
|
314 | 366 | config = self.master_config |
|
315 | 367 | |
|
316 | 368 | # Create the launchers. In both bases, we set the work_dir of |
|
317 | 369 | # the launcher to the cluster_dir. This is where the launcher's |
|
318 | 370 | # subprocesses will be launched. It is not where the controller |
|
319 | 371 | # and engine will be launched. |
|
320 | el_class = import_item(config.Global.engine_launcher) | |
|
321 | self.engine_launcher = el_class( | |
|
322 | work_dir=self.cluster_dir, config=config, logname=self.log.name | |
|
323 | ) | |
|
372 | if controller: | |
|
324 | 373 | cl_class = import_item(config.Global.controller_launcher) |
|
325 | 374 | self.controller_launcher = cl_class( |
|
326 | 375 | work_dir=self.cluster_dir, config=config, |
|
327 | 376 | logname=self.log.name |
|
328 | 377 | ) |
|
329 | ||
|
330 | # Setup signals | |
|
331 | signal.signal(signal.SIGINT, self.sigint_handler) | |
|
332 | ||
|
333 | 378 | # Setup the observing of stopping. If the controller dies, shut |
|
334 | 379 | # everything down as that will be completely fatal for the engines. |
|
335 | 380 | self.controller_launcher.on_stop(self.stop_launchers) |
|
336 | # d1.addCallback(self.stop_launchers) | |
|
337 | 381 | # But, we don't monitor the stopping of engines. An engine dying |
|
338 | 382 | # is just fine and in principle a user could start a new engine. |
|
339 | 383 | # Also, if we did monitor engine stopping, it is difficult to |
@@ -342,17 +386,22 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
342 | 386 | # might trigger on a single engine stopping, other wait until |
|
343 | 387 | # all stop. TODO: think more about how to handle this. |
|
344 | 388 | |
|
389 | ||
|
390 | el_class = import_item(config.Global.engine_launcher) | |
|
391 | self.engine_launcher = el_class( | |
|
392 | work_dir=self.cluster_dir, config=config, logname=self.log.name | |
|
393 | ) | |
|
394 | ||
|
395 | # Setup signals | |
|
396 | signal.signal(signal.SIGINT, self.sigint_handler) | |
|
397 | ||
|
345 | 398 | # Start the controller and engines |
|
346 | 399 | self._stopping = False # Make sure stop_launchers is not called 2x. |
|
347 |
|
|
|
348 | dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay, self.loop) | |
|
400 | if controller: | |
|
401 | self.start_controller() | |
|
402 | dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop) | |
|
349 | 403 | dc.start() |
|
350 | 404 | self.startup_message() |
|
351 | # d.addCallback(self.start_engines) | |
|
352 | # d.addCallback(self.startup_message) | |
|
353 | # If the controller or engines fail to start, stop everything | |
|
354 | # d.addErrback(self.stop_launchers) | |
|
355 | return d | |
|
356 | 405 | |
|
357 | 406 | def startup_message(self, r=None): |
|
358 | 407 | self.log.info("IPython cluster: started") |
@@ -369,6 +418,7 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
369 | 418 | def start_engines(self, r=None): |
|
370 | 419 | # self.log.info("In start_engines") |
|
371 | 420 | config = self.master_config |
|
421 | ||
|
372 | 422 | d = self.engine_launcher.start( |
|
373 | 423 | config.Global.n, |
|
374 | 424 | cluster_dir=config.Global.cluster_dir |
@@ -432,6 +482,8 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
432 | 482 | self.start_app_start() |
|
433 | 483 | elif subcmd=='stop': |
|
434 | 484 | self.start_app_stop() |
|
485 | elif subcmd=='engines': | |
|
486 | self.start_app_engines() | |
|
435 | 487 | |
|
436 | 488 | def start_app_start(self): |
|
437 | 489 | """Start the app for the start subcommand.""" |
@@ -468,6 +520,29 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
468 | 520 | self.log.info("stopping...") |
|
469 | 521 | self.remove_pid_file() |
|
470 | 522 | |
|
523 | def start_app_engines(self): | |
|
524 | """Start the app for the engines subcommand.""" | |
|
525 | config = self.master_config | |
|
526 | # First see if the cluster is already running | |
|
527 | ||
|
528 | # Now log and daemonize | |
|
529 | self.log.info( | |
|
530 | 'Starting engines with [daemon=%r]' % config.Global.daemonize | |
|
531 | ) | |
|
532 | # TODO: Get daemonize working on Windows or as a Windows Server. | |
|
533 | if config.Global.daemonize: | |
|
534 | if os.name=='posix': | |
|
535 | from twisted.scripts._twistd_unix import daemonize | |
|
536 | daemonize() | |
|
537 | ||
|
538 | # Now write the new pid file AFTER our new forked pid is active. | |
|
539 | # self.write_pid_file() | |
|
540 | try: | |
|
541 | self.loop.start() | |
|
542 | except: | |
|
543 | self.log.fatal("stopping...") | |
|
544 | # self.remove_pid_file() | |
|
545 | ||
|
471 | 546 | def start_app_stop(self): |
|
472 | 547 | """Start the app for the stop subcommand.""" |
|
473 | 548 | config = self.master_config |
@@ -28,12 +28,12 b' except ImportError:' | |||
|
28 | 28 | |
|
29 | 29 | from subprocess import Popen, PIPE, STDOUT |
|
30 | 30 | try: |
|
31 |
from subprocess import check_o |
|
|
31 | from subprocess import check_output | |
|
32 | 32 | except ImportError: |
|
33 | 33 | # pre-2.7: |
|
34 | 34 | from StringIO import StringIO |
|
35 | 35 | |
|
36 |
def check_o |
|
|
36 | def check_output(*args, **kwargs): | |
|
37 | 37 | sio = StringIO() |
|
38 | 38 | kwargs.update(dict(stdout=PIPE, stderr=STDOUT)) |
|
39 | 39 | p = Popen(*args, **kwargs) |
@@ -495,7 +495,7 b' class SSHLauncher(LocalProcessLauncher):' | |||
|
495 | 495 | """ |
|
496 | 496 | |
|
497 | 497 | ssh_cmd = List(['ssh'], config=True) |
|
498 | ssh_args = List([], config=True) | |
|
498 | ssh_args = List(['-tt'], config=True) | |
|
499 | 499 | program = List(['date'], config=True) |
|
500 | 500 | program_args = List([], config=True) |
|
501 | 501 | hostname = Str('', config=True) |
@@ -513,12 +513,21 b' class SSHLauncher(LocalProcessLauncher):' | |||
|
513 | 513 | self.program + self.program_args |
|
514 | 514 | |
|
515 | 515 | def start(self, cluster_dir, hostname=None, user=None): |
|
516 | print self.config | |
|
516 | 517 | if hostname is not None: |
|
517 | 518 | self.hostname = hostname |
|
518 | 519 | if user is not None: |
|
519 | 520 | self.user = user |
|
521 | print (self.location, hostname, user) | |
|
520 | 522 | return super(SSHLauncher, self).start() |
|
521 | 523 | |
|
524 | def signal(self, sig): | |
|
525 | if self.state == 'running': | |
|
526 | # send escaped ssh connection-closer | |
|
527 | self.process.stdin.write('~.') | |
|
528 | self.process.stdin.flush() | |
|
529 | ||
|
530 | ||
|
522 | 531 | |
|
523 | 532 | class SSHControllerLauncher(SSHLauncher): |
|
524 | 533 | |
@@ -568,9 +577,9 b' class WindowsHPCLauncher(BaseLauncher):' | |||
|
568 | 577 | scheduler = Str('', config=True) |
|
569 | 578 | job_cmd = Str(find_job_cmd(), config=True) |
|
570 | 579 | |
|
571 | def __init__(self, work_dir=u'.', config=None): | |
|
580 | def __init__(self, work_dir=u'.', config=None, **kwargs): | |
|
572 | 581 | super(WindowsHPCLauncher, self).__init__( |
|
573 | work_dir=work_dir, config=config | |
|
582 | work_dir=work_dir, config=config, **kwargs | |
|
574 | 583 | ) |
|
575 | 584 | |
|
576 | 585 | @property |
@@ -730,9 +739,9 b' class BatchSystemLauncher(BaseLauncher):' | |||
|
730 | 739 | # The full path to the instantiated batch script. |
|
731 | 740 | batch_file = Unicode(u'') |
|
732 | 741 | |
|
733 | def __init__(self, work_dir=u'.', config=None): | |
|
742 | def __init__(self, work_dir=u'.', config=None, **kwargs): | |
|
734 | 743 | super(BatchSystemLauncher, self).__init__( |
|
735 | work_dir=work_dir, config=config | |
|
744 | work_dir=work_dir, config=config, **kwargs | |
|
736 | 745 | ) |
|
737 | 746 | self.batch_file = os.path.join(self.work_dir, self.batch_file_name) |
|
738 | 747 | self.context = {} |
@@ -766,7 +775,7 b' class BatchSystemLauncher(BaseLauncher):' | |||
|
766 | 775 | return job_id |
|
767 | 776 | |
|
768 | 777 | def stop(self): |
|
769 |
output = |
|
|
778 | output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT) | |
|
770 | 779 | self.notify_stop(output) # Pass the output of the kill cmd |
|
771 | 780 | return output |
|
772 | 781 |
General Comments 0
You need to be logged in to leave comments.
Login now