Show More
@@ -63,6 +63,10 b' ALREADY_STARTED = 10' | |||||
63 | # file to be found. |
|
63 | # file to be found. | |
64 | ALREADY_STOPPED = 11 |
|
64 | ALREADY_STOPPED = 11 | |
65 |
|
65 | |||
|
66 | # This will be the exit code if ipcluster engines is run, but there is not .pid | |||
|
67 | # file to be found. | |||
|
68 | NO_CLUSTER = 12 | |||
|
69 | ||||
66 |
|
70 | |||
67 | #----------------------------------------------------------------------------- |
|
71 | #----------------------------------------------------------------------------- | |
68 | # Command line options |
|
72 | # Command line options | |
@@ -164,6 +168,7 b' class IPClusterAppConfigLoader(ClusterDirConfigLoader):' | |||||
164 | otherwise use the '--cluster-dir' option. |
|
168 | otherwise use the '--cluster-dir' option. | |
165 | """ |
|
169 | """ | |
166 | ) |
|
170 | ) | |
|
171 | ||||
167 | paa = parser_start.add_argument |
|
172 | paa = parser_start.add_argument | |
168 | paa('-n', '--number', |
|
173 | paa('-n', '--number', | |
169 | type=int, dest='Global.n', |
|
174 | type=int, dest='Global.n', | |
@@ -206,6 +211,35 b' class IPClusterAppConfigLoader(ClusterDirConfigLoader):' | |||||
206 | help="The signal number to use in stopping the cluster (default=2).", |
|
211 | help="The signal number to use in stopping the cluster (default=2).", | |
207 | metavar="Global.signal") |
|
212 | metavar="Global.signal") | |
208 |
|
213 | |||
|
214 | # the "engines" subcommand parser | |||
|
215 | parser_engines = subparsers.add_parser( | |||
|
216 | 'engines', | |||
|
217 | parents=[parent_parser1, parent_parser2], | |||
|
218 | argument_default=SUPPRESS, | |||
|
219 | help="Attach some engines to an existing controller or cluster.", | |||
|
220 | description= | |||
|
221 | """Start one or more engines to connect to an existing Cluster | |||
|
222 | by profile name or cluster directory. | |||
|
223 | Cluster directories contain configuration, log and | |||
|
224 | security related files and are named using the convention | |||
|
225 | 'cluster_<profile>' and should be creating using the 'start' | |||
|
226 | subcommand of 'ipcluster'. If your cluster directory is in | |||
|
227 | the cwd or the ipython directory, you can simply refer to it | |||
|
228 | using its profile name, 'ipclusterz engines -n 4 -p <profile>`, | |||
|
229 | otherwise use the '--cluster-dir' option. | |||
|
230 | """ | |||
|
231 | ) | |||
|
232 | paa = parser_engines.add_argument | |||
|
233 | paa('-n', '--number', | |||
|
234 | type=int, dest='Global.n', | |||
|
235 | help='The number of engines to start.', | |||
|
236 | metavar='Global.n') | |||
|
237 | paa('--daemon', | |||
|
238 | dest='Global.daemonize', action='store_true', | |||
|
239 | help='Daemonize the ipcluster program. This implies --log-to-file') | |||
|
240 | paa('--no-daemon', | |||
|
241 | dest='Global.daemonize', action='store_false', | |||
|
242 | help="Dont't daemonize the ipcluster program.") | |||
209 |
|
243 | |||
210 | #----------------------------------------------------------------------------- |
|
244 | #----------------------------------------------------------------------------- | |
211 | # Main application |
|
245 | # Main application | |
@@ -232,7 +266,7 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
232 | self.default_config.Global.delay = 1 |
|
266 | self.default_config.Global.delay = 1 | |
233 | self.default_config.Global.reset_config = False |
|
267 | self.default_config.Global.reset_config = False | |
234 | self.default_config.Global.clean_logs = True |
|
268 | self.default_config.Global.clean_logs = True | |
235 |
self.default_config.Global.signal = |
|
269 | self.default_config.Global.signal = signal.SIGINT | |
236 | self.default_config.Global.daemonize = False |
|
270 | self.default_config.Global.daemonize = False | |
237 |
|
271 | |||
238 | def find_resources(self): |
|
272 | def find_resources(self): | |
@@ -255,6 +289,17 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
255 | "'ipclusterz create -h' or 'ipclusterz list -h' for more " |
|
289 | "'ipclusterz create -h' or 'ipclusterz list -h' for more " | |
256 | "information about creating and listing cluster dirs." |
|
290 | "information about creating and listing cluster dirs." | |
257 | ) |
|
291 | ) | |
|
292 | elif subcommand=='engines': | |||
|
293 | self.auto_create_cluster_dir = False | |||
|
294 | try: | |||
|
295 | super(IPClusterApp, self).find_resources() | |||
|
296 | except ClusterDirError: | |||
|
297 | raise ClusterDirError( | |||
|
298 | "Could not find a cluster directory. A cluster dir must " | |||
|
299 | "be created before running 'ipclusterz start'. Do " | |||
|
300 | "'ipclusterz create -h' or 'ipclusterz list -h' for more " | |||
|
301 | "information about creating and listing cluster dirs." | |||
|
302 | ) | |||
258 |
|
303 | |||
259 | def list_cluster_dirs(self): |
|
304 | def list_cluster_dirs(self): | |
260 | # Find the search paths |
|
305 | # Find the search paths | |
@@ -309,31 +354,30 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
309 | # reactor.callWhenRunning(self.start_launchers) |
|
354 | # reactor.callWhenRunning(self.start_launchers) | |
310 | dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop) |
|
355 | dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop) | |
311 | dc.start() |
|
356 | dc.start() | |
|
357 | if subcmd == 'engines': | |||
|
358 | self.start_logging() | |||
|
359 | self.loop = ioloop.IOLoop.instance() | |||
|
360 | # reactor.callWhenRunning(self.start_launchers) | |||
|
361 | engine_only = lambda : self.start_launchers(controller=False) | |||
|
362 | dc = ioloop.DelayedCallback(engine_only, 0, self.loop) | |||
|
363 | dc.start() | |||
312 |
|
364 | |||
313 | def start_launchers(self): |
|
365 | def start_launchers(self, controller=True): | |
314 | config = self.master_config |
|
366 | config = self.master_config | |
315 |
|
367 | |||
316 | # Create the launchers. In both bases, we set the work_dir of |
|
368 | # Create the launchers. In both bases, we set the work_dir of | |
317 | # the launcher to the cluster_dir. This is where the launcher's |
|
369 | # the launcher to the cluster_dir. This is where the launcher's | |
318 | # subprocesses will be launched. It is not where the controller |
|
370 | # subprocesses will be launched. It is not where the controller | |
319 | # and engine will be launched. |
|
371 | # and engine will be launched. | |
320 | el_class = import_item(config.Global.engine_launcher) |
|
372 | if controller: | |
321 | self.engine_launcher = el_class( |
|
|||
322 | work_dir=self.cluster_dir, config=config, logname=self.log.name |
|
|||
323 | ) |
|
|||
324 | cl_class = import_item(config.Global.controller_launcher) |
|
373 | cl_class = import_item(config.Global.controller_launcher) | |
325 | self.controller_launcher = cl_class( |
|
374 | self.controller_launcher = cl_class( | |
326 | work_dir=self.cluster_dir, config=config, |
|
375 | work_dir=self.cluster_dir, config=config, | |
327 | logname=self.log.name |
|
376 | logname=self.log.name | |
328 | ) |
|
377 | ) | |
329 |
|
||||
330 | # Setup signals |
|
|||
331 | signal.signal(signal.SIGINT, self.sigint_handler) |
|
|||
332 |
|
||||
333 | # Setup the observing of stopping. If the controller dies, shut |
|
378 | # Setup the observing of stopping. If the controller dies, shut | |
334 | # everything down as that will be completely fatal for the engines. |
|
379 | # everything down as that will be completely fatal for the engines. | |
335 | self.controller_launcher.on_stop(self.stop_launchers) |
|
380 | self.controller_launcher.on_stop(self.stop_launchers) | |
336 | # d1.addCallback(self.stop_launchers) |
|
|||
337 | # But, we don't monitor the stopping of engines. An engine dying |
|
381 | # But, we don't monitor the stopping of engines. An engine dying | |
338 | # is just fine and in principle a user could start a new engine. |
|
382 | # is just fine and in principle a user could start a new engine. | |
339 | # Also, if we did monitor engine stopping, it is difficult to |
|
383 | # Also, if we did monitor engine stopping, it is difficult to | |
@@ -342,17 +386,22 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
342 | # might trigger on a single engine stopping, other wait until |
|
386 | # might trigger on a single engine stopping, other wait until | |
343 | # all stop. TODO: think more about how to handle this. |
|
387 | # all stop. TODO: think more about how to handle this. | |
344 |
|
388 | |||
|
389 | ||||
|
390 | el_class = import_item(config.Global.engine_launcher) | |||
|
391 | self.engine_launcher = el_class( | |||
|
392 | work_dir=self.cluster_dir, config=config, logname=self.log.name | |||
|
393 | ) | |||
|
394 | ||||
|
395 | # Setup signals | |||
|
396 | signal.signal(signal.SIGINT, self.sigint_handler) | |||
|
397 | ||||
345 | # Start the controller and engines |
|
398 | # Start the controller and engines | |
346 | self._stopping = False # Make sure stop_launchers is not called 2x. |
|
399 | self._stopping = False # Make sure stop_launchers is not called 2x. | |
347 |
|
|
400 | if controller: | |
348 | dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay, self.loop) |
|
401 | self.start_controller() | |
|
402 | dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop) | |||
349 | dc.start() |
|
403 | dc.start() | |
350 | self.startup_message() |
|
404 | self.startup_message() | |
351 | # d.addCallback(self.start_engines) |
|
|||
352 | # d.addCallback(self.startup_message) |
|
|||
353 | # If the controller or engines fail to start, stop everything |
|
|||
354 | # d.addErrback(self.stop_launchers) |
|
|||
355 | return d |
|
|||
356 |
|
405 | |||
357 | def startup_message(self, r=None): |
|
406 | def startup_message(self, r=None): | |
358 | self.log.info("IPython cluster: started") |
|
407 | self.log.info("IPython cluster: started") | |
@@ -369,6 +418,7 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
369 | def start_engines(self, r=None): |
|
418 | def start_engines(self, r=None): | |
370 | # self.log.info("In start_engines") |
|
419 | # self.log.info("In start_engines") | |
371 | config = self.master_config |
|
420 | config = self.master_config | |
|
421 | ||||
372 | d = self.engine_launcher.start( |
|
422 | d = self.engine_launcher.start( | |
373 | config.Global.n, |
|
423 | config.Global.n, | |
374 | cluster_dir=config.Global.cluster_dir |
|
424 | cluster_dir=config.Global.cluster_dir | |
@@ -432,6 +482,8 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
432 | self.start_app_start() |
|
482 | self.start_app_start() | |
433 | elif subcmd=='stop': |
|
483 | elif subcmd=='stop': | |
434 | self.start_app_stop() |
|
484 | self.start_app_stop() | |
|
485 | elif subcmd=='engines': | |||
|
486 | self.start_app_engines() | |||
435 |
|
487 | |||
436 | def start_app_start(self): |
|
488 | def start_app_start(self): | |
437 | """Start the app for the start subcommand.""" |
|
489 | """Start the app for the start subcommand.""" | |
@@ -468,6 +520,29 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
468 | self.log.info("stopping...") |
|
520 | self.log.info("stopping...") | |
469 | self.remove_pid_file() |
|
521 | self.remove_pid_file() | |
470 |
|
522 | |||
|
523 | def start_app_engines(self): | |||
|
524 | """Start the app for the start subcommand.""" | |||
|
525 | config = self.master_config | |||
|
526 | # First see if the cluster is already running | |||
|
527 | ||||
|
528 | # Now log and daemonize | |||
|
529 | self.log.info( | |||
|
530 | 'Starting engines with [daemon=%r]' % config.Global.daemonize | |||
|
531 | ) | |||
|
532 | # TODO: Get daemonize working on Windows or as a Windows Server. | |||
|
533 | if config.Global.daemonize: | |||
|
534 | if os.name=='posix': | |||
|
535 | from twisted.scripts._twistd_unix import daemonize | |||
|
536 | daemonize() | |||
|
537 | ||||
|
538 | # Now write the new pid file AFTER our new forked pid is active. | |||
|
539 | # self.write_pid_file() | |||
|
540 | try: | |||
|
541 | self.loop.start() | |||
|
542 | except: | |||
|
543 | self.log.fatal("stopping...") | |||
|
544 | # self.remove_pid_file() | |||
|
545 | ||||
471 | def start_app_stop(self): |
|
546 | def start_app_stop(self): | |
472 | """Start the app for the stop subcommand.""" |
|
547 | """Start the app for the stop subcommand.""" | |
473 | config = self.master_config |
|
548 | config = self.master_config |
@@ -28,12 +28,12 b' except ImportError:' | |||||
28 |
|
28 | |||
29 | from subprocess import Popen, PIPE, STDOUT |
|
29 | from subprocess import Popen, PIPE, STDOUT | |
30 | try: |
|
30 | try: | |
31 |
from subprocess import check_o |
|
31 | from subprocess import check_output | |
32 | except ImportError: |
|
32 | except ImportError: | |
33 | # pre-2.7: |
|
33 | # pre-2.7: | |
34 | from StringIO import StringIO |
|
34 | from StringIO import StringIO | |
35 |
|
35 | |||
36 |
def check_o |
|
36 | def check_output(*args, **kwargs): | |
37 | sio = StringIO() |
|
37 | sio = StringIO() | |
38 | kwargs.update(dict(stdout=PIPE, stderr=STDOUT)) |
|
38 | kwargs.update(dict(stdout=PIPE, stderr=STDOUT)) | |
39 | p = Popen(*args, **kwargs) |
|
39 | p = Popen(*args, **kwargs) | |
@@ -495,7 +495,7 b' class SSHLauncher(LocalProcessLauncher):' | |||||
495 | """ |
|
495 | """ | |
496 |
|
496 | |||
497 | ssh_cmd = List(['ssh'], config=True) |
|
497 | ssh_cmd = List(['ssh'], config=True) | |
498 | ssh_args = List([], config=True) |
|
498 | ssh_args = List(['-tt'], config=True) | |
499 | program = List(['date'], config=True) |
|
499 | program = List(['date'], config=True) | |
500 | program_args = List([], config=True) |
|
500 | program_args = List([], config=True) | |
501 | hostname = Str('', config=True) |
|
501 | hostname = Str('', config=True) | |
@@ -513,12 +513,21 b' class SSHLauncher(LocalProcessLauncher):' | |||||
513 | self.program + self.program_args |
|
513 | self.program + self.program_args | |
514 |
|
514 | |||
515 | def start(self, cluster_dir, hostname=None, user=None): |
|
515 | def start(self, cluster_dir, hostname=None, user=None): | |
|
516 | print self.config | |||
516 | if hostname is not None: |
|
517 | if hostname is not None: | |
517 | self.hostname = hostname |
|
518 | self.hostname = hostname | |
518 | if user is not None: |
|
519 | if user is not None: | |
519 | self.user = user |
|
520 | self.user = user | |
|
521 | print (self.location, hostname, user) | |||
520 | return super(SSHLauncher, self).start() |
|
522 | return super(SSHLauncher, self).start() | |
521 |
|
523 | |||
|
524 | def signal(self, sig): | |||
|
525 | if self.state == 'running': | |||
|
526 | # send escaped ssh connection-closer | |||
|
527 | self.process.stdin.write('~.') | |||
|
528 | self.process.stdin.flush() | |||
|
529 | ||||
|
530 | ||||
522 |
|
531 | |||
523 | class SSHControllerLauncher(SSHLauncher): |
|
532 | class SSHControllerLauncher(SSHLauncher): | |
524 |
|
533 | |||
@@ -568,9 +577,9 b' class WindowsHPCLauncher(BaseLauncher):' | |||||
568 | scheduler = Str('', config=True) |
|
577 | scheduler = Str('', config=True) | |
569 | job_cmd = Str(find_job_cmd(), config=True) |
|
578 | job_cmd = Str(find_job_cmd(), config=True) | |
570 |
|
579 | |||
571 | def __init__(self, work_dir=u'.', config=None): |
|
580 | def __init__(self, work_dir=u'.', config=None, **kwargs): | |
572 | super(WindowsHPCLauncher, self).__init__( |
|
581 | super(WindowsHPCLauncher, self).__init__( | |
573 | work_dir=work_dir, config=config |
|
582 | work_dir=work_dir, config=config, **kwargs | |
574 | ) |
|
583 | ) | |
575 |
|
584 | |||
576 | @property |
|
585 | @property | |
@@ -730,9 +739,9 b' class BatchSystemLauncher(BaseLauncher):' | |||||
730 | # The full path to the instantiated batch script. |
|
739 | # The full path to the instantiated batch script. | |
731 | batch_file = Unicode(u'') |
|
740 | batch_file = Unicode(u'') | |
732 |
|
741 | |||
733 | def __init__(self, work_dir=u'.', config=None): |
|
742 | def __init__(self, work_dir=u'.', config=None, **kwargs): | |
734 | super(BatchSystemLauncher, self).__init__( |
|
743 | super(BatchSystemLauncher, self).__init__( | |
735 | work_dir=work_dir, config=config |
|
744 | work_dir=work_dir, config=config, **kwargs | |
736 | ) |
|
745 | ) | |
737 | self.batch_file = os.path.join(self.work_dir, self.batch_file_name) |
|
746 | self.batch_file = os.path.join(self.work_dir, self.batch_file_name) | |
738 | self.context = {} |
|
747 | self.context = {} | |
@@ -766,7 +775,7 b' class BatchSystemLauncher(BaseLauncher):' | |||||
766 | return job_id |
|
775 | return job_id | |
767 |
|
776 | |||
768 | def stop(self): |
|
777 | def stop(self): | |
769 |
output = |
|
778 | output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT) | |
770 | self.notify_stop(output) # Pass the output of the kill cmd |
|
779 | self.notify_stop(output) # Pass the output of the kill cmd | |
771 | return output |
|
780 | return output | |
772 |
|
781 |
General Comments 0
You need to be logged in to leave comments.
Login now