add ipcluster engines; fix ssh process shutdown
MinRK
@@ -63,6 +63,10 @@ ALREADY_STARTED = 10
 # file to be found.
 ALREADY_STOPPED = 11
 
+# This will be the exit code if ipcluster engines is run, but there is no .pid
+# file to be found.
+NO_CLUSTER = 12
+
 
 #-----------------------------------------------------------------------------
 # Command line options
@@ -164,6 +168,7 @@ class IPClusterAppConfigLoader(ClusterDirConfigLoader):
             otherwise use the '--cluster-dir' option.
             """
         )
+
         paa = parser_start.add_argument
         paa('-n', '--number',
             type=int, dest='Global.n',
@@ -205,7 +210,36 @@ class IPClusterAppConfigLoader(ClusterDirConfigLoader):
             dest='Global.signal', type=int,
             help="The signal number to use in stopping the cluster (default=2).",
             metavar="Global.signal")
 
+        # the "engines" subcommand parser
+        parser_engines = subparsers.add_parser(
+            'engines',
+            parents=[parent_parser1, parent_parser2],
+            argument_default=SUPPRESS,
+            help="Attach some engines to an existing controller or cluster.",
+            description=
+            """Start one or more engines to connect to an existing cluster
+            by profile name or cluster directory.
+            Cluster directories contain configuration, log and
+            security related files and are named using the convention
+            'cluster_<profile>' and should be created using the 'start'
+            subcommand of 'ipcluster'. If your cluster directory is in
+            the cwd or the ipython directory, you can simply refer to it
+            using its profile name, 'ipclusterz engines -n 4 -p <profile>',
+            otherwise use the '--cluster-dir' option.
+            """
+        )
+        paa = parser_engines.add_argument
+        paa('-n', '--number',
+            type=int, dest='Global.n',
+            help='The number of engines to start.',
+            metavar='Global.n')
+        paa('--daemon',
+            dest='Global.daemonize', action='store_true',
+            help='Daemonize the ipcluster program. This implies --log-to-file')
+        paa('--no-daemon',
+            dest='Global.daemonize', action='store_false',
+            help="Don't daemonize the ipcluster program.")
 
 #-----------------------------------------------------------------------------
 # Main application
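Note: the new subcommand follows the standard argparse sub-parser pattern, and its help text documents invocations like 'ipclusterz engines -n 4 -p <profile>'. A minimal, self-contained sketch of that pattern (plain argparse with simplified dest names, not IPython's actual ClusterDirConfigLoader):

    # Standalone argparse sketch of the subcommand pattern used above.
    # Option names mirror the diff; dest names are simplified for illustration.
    from argparse import ArgumentParser, SUPPRESS

    parser = ArgumentParser(prog='ipclusterz')
    subparsers = parser.add_subparsers(dest='subcommand')

    parser_engines = subparsers.add_parser(
        'engines',
        argument_default=SUPPRESS,
        help="Attach some engines to an existing controller or cluster.",
    )
    paa = parser_engines.add_argument
    paa('-n', '--number', type=int, dest='n',
        help='The number of engines to start.')
    paa('--daemon', dest='daemonize', action='store_true',
        help='Daemonize the ipcluster program.')
    paa('--no-daemon', dest='daemonize', action='store_false',
        help="Don't daemonize the ipcluster program.")

    args = parser.parse_args(['engines', '-n', '4'])
    print(args)  # e.g. Namespace(n=4, subcommand='engines')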
@@ -232,7 +266,7 @@ class IPClusterApp(ApplicationWithClusterDir):
         self.default_config.Global.delay = 1
         self.default_config.Global.reset_config = False
         self.default_config.Global.clean_logs = True
-        self.default_config.Global.signal = 2
+        self.default_config.Global.signal = signal.SIGINT
         self.default_config.Global.daemonize = False
 
     def find_resources(self):
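Replacing the literal 2 with signal.SIGINT keeps the same default while making the intent explicit, since SIGINT is signal number 2 on POSIX systems. A quick check (illustrative only):

    # The symbolic default equals the old literal on POSIX platforms.
    import signal

    print(int(signal.SIGINT))  # 2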
@@ -255,6 +289,17 @@ class IPClusterApp(ApplicationWithClusterDir):
                     "'ipclusterz create -h' or 'ipclusterz list -h' for more "
                     "information about creating and listing cluster dirs."
                 )
+        elif subcommand=='engines':
+            self.auto_create_cluster_dir = False
+            try:
+                super(IPClusterApp, self).find_resources()
+            except ClusterDirError:
+                raise ClusterDirError(
+                    "Could not find a cluster directory. A cluster dir must "
+                    "be created before running 'ipclusterz engines'. Do "
+                    "'ipclusterz create -h' or 'ipclusterz list -h' for more "
+                    "information about creating and listing cluster dirs."
+                )
 
     def list_cluster_dirs(self):
         # Find the search paths
@@ -309,50 +354,54 @@ class IPClusterApp(ApplicationWithClusterDir):
             # reactor.callWhenRunning(self.start_launchers)
             dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
             dc.start()
+        if subcmd == 'engines':
+            self.start_logging()
+            self.loop = ioloop.IOLoop.instance()
+            # reactor.callWhenRunning(self.start_launchers)
+            engine_only = lambda : self.start_launchers(controller=False)
+            dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
+            dc.start()
 
-    def start_launchers(self):
+    def start_launchers(self, controller=True):
         config = self.master_config
 
         # Create the launchers. In both bases, we set the work_dir of
         # the launcher to the cluster_dir. This is where the launcher's
         # subprocesses will be launched. It is not where the controller
         # and engine will be launched.
+        if controller:
+            cl_class = import_item(config.Global.controller_launcher)
+            self.controller_launcher = cl_class(
+                work_dir=self.cluster_dir, config=config,
+                logname=self.log.name
+            )
+            # Setup the observing of stopping. If the controller dies, shut
+            # everything down as that will be completely fatal for the engines.
+            self.controller_launcher.on_stop(self.stop_launchers)
+            # But, we don't monitor the stopping of engines. An engine dying
+            # is just fine and in principle a user could start a new engine.
+            # Also, if we did monitor engine stopping, it is difficult to
+            # know what to do when only some engines die. Currently, the
+            # observing of engine stopping is inconsistent. Some launchers
+            # might trigger on a single engine stopping, others wait until
+            # all stop. TODO: think more about how to handle this.
+
+
         el_class = import_item(config.Global.engine_launcher)
         self.engine_launcher = el_class(
             work_dir=self.cluster_dir, config=config, logname=self.log.name
         )
-        cl_class = import_item(config.Global.controller_launcher)
-        self.controller_launcher = cl_class(
-            work_dir=self.cluster_dir, config=config,
-            logname=self.log.name
-        )
 
         # Setup signals
         signal.signal(signal.SIGINT, self.sigint_handler)
 
-        # Setup the observing of stopping. If the controller dies, shut
-        # everything down as that will be completely fatal for the engines.
-        self.controller_launcher.on_stop(self.stop_launchers)
-        # d1.addCallback(self.stop_launchers)
-        # But, we don't monitor the stopping of engines. An engine dying
-        # is just fine and in principle a user could start a new engine.
-        # Also, if we did monitor engine stopping, it is difficult to
-        # know what to do when only some engines die. Currently, the
-        # observing of engine stopping is inconsistent. Some launchers
-        # might trigger on a single engine stopping, other wait until
-        # all stop. TODO: think more about how to handle this.
-
         # Start the controller and engines
         self._stopping = False # Make sure stop_launchers is not called 2x.
-        d = self.start_controller()
-        dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay, self.loop)
+        if controller:
+            self.start_controller()
+        dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
         dc.start()
         self.startup_message()
-        # d.addCallback(self.start_engines)
-        # d.addCallback(self.startup_message)
-        # If the controller or engines fail to start, stop everything
-        # d.addErrback(self.stop_launchers)
-        return d
 
     def startup_message(self, r=None):
         self.log.info("IPython cluster: started")
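One subtlety in the new delay computation: controller is a bool, and Python bools behave as the integers 1 and 0 in arithmetic, so the engines-only path gets a zero delay while the full cluster start keeps the configured one. Standalone illustration:

    # Python bools are ints (True == 1, False == 0), so multiplying by the
    # flag zeroes the delay in the engines-only case.  Values illustrative.
    delay = 1  # seconds, matching Global.delay's default above
    for controller in (True, False):
        print(1000 * delay * controller)  # 1000, then 0 (milliseconds)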
@@ -369,6 +418,7 @@ class IPClusterApp(ApplicationWithClusterDir):
     def start_engines(self, r=None):
         # self.log.info("In start_engines")
         config = self.master_config
+
         d = self.engine_launcher.start(
             config.Global.n,
             cluster_dir=config.Global.cluster_dir
@@ -432,6 +482,8 @@ class IPClusterApp(ApplicationWithClusterDir):
             self.start_app_start()
         elif subcmd=='stop':
             self.start_app_stop()
+        elif subcmd=='engines':
+            self.start_app_engines()
 
     def start_app_start(self):
         """Start the app for the start subcommand."""
@@ -468,6 +520,29 @@ class IPClusterApp(ApplicationWithClusterDir):
             self.log.info("stopping...")
             self.remove_pid_file()
 
+    def start_app_engines(self):
+        """Start the app for the engines subcommand."""
+        config = self.master_config
+        # First see if the cluster is already running
+
+        # Now log and daemonize
+        self.log.info(
+            'Starting engines with [daemon=%r]' % config.Global.daemonize
+        )
+        # TODO: Get daemonize working on Windows or as a Windows Server.
+        if config.Global.daemonize:
+            if os.name=='posix':
+                from twisted.scripts._twistd_unix import daemonize
+                daemonize()
+
+        # Now write the new pid file AFTER our new forked pid is active.
+        # self.write_pid_file()
+        try:
+            self.loop.start()
+        except:
+            self.log.fatal("stopping...")
+            # self.remove_pid_file()
+
     def start_app_stop(self):
         """Start the app for the stop subcommand."""
         config = self.master_config
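start_app_engines defers daemonization to Twisted's POSIX daemonize(). For readers unfamiliar with it, a rough sketch of what such a daemonize typically does (illustrative only, not the Twisted implementation used above):

    # Classic POSIX daemonization: double fork plus setsid.  Sketch only.
    import os, sys

    def daemonize_sketch():
        if os.fork():       # first fork: parent returns to the shell
            os._exit(0)
        os.setsid()         # new session, detach from the controlling terminal
        if os.fork():       # second fork: the daemon can never reacquire a tty
            os._exit(0)
        sys.stdout.flush()
        sys.stderr.flush()
        # a real implementation would also redirect std streams to /dev/null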
@@ -28,12 +28,12 @@ except ImportError:
 
 from subprocess import Popen, PIPE, STDOUT
 try:
-    from subprocess import check_open
+    from subprocess import check_output
 except ImportError:
     # pre-2.7:
     from StringIO import StringIO
 
-    def check_open(*args, **kwargs):
+    def check_output(*args, **kwargs):
         sio = StringIO()
         kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
         p = Popen(*args, **kwargs)
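The pre-2.7 fallback is cut off by the hunk context above; for reference, a complete check_output shim in that style might look roughly like this (a sketch, the real body in this file may differ):

    # Hypothetical completion of a pre-2.7 check_output() shim: run the
    # command and return its combined stdout/stderr.
    from subprocess import Popen, PIPE, STDOUT

    def check_output(*args, **kwargs):
        kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
        p = Popen(*args, **kwargs)
        out, _ = p.communicate()
        return out

    print(check_output(['echo', 'hello']))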
@@ -495,7 +495,7 @@ class SSHLauncher(LocalProcessLauncher):
     """
 
     ssh_cmd = List(['ssh'], config=True)
-    ssh_args = List([], config=True)
+    ssh_args = List(['-tt'], config=True)
     program = List(['date'], config=True)
     program_args = List([], config=True)
     hostname = Str('', config=True)
@@ -513,11 +513,20 @@ class SSHLauncher(LocalProcessLauncher):
             self.program + self.program_args
 
     def start(self, cluster_dir, hostname=None, user=None):
+        print self.config
         if hostname is not None:
             self.hostname = hostname
         if user is not None:
             self.user = user
+        print (self.location, hostname, user)
         return super(SSHLauncher, self).start()
+
+    def signal(self, sig):
+        if self.state == 'running':
+            # send escaped ssh connection-closer
+            self.process.stdin.write('~.')
+            self.process.stdin.flush()
+
 
 
 class SSHControllerLauncher(SSHLauncher):
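Two pieces cooperate in the ssh shutdown fix: '-tt' (added to ssh_args above) forces pseudo-tty allocation, and '~.' is OpenSSH's client escape for dropping the connection, honored only on a tty and only at the start of a line. A standalone sketch of the same trick (hypothetical 'localhost' target, requires a reachable ssh server; not the launcher's actual code path):

    # Close an ssh session by writing the OpenSSH escape sequence '~.' to its
    # stdin.  '-tt' forces a pseudo-tty, without which the escape is ignored.
    import subprocess, time

    proc = subprocess.Popen(
        ['ssh', '-tt', 'localhost', 'sleep', '600'],
        stdin=subprocess.PIPE,
    )
    time.sleep(2)             # give the session a moment to come up
    proc.stdin.write(b'~.')   # escape is only honored at the start of a line
    proc.stdin.flush()
    proc.wait()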
@@ -568,9 +577,9 @@ class WindowsHPCLauncher(BaseLauncher):
     scheduler = Str('', config=True)
     job_cmd = Str(find_job_cmd(), config=True)
 
-    def __init__(self, work_dir=u'.', config=None):
+    def __init__(self, work_dir=u'.', config=None, **kwargs):
         super(WindowsHPCLauncher, self).__init__(
-            work_dir=work_dir, config=config
+            work_dir=work_dir, config=config, **kwargs
         )
 
     @property
@@ -730,9 +739,9 @@ class BatchSystemLauncher(BaseLauncher):
     # The full path to the instantiated batch script.
     batch_file = Unicode(u'')
 
-    def __init__(self, work_dir=u'.', config=None):
+    def __init__(self, work_dir=u'.', config=None, **kwargs):
         super(BatchSystemLauncher, self).__init__(
-            work_dir=work_dir, config=config
+            work_dir=work_dir, config=config, **kwargs
         )
         self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
         self.context = {}
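Accepting and forwarding **kwargs in these two constructors matters because the application above now instantiates launchers with an extra logname= keyword; without the pass-through the base class never sees it and the call fails. A minimal sketch with hypothetical classes (not the real launcher hierarchy):

    # Why **kwargs must be forwarded: if Derived did not accept and pass it
    # on, the call below would raise TypeError for the unexpected 'logname'.
    class Base(object):
        def __init__(self, work_dir=u'.', config=None, logname='launcher'):
            self.work_dir, self.config, self.logname = work_dir, config, logname

    class Derived(Base):
        def __init__(self, work_dir=u'.', config=None, **kwargs):
            super(Derived, self).__init__(work_dir=work_dir, config=config, **kwargs)

    d = Derived(work_dir=u'/tmp', logname='IPClusterApp')
    print(d.logname)  # IPClusterApp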
@@ -766,7 +775,7 @@ class BatchSystemLauncher(BaseLauncher):
         return job_id
 
     def stop(self):
-        output = Popen([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
+        output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
         self.notify_stop(output) # Pass the output of the kill cmd
         return output
 
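The old stop() handed notify_stop a Popen object rather than the delete command's output, and it never waited for the command to finish; check_output blocks until exit and returns the combined stdout/stderr. Quick illustration ('echo' standing in for the scheduler's delete command):

    # Popen returns immediately with a process object; check_output waits and
    # returns the captured output.
    from subprocess import Popen, check_output, STDOUT

    p = Popen(['echo', 'job 42 deleted'])
    out = check_output(['echo', 'job 42 deleted'], stderr=STDOUT)
    print(type(p).__name__)  # Popen
    print(out)               # the captured output, e.g. 'job 42 deleted\n'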