##// END OF EJS Templates
add ipcluster engines; fix ssh process shutdown
MinRK -
Show More
@@ -63,6 +63,10 b' ALREADY_STARTED = 10'
63 63 # file to be found.
64 64 ALREADY_STOPPED = 11
65 65
66 # This will be the exit code if ipcluster engines is run, but there is not .pid
67 # file to be found.
68 NO_CLUSTER = 12
69
66 70
67 71 #-----------------------------------------------------------------------------
68 72 # Command line options
@@ -164,6 +168,7 b' class IPClusterAppConfigLoader(ClusterDirConfigLoader):'
164 168 otherwise use the '--cluster-dir' option.
165 169 """
166 170 )
171
167 172 paa = parser_start.add_argument
168 173 paa('-n', '--number',
169 174 type=int, dest='Global.n',
@@ -206,6 +211,35 b' class IPClusterAppConfigLoader(ClusterDirConfigLoader):'
206 211 help="The signal number to use in stopping the cluster (default=2).",
207 212 metavar="Global.signal")
208 213
214 # the "engines" subcommand parser
215 parser_engines = subparsers.add_parser(
216 'engines',
217 parents=[parent_parser1, parent_parser2],
218 argument_default=SUPPRESS,
219 help="Attach some engines to an existing controller or cluster.",
220 description=
221 """Start one or more engines to connect to an existing Cluster
222 by profile name or cluster directory.
223 Cluster directories contain configuration, log and
224 security related files and are named using the convention
225 'cluster_<profile>' and should be creating using the 'start'
226 subcommand of 'ipcluster'. If your cluster directory is in
227 the cwd or the ipython directory, you can simply refer to it
228 using its profile name, 'ipclusterz engines -n 4 -p <profile>`,
229 otherwise use the '--cluster-dir' option.
230 """
231 )
232 paa = parser_engines.add_argument
233 paa('-n', '--number',
234 type=int, dest='Global.n',
235 help='The number of engines to start.',
236 metavar='Global.n')
237 paa('--daemon',
238 dest='Global.daemonize', action='store_true',
239 help='Daemonize the ipcluster program. This implies --log-to-file')
240 paa('--no-daemon',
241 dest='Global.daemonize', action='store_false',
242 help="Dont't daemonize the ipcluster program.")
209 243
210 244 #-----------------------------------------------------------------------------
211 245 # Main application
@@ -232,7 +266,7 b' class IPClusterApp(ApplicationWithClusterDir):'
232 266 self.default_config.Global.delay = 1
233 267 self.default_config.Global.reset_config = False
234 268 self.default_config.Global.clean_logs = True
235 self.default_config.Global.signal = 2
269 self.default_config.Global.signal = signal.SIGINT
236 270 self.default_config.Global.daemonize = False
237 271
238 272 def find_resources(self):
@@ -255,6 +289,17 b' class IPClusterApp(ApplicationWithClusterDir):'
255 289 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
256 290 "information about creating and listing cluster dirs."
257 291 )
292 elif subcommand=='engines':
293 self.auto_create_cluster_dir = False
294 try:
295 super(IPClusterApp, self).find_resources()
296 except ClusterDirError:
297 raise ClusterDirError(
298 "Could not find a cluster directory. A cluster dir must "
299 "be created before running 'ipclusterz start'. Do "
300 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
301 "information about creating and listing cluster dirs."
302 )
258 303
259 304 def list_cluster_dirs(self):
260 305 # Find the search paths
@@ -309,31 +354,30 b' class IPClusterApp(ApplicationWithClusterDir):'
309 354 # reactor.callWhenRunning(self.start_launchers)
310 355 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
311 356 dc.start()
357 if subcmd == 'engines':
358 self.start_logging()
359 self.loop = ioloop.IOLoop.instance()
360 # reactor.callWhenRunning(self.start_launchers)
361 engine_only = lambda : self.start_launchers(controller=False)
362 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
363 dc.start()
312 364
313 def start_launchers(self):
365 def start_launchers(self, controller=True):
314 366 config = self.master_config
315 367
316 368 # Create the launchers. In both bases, we set the work_dir of
317 369 # the launcher to the cluster_dir. This is where the launcher's
318 370 # subprocesses will be launched. It is not where the controller
319 371 # and engine will be launched.
320 el_class = import_item(config.Global.engine_launcher)
321 self.engine_launcher = el_class(
322 work_dir=self.cluster_dir, config=config, logname=self.log.name
323 )
372 if controller:
324 373 cl_class = import_item(config.Global.controller_launcher)
325 374 self.controller_launcher = cl_class(
326 375 work_dir=self.cluster_dir, config=config,
327 376 logname=self.log.name
328 377 )
329
330 # Setup signals
331 signal.signal(signal.SIGINT, self.sigint_handler)
332
333 378 # Setup the observing of stopping. If the controller dies, shut
334 379 # everything down as that will be completely fatal for the engines.
335 380 self.controller_launcher.on_stop(self.stop_launchers)
336 # d1.addCallback(self.stop_launchers)
337 381 # But, we don't monitor the stopping of engines. An engine dying
338 382 # is just fine and in principle a user could start a new engine.
339 383 # Also, if we did monitor engine stopping, it is difficult to
@@ -342,17 +386,22 b' class IPClusterApp(ApplicationWithClusterDir):'
342 386 # might trigger on a single engine stopping, other wait until
343 387 # all stop. TODO: think more about how to handle this.
344 388
389
390 el_class = import_item(config.Global.engine_launcher)
391 self.engine_launcher = el_class(
392 work_dir=self.cluster_dir, config=config, logname=self.log.name
393 )
394
395 # Setup signals
396 signal.signal(signal.SIGINT, self.sigint_handler)
397
345 398 # Start the controller and engines
346 399 self._stopping = False # Make sure stop_launchers is not called 2x.
347 d = self.start_controller()
348 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay, self.loop)
400 if controller:
401 self.start_controller()
402 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
349 403 dc.start()
350 404 self.startup_message()
351 # d.addCallback(self.start_engines)
352 # d.addCallback(self.startup_message)
353 # If the controller or engines fail to start, stop everything
354 # d.addErrback(self.stop_launchers)
355 return d
356 405
357 406 def startup_message(self, r=None):
358 407 self.log.info("IPython cluster: started")
@@ -369,6 +418,7 b' class IPClusterApp(ApplicationWithClusterDir):'
369 418 def start_engines(self, r=None):
370 419 # self.log.info("In start_engines")
371 420 config = self.master_config
421
372 422 d = self.engine_launcher.start(
373 423 config.Global.n,
374 424 cluster_dir=config.Global.cluster_dir
@@ -432,6 +482,8 b' class IPClusterApp(ApplicationWithClusterDir):'
432 482 self.start_app_start()
433 483 elif subcmd=='stop':
434 484 self.start_app_stop()
485 elif subcmd=='engines':
486 self.start_app_engines()
435 487
436 488 def start_app_start(self):
437 489 """Start the app for the start subcommand."""
@@ -468,6 +520,29 b' class IPClusterApp(ApplicationWithClusterDir):'
468 520 self.log.info("stopping...")
469 521 self.remove_pid_file()
470 522
523 def start_app_engines(self):
524 """Start the app for the start subcommand."""
525 config = self.master_config
526 # First see if the cluster is already running
527
528 # Now log and daemonize
529 self.log.info(
530 'Starting engines with [daemon=%r]' % config.Global.daemonize
531 )
532 # TODO: Get daemonize working on Windows or as a Windows Server.
533 if config.Global.daemonize:
534 if os.name=='posix':
535 from twisted.scripts._twistd_unix import daemonize
536 daemonize()
537
538 # Now write the new pid file AFTER our new forked pid is active.
539 # self.write_pid_file()
540 try:
541 self.loop.start()
542 except:
543 self.log.fatal("stopping...")
544 # self.remove_pid_file()
545
471 546 def start_app_stop(self):
472 547 """Start the app for the stop subcommand."""
473 548 config = self.master_config
@@ -28,12 +28,12 b' except ImportError:'
28 28
29 29 from subprocess import Popen, PIPE, STDOUT
30 30 try:
31 from subprocess import check_open
31 from subprocess import check_output
32 32 except ImportError:
33 33 # pre-2.7:
34 34 from StringIO import StringIO
35 35
36 def check_open(*args, **kwargs):
36 def check_output(*args, **kwargs):
37 37 sio = StringIO()
38 38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
39 39 p = Popen(*args, **kwargs)
@@ -495,7 +495,7 b' class SSHLauncher(LocalProcessLauncher):'
495 495 """
496 496
497 497 ssh_cmd = List(['ssh'], config=True)
498 ssh_args = List([], config=True)
498 ssh_args = List(['-tt'], config=True)
499 499 program = List(['date'], config=True)
500 500 program_args = List([], config=True)
501 501 hostname = Str('', config=True)
@@ -513,12 +513,21 b' class SSHLauncher(LocalProcessLauncher):'
513 513 self.program + self.program_args
514 514
515 515 def start(self, cluster_dir, hostname=None, user=None):
516 print self.config
516 517 if hostname is not None:
517 518 self.hostname = hostname
518 519 if user is not None:
519 520 self.user = user
521 print (self.location, hostname, user)
520 522 return super(SSHLauncher, self).start()
521 523
524 def signal(self, sig):
525 if self.state == 'running':
526 # send escaped ssh connection-closer
527 self.process.stdin.write('~.')
528 self.process.stdin.flush()
529
530
522 531
523 532 class SSHControllerLauncher(SSHLauncher):
524 533
@@ -568,9 +577,9 b' class WindowsHPCLauncher(BaseLauncher):'
568 577 scheduler = Str('', config=True)
569 578 job_cmd = Str(find_job_cmd(), config=True)
570 579
571 def __init__(self, work_dir=u'.', config=None):
580 def __init__(self, work_dir=u'.', config=None, **kwargs):
572 581 super(WindowsHPCLauncher, self).__init__(
573 work_dir=work_dir, config=config
582 work_dir=work_dir, config=config, **kwargs
574 583 )
575 584
576 585 @property
@@ -730,9 +739,9 b' class BatchSystemLauncher(BaseLauncher):'
730 739 # The full path to the instantiated batch script.
731 740 batch_file = Unicode(u'')
732 741
733 def __init__(self, work_dir=u'.', config=None):
742 def __init__(self, work_dir=u'.', config=None, **kwargs):
734 743 super(BatchSystemLauncher, self).__init__(
735 work_dir=work_dir, config=config
744 work_dir=work_dir, config=config, **kwargs
736 745 )
737 746 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
738 747 self.context = {}
@@ -766,7 +775,7 b' class BatchSystemLauncher(BaseLauncher):'
766 775 return job_id
767 776
768 777 def stop(self):
769 output = Popen([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
778 output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
770 779 self.notify_stop(output) # Pass the output of the kill cmd
771 780 return output
772 781
General Comments 0
You need to be logged in to leave comments. Login now