ipcluster.py
1041 lines
| 33.6 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r1943 | #!/usr/bin/env python | ||
Brian Granger
|
r1774 | # encoding: utf-8 | ||
"""Start an IPython cluster = (controller + engines).""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
Brian E Granger
|
r1234 | import os | ||
Brian Granger
|
r1770 | import re | ||
Brian E Granger
|
r1234 | import sys | ||
Brian Granger
|
r1770 | import signal | ||
Matthieu Brucher
|
r2680 | import stat | ||
Brian Granger
|
r1830 | import tempfile | ||
Brian Granger
|
r1770 | pjoin = os.path.join | ||
from twisted.internet import reactor, defer | ||||
from twisted.internet.protocol import ProcessProtocol | ||||
from twisted.internet.error import ProcessDone, ProcessTerminated | ||||
from twisted.internet.utils import getProcessOutput | ||||
Brian Granger
|
r1826 | from twisted.python import failure, log | ||
Brian Granger
|
r1770 | |||
from IPython.external import argparse | ||||
from IPython.external import Itpl | ||||
Brian Granger
|
r1958 | from IPython.genutils import ( | ||
get_ipython_dir, | ||||
get_log_dir, | ||||
get_security_dir, | ||||
num_cpus | ||||
) | ||||
Administrator
|
r1815 | from IPython.kernel.fcutil import have_crypto | ||
Brian Granger
|
r1945 | |||
# Create various ipython directories if they don't exist.
# This must be done before IPython.kernel.config is imported.
from IPython.iplib import user_setup
if os.name == 'posix':
    # POSIX config files carry no extension.
    rc_suffix = ''
else:
    # Non-posix (Windows) config files use an .ini suffix.
    rc_suffix = '.ini'
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
get_log_dir()
get_security_dir()
Brian Granger
|
r1944 | from IPython.kernel.config import config_manager as kernel_config_manager | ||
from IPython.kernel.error import SecurityError, FileTimeoutError | ||||
Brian Granger
|
r1826 | from IPython.kernel.fcutil import have_crypto | ||
Brian Granger
|
r1944 | from IPython.kernel.twistedutil import gatherBoth, wait_for_file | ||
Brian Granger
|
r1826 | from IPython.kernel.util import printer | ||
Brian Granger
|
r1774 | #----------------------------------------------------------------------------- | ||
# General process handling code | ||||
#----------------------------------------------------------------------------- | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class ProcessStateError(Exception): | ||
pass | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class UnknownStatus(Exception): | ||
pass | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class LauncherProcessProtocol(ProcessProtocol): | ||
""" | ||||
A ProcessProtocol to go with the ProcessLauncher. | ||||
""" | ||||
def __init__(self, process_launcher): | ||||
self.process_launcher = process_launcher | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def connectionMade(self): | ||
self.process_launcher.fire_start_deferred(self.transport.pid) | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def processEnded(self, status): | ||
value = status.value | ||||
if isinstance(value, ProcessDone): | ||||
self.process_launcher.fire_stop_deferred(0) | ||||
elif isinstance(value, ProcessTerminated): | ||||
self.process_launcher.fire_stop_deferred( | ||||
{'exit_code':value.exitCode, | ||||
'signal':value.signal, | ||||
'status':value.status | ||||
} | ||||
) | ||||
else: | ||||
raise UnknownStatus("unknown exit status, this is probably a bug in Twisted") | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1770 | def outReceived(self, data): | ||
log.msg(data) | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1770 | def errReceived(self, data): | ||
log.err(data) | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class ProcessLauncher(object): | ||
""" | ||||
Start and stop an external process in an asynchronous manner. | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | Currently this uses deferreds to notify other parties of process state | ||
changes. This is an awkward design and should be moved to using | ||||
a formal NotificationCenter. | ||||
""" | ||||
def __init__(self, cmd_and_args): | ||||
self.cmd = cmd_and_args[0] | ||||
self.args = cmd_and_args | ||||
self._reset() | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def _reset(self): | ||
self.process_protocol = None | ||||
self.pid = None | ||||
self.start_deferred = None | ||||
self.stop_deferreds = [] | ||||
self.state = 'before' # before, running, or after | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | @property | ||
def running(self): | ||||
if self.state == 'running': | ||||
return True | ||||
else: | ||||
return False | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def fire_start_deferred(self, pid): | ||
self.pid = pid | ||||
self.state = 'running' | ||||
log.msg('Process %r has started with pid=%i' % (self.args, pid)) | ||||
self.start_deferred.callback(pid) | ||||
def start(self): | ||||
if self.state == 'before': | ||||
self.process_protocol = LauncherProcessProtocol(self) | ||||
self.start_deferred = defer.Deferred() | ||||
self.process_transport = reactor.spawnProcess( | ||||
self.process_protocol, | ||||
self.cmd, | ||||
self.args, | ||||
env=os.environ | ||||
) | ||||
return self.start_deferred | ||||
Brian E Granger
|
r1234 | else: | ||
Brian Granger
|
r1770 | s = 'the process has already been started and has state: %r' % \ | ||
self.state | ||||
return defer.fail(ProcessStateError(s)) | ||||
def get_stop_deferred(self): | ||||
if self.state == 'running' or self.state == 'before': | ||||
d = defer.Deferred() | ||||
self.stop_deferreds.append(d) | ||||
return d | ||||
else: | ||||
s = 'this process is already complete' | ||||
return defer.fail(ProcessStateError(s)) | ||||
def fire_stop_deferred(self, exit_code): | ||||
log.msg('Process %r has stopped with %r' % (self.args, exit_code)) | ||||
self.state = 'after' | ||||
for d in self.stop_deferreds: | ||||
d.callback(exit_code) | ||||
def signal(self, sig): | ||||
""" | ||||
Send a signal to the process. | ||||
The argument sig can be ('KILL','INT', etc.) or any signal number. | ||||
""" | ||||
if self.state == 'running': | ||||
self.process_transport.signalProcess(sig) | ||||
Brian Granger
|
r1771 | # def __del__(self): | ||
# self.signal('KILL') | ||||
Brian Granger
|
r1610 | |||
Brian Granger
|
r1770 | def interrupt_then_kill(self, delay=1.0): | ||
self.signal('INT') | ||||
reactor.callLater(delay, self.signal, 'KILL') | ||||
Brian Granger
|
r1610 | |||
Brian Granger
|
r1774 | #----------------------------------------------------------------------------- | ||
# Code for launching controller and engines | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r1770 | class ControllerLauncher(ProcessLauncher): | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def __init__(self, extra_args=None): | ||
Brian Granger
|
r1771 | if sys.platform == 'win32': | ||
Administrator
|
r1815 | # This logic is needed because the ipcontroller script doesn't | ||
# always get installed in the same way or in the same location. | ||||
from IPython.kernel.scripts import ipcontroller | ||||
script_location = ipcontroller.__file__.replace('.pyc', '.py') | ||||
# The -u option here turns on unbuffered output, which is required | ||||
Brian Granger
|
r1943 | # on Win32 to prevent wierd conflict and problems with Twisted. | ||
# Also, use sys.executable to make sure we are picking up the | ||||
# right python exe. | ||||
args = [sys.executable, '-u', script_location] | ||||
Brian Granger
|
r1771 | else: | ||
args = ['ipcontroller'] | ||||
Brian Granger
|
r1770 | self.extra_args = extra_args | ||
if extra_args is not None: | ||||
Brian Granger
|
r1771 | args.extend(extra_args) | ||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1771 | ProcessLauncher.__init__(self, args) | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class EngineLauncher(ProcessLauncher): | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def __init__(self, extra_args=None): | ||
Brian Granger
|
r1771 | if sys.platform == 'win32': | ||
Administrator
|
r1815 | # This logic is needed because the ipcontroller script doesn't | ||
# always get installed in the same way or in the same location. | ||||
from IPython.kernel.scripts import ipengine | ||||
script_location = ipengine.__file__.replace('.pyc', '.py') | ||||
# The -u option here turns on unbuffered output, which is required | ||||
Brian Granger
|
r1943 | # on Win32 to prevent wierd conflict and problems with Twisted. | ||
# Also, use sys.executable to make sure we are picking up the | ||||
# right python exe. | ||||
args = [sys.executable, '-u', script_location] | ||||
Brian Granger
|
r1771 | else: | ||
args = ['ipengine'] | ||||
Brian Granger
|
r1770 | self.extra_args = extra_args | ||
if extra_args is not None: | ||||
Brian Granger
|
r1771 | args.extend(extra_args) | ||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1771 | ProcessLauncher.__init__(self, args) | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class LocalEngineSet(object): | ||
def __init__(self, extra_args=None): | ||||
self.extra_args = extra_args | ||||
self.launchers = [] | ||||
def start(self, n): | ||||
dlist = [] | ||||
for i in range(n): | ||||
Satrajit Ghosh
|
r2623 | print "starting engine:", i | ||
Brian Granger
|
r1770 | el = EngineLauncher(extra_args=self.extra_args) | ||
d = el.start() | ||||
self.launchers.append(el) | ||||
dlist.append(d) | ||||
dfinal = gatherBoth(dlist, consumeErrors=True) | ||||
dfinal.addCallback(self._handle_start) | ||||
return dfinal | ||||
def _handle_start(self, r): | ||||
log.msg('Engines started with pids: %r' % r) | ||||
return r | ||||
def _handle_stop(self, r): | ||||
log.msg('Engines received signal: %r' % r) | ||||
return r | ||||
def signal(self, sig): | ||||
dlist = [] | ||||
for el in self.launchers: | ||||
d = el.get_stop_deferred() | ||||
dlist.append(d) | ||||
el.signal(sig) | ||||
dfinal = gatherBoth(dlist, consumeErrors=True) | ||||
dfinal.addCallback(self._handle_stop) | ||||
return dfinal | ||||
def interrupt_then_kill(self, delay=1.0): | ||||
dlist = [] | ||||
for el in self.launchers: | ||||
d = el.get_stop_deferred() | ||||
dlist.append(d) | ||||
el.interrupt_then_kill(delay) | ||||
dfinal = gatherBoth(dlist, consumeErrors=True) | ||||
dfinal.addCallback(self._handle_stop) | ||||
return dfinal | ||||
class BatchEngineSet(object):
    """Base class for starting engines through a batch queueing system.

    Subclasses must fill in the command/regexp/template class attributes.
    See PBSEngineSet/SGEEngineSet/LSFEngineSet.
    """

    name = ''
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''
    job_array_regexp = ''
    job_array_template = ''
    queue_regexp = ''
    queue_template = ''
    default_template = ''

    def __init__(self, template_file, queue, **kwargs):
        # template_file may be empty/None, in which case start() falls
        # back to default_template.
        self.template_file = template_file
        self.queue = queue

    def parse_job_id(self, output):
        """Extract the batch job id from the submit command's output."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        """Errback: print the traceback, then re-raise so callers see it."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit one batch job starting n engines; returns a deferred job id.

        The user-supplied script (or default_template) is augmented with
        job-array and queue directives when missing, written to an
        executable temp file and handed to submit_command.
        """
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        # The batch system may execute the script directly, so the temp
        # file must be executable by its owner.
        os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            contents = open(self.template_file, 'r').read()
            new_script = contents
            regex = re.compile(self.job_array_regexp)
            if not regex.search(contents):
                log.msg("adding job array settings to %s script" % self.name)
                new_script = self.job_array_template % n +'\n' + new_script
            # BUGFIX: removed two leftover debug ``print`` statements here
            # that dumped queue_regexp and its match object to stdout.
            regex = re.compile(self.queue_regexp)
            if self.queue and not regex.search(contents):
                log.msg("adding queue settings to %s script" % self.name)
                new_script = self.queue_template % self.queue + '\n' + new_script
            if new_script != contents:
                self._temp_file.write(new_script)
                self.template_file = self._temp_file.name
        else:
            default_script = self.default_template % n
            if self.queue:
                default_script = self.queue_template % self.queue + \
                    '\n' + default_script
            log.msg("using default ipengine %s script: \n%s" %
                (self.name, default_script))
            self._temp_file.file.write(default_script)
            self.template_file = self._temp_file.name
        # Close the underlying file object (not the NamedTemporaryFile
        # wrapper) so contents flush to disk without unlinking the file.
        self._temp_file.file.close()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job from the queue."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet for PBS/Torque queueing systems (qsub/qdel)."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    # qsub prints the job id as the leading integer of its output.
    job_id_regexp = '\d+'
    # Detect/insert a job-array directive (#PBS -t 1-n).
    job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
    job_array_template = '#PBS -t 1-%d'
    # Detect/insert a queue directive (#PBS -q name).
    queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
    queue_template = '#PBS -q %s'
    # Script used when the user supplies no template; %d is the engine count.
    default_template="""#!/bin/sh
#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
class SGEEngineSet(PBSEngineSet):
    """BatchEngineSet for Sun Grid Engine; shares qsub/qdel with PBS."""

    name = 'SGE'
    # SGE directives use '#$' instead of '#PBS'.
    job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
    job_array_template = '#$ -t 1-%d'
    queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
    queue_template = '#$ -q %s'
    # Script used when the user supplies no template; %d is the engine count.
    default_template="""#$ -V
#$ -S /bin/sh
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
Justin Riley
|
r2677 | class LSFEngineSet(PBSEngineSet): | ||
name = 'LSF' | ||||
submit_command = 'bsub' | ||||
delete_command = 'bkill' | ||||
Justin Riley
|
r2679 | job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]' | ||
job_array_template = '#BSUB -J ipengine[1-%d]' | ||||
queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+' | ||||
queue_template = '#BSUB -q %s' | ||||
Justin Riley
|
r2681 | default_template="""#!/bin/sh | ||
#BSUB -J ipengine[1-%d] | ||||
Justin Riley
|
r2677 | eid=$(($LSB_JOBINDEX - 1)) | ||
ipengine --logfile=ipengine${eid}.log | ||||
""" | ||||
Justin Riley
|
r2682 | bsub_wrapper="""#!/bin/sh | ||
bsub < $1 | ||||
""" | ||||
def __init__(self, template_file, queue, **kwargs): | ||||
self._bsub_wrapper = self._make_bsub_wrapper() | ||||
self.submit_command = self._bsub_wrapper.name | ||||
PBSEngineSet.__init__(self,template_file, queue, **kwargs) | ||||
def _make_bsub_wrapper(self): | ||||
bsub_wrapper = tempfile.NamedTemporaryFile() | ||||
bsub_wrapper.write(self.bsub_wrapper) | ||||
bsub_wrapper.file.close() | ||||
os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) | ||||
return bsub_wrapper | ||||
Justin Riley
|
r2677 | |||
Satrajit Ghosh
|
r3061 | sshx_template_prefix="""#!/bin/sh | ||
""" | ||||
Fernando Perez
|
r3370 | sshx_template_suffix=""""$@" > /dev/null 2>&1 & | ||
Brian Granger
|
r1832 | echo $! | ||
""" | ||||
engine_killer_template="""#!/bin/sh | ||||
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM | ||||
""" | ||||
Brian Granger
|
r1830 | |||
Satrajit Ghosh
|
r3063 | def escape_strings(val): | ||
val = val.replace('(','\(') | ||||
val = val.replace(')','\)') | ||||
if ' ' in val: | ||||
val = '"%s"'%val | ||||
return val | ||||
Brian Granger
|
r1832 | class SSHEngineSet(object): | ||
Satrajit Ghosh
|
r3061 | sshx_template_prefix=sshx_template_prefix | ||
sshx_template_suffix=sshx_template_suffix | ||||
Brian Granger
|
r1832 | engine_killer_template=engine_killer_template | ||
Brian Granger
|
r1830 | |||
Satrajit Ghosh
|
r3061 | def __init__(self, engine_hosts, sshx=None, copyenvs=None, ipengine="ipengine"): | ||
Brian Granger
|
r1832 | """Start a controller on localhost and engines using ssh. | ||
The engine_hosts argument is a dict with hostnames as keys and | ||||
the number of engine (int) as values. sshx is the name of a local | ||||
file that will be used to run remote commands. This file is used | ||||
to setup the environment properly. | ||||
""" | ||||
Brian Granger
|
r1830 | self.temp_dir = tempfile.gettempdir() | ||
Brian Granger
|
r1832 | if sshx is not None: | ||
Brian Granger
|
r1830 | self.sshx = sshx | ||
else: | ||||
Brian Granger
|
r1832 | # Write the sshx.sh file locally from our template. | ||
self.sshx = os.path.join( | ||||
self.temp_dir, | ||||
'%s-main-sshx.sh' % os.environ['USER'] | ||||
) | ||||
Brian Granger
|
r1830 | f = open(self.sshx, 'w') | ||
Satrajit Ghosh
|
r3061 | f.writelines(self.sshx_template_prefix) | ||
if copyenvs: | ||||
Satrajit Ghosh
|
r3063 | for key, val in sorted(os.environ.items()): | ||
newval = escape_strings(val) | ||||
f.writelines('export %s=%s\n'%(key,newval)) | ||||
Satrajit Ghosh
|
r3061 | f.writelines(self.sshx_template_suffix) | ||
Brian Granger
|
r1830 | f.close() | ||
self.engine_command = ipengine | ||||
self.engine_hosts = engine_hosts | ||||
Brian Granger
|
r1832 | # Write the engine killer script file locally from our template. | ||
self.engine_killer = os.path.join( | ||||
self.temp_dir, | ||||
'%s-local-engine_killer.sh' % os.environ['USER'] | ||||
) | ||||
Brian Granger
|
r1830 | f = open(self.engine_killer, 'w') | ||
f.writelines(self.engine_killer_template) | ||||
f.close() | ||||
def start(self, send_furl=False): | ||||
Brian Granger
|
r1832 | dlist = [] | ||
Brian Granger
|
r1830 | for host in self.engine_hosts.keys(): | ||
count = self.engine_hosts[host] | ||||
Brian Granger
|
r1832 | d = self._start(host, count, send_furl) | ||
dlist.append(d) | ||||
return gatherBoth(dlist, consumeErrors=True) | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1832 | def _start(self, hostname, count=1, send_furl=False): | ||
Brian Granger
|
r1830 | if send_furl: | ||
Brian Granger
|
r1832 | d = self._scp_furl(hostname) | ||
Brian Granger
|
r1830 | else: | ||
Brian Granger
|
r1832 | d = defer.succeed(None) | ||
d.addCallback(lambda r: self._scp_sshx(hostname)) | ||||
d.addCallback(lambda r: self._ssh_engine(hostname, count)) | ||||
return d | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1832 | def _scp_furl(self, hostname): | ||
scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname) | ||||
cmd_list = scp_cmd.split() | ||||
cmd_list[1] = os.path.expanduser(cmd_list[1]) | ||||
log.msg('Copying furl file: %s' % scp_cmd) | ||||
d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) | ||||
return d | ||||
def _scp_sshx(self, hostname): | ||||
scp_cmd = "scp %s %s:%s/%s-sshx.sh" % ( | ||||
self.sshx, hostname, | ||||
self.temp_dir, os.environ['USER'] | ||||
) | ||||
log.msg("Copying sshx: %s" % scp_cmd) | ||||
sshx_scp = scp_cmd.split() | ||||
d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) | ||||
return d | ||||
def _ssh_engine(self, hostname, count): | ||||
exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % ( | ||||
hostname, self.temp_dir, | ||||
os.environ['USER'], self.engine_command | ||||
) | ||||
cmds = exec_engine.split() | ||||
dlist = [] | ||||
log.msg("about to start engines...") | ||||
for i in range(count): | ||||
log.msg('Starting engines: %s' % exec_engine) | ||||
d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | ||||
dlist.append(d) | ||||
return gatherBoth(dlist, consumeErrors=True) | ||||
def kill(self): | ||||
dlist = [] | ||||
for host in self.engine_hosts.keys(): | ||||
d = self._killall(host) | ||||
dlist.append(d) | ||||
return gatherBoth(dlist, consumeErrors=True) | ||||
def _killall(self, hostname): | ||||
d = self._scp_engine_killer(hostname) | ||||
d.addCallback(lambda r: self._ssh_kill(hostname)) | ||||
# d.addErrback(self._exec_err) | ||||
return d | ||||
def _scp_engine_killer(self, hostname): | ||||
scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % ( | ||||
self.engine_killer, | ||||
hostname, | ||||
self.temp_dir, | ||||
os.environ['USER'] | ||||
) | ||||
Brian Granger
|
r1830 | cmds = scp_cmd.split() | ||
Brian Granger
|
r1832 | log.msg('Copying engine_killer: %s' % scp_cmd) | ||
Brian Granger
|
r1830 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | ||
Brian Granger
|
r1832 | return d | ||
def _ssh_kill(self, hostname): | ||||
kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % ( | ||||
hostname, | ||||
self.temp_dir, | ||||
os.environ['USER'] | ||||
) | ||||
log.msg('Killing engine: %s' % kill_cmd) | ||||
kill_cmd = kill_cmd.split() | ||||
d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) | ||||
return d | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1832 | def _exec_err(self, r): | ||
log.msg(r) | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1774 | #----------------------------------------------------------------------------- | ||
# Main functions for the different types of clusters | ||||
#----------------------------------------------------------------------------- | ||||
# TODO: | ||||
# The logic in these codes should be moved into classes like LocalCluster | ||||
# MpirunCluster, PBSCluster, etc. This would remove alot of the duplications. | ||||
# The main functions should then just parse the command line arguments, create | ||||
# the appropriate class and call a 'start' method. | ||||
Brian Granger
|
r1944 | |||
Administrator
|
r1815 | def check_security(args, cont_args): | ||
Brian Granger
|
r1958 | """Check to see if we should run with SSL support.""" | ||
Administrator
|
r1815 | if (not args.x or not args.y) and not have_crypto: | ||
log.err(""" | ||||
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode. | ||||
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""") | ||||
reactor.stop() | ||||
return False | ||||
Brian Granger
|
r1771 | if args.x: | ||
cont_args.append('-x') | ||||
if args.y: | ||||
cont_args.append('-y') | ||||
Administrator
|
r1815 | return True | ||
Brian Granger
|
r1944 | |||
Brian Granger
|
r1883 | def check_reuse(args, cont_args): | ||
Brian Granger
|
r1958 | """Check to see if we should try to resuse FURL files.""" | ||
Brian Granger
|
r1883 | if args.r: | ||
cont_args.append('-r') | ||||
if args.client_port == 0 or args.engine_port == 0: | ||||
log.err(""" | ||||
To reuse FURL files, you must also set the client and engine ports using | ||||
the --client-port and --engine-port options.""") | ||||
reactor.stop() | ||||
return False | ||||
cont_args.append('--client-port=%i' % args.client_port) | ||||
cont_args.append('--engine-port=%i' % args.engine_port) | ||||
return True | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1944 | |||
def _err_and_stop(f):
    """Errback to log a failure and halt the reactor on a fatal error."""
    log.err(f)
    reactor.stop()
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to create its FURL file, then start engines."""
    if not reuse:
        # A stale FURL would satisfy wait_for_file immediately; remove it
        # so we genuinely wait for the new controller to write its own.
        if os.path.isfile(furl_file):
            os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
Administrator
|
r1815 | def main_local(args): | ||
cont_args = [] | ||||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
Brian Granger
|
r1883 | |||
Administrator
|
r1815 | # Check security settings before proceeding | ||
Brian Granger
|
r1826 | if not check_security(args, cont_args): | ||
return | ||||
Brian Granger
|
r1883 | |||
# See if we are reusing FURL files | ||||
if not check_reuse(args, cont_args): | ||||
return | ||||
Brian Granger
|
r1770 | cl = ControllerLauncher(extra_args=cont_args) | ||
dstart = cl.start() | ||||
def start_engines(cont_pid): | ||||
engine_args = [] | ||||
engine_args.append('--logfile=%s' % \ | ||||
pjoin(args.logdir,'ipengine%s-' % cont_pid)) | ||||
eset = LocalEngineSet(extra_args=engine_args) | ||||
def shutdown(signum, frame): | ||||
log.msg('Stopping local cluster') | ||||
# We are still playing with the times here, but these seem | ||||
# to be reliable in allowing everything to exit cleanly. | ||||
eset.interrupt_then_kill(0.5) | ||||
cl.interrupt_then_kill(0.5) | ||||
reactor.callLater(1.0, reactor.stop) | ||||
signal.signal(signal.SIGINT,shutdown) | ||||
d = eset.start(args.n) | ||||
return d | ||||
Brian Granger
|
r1944 | config = kernel_config_manager.get_config_obj() | ||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1880 | def main_mpi(args): | ||
Brian Granger
|
r1770 | cont_args = [] | ||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
Brian Granger
|
r1883 | |||
Administrator
|
r1815 | # Check security settings before proceeding | ||
Brian Granger
|
r1826 | if not check_security(args, cont_args): | ||
return | ||||
Brian Granger
|
r1883 | |||
# See if we are reusing FURL files | ||||
if not check_reuse(args, cont_args): | ||||
return | ||||
Brian Granger
|
r1770 | cl = ControllerLauncher(extra_args=cont_args) | ||
dstart = cl.start() | ||||
def start_engines(cont_pid): | ||||
Brian Granger
|
r1880 | raw_args = [args.cmd] | ||
Brian Granger
|
r1770 | raw_args.extend(['-n',str(args.n)]) | ||
raw_args.append('ipengine') | ||||
raw_args.append('-l') | ||||
raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid)) | ||||
Brian Granger
|
r1788 | if args.mpi: | ||
raw_args.append('--mpi=%s' % args.mpi) | ||||
Brian Granger
|
r1770 | eset = ProcessLauncher(raw_args) | ||
def shutdown(signum, frame): | ||||
log.msg('Stopping local cluster') | ||||
# We are still playing with the times here, but these seem | ||||
# to be reliable in allowing everything to exit cleanly. | ||||
eset.interrupt_then_kill(1.0) | ||||
cl.interrupt_then_kill(1.0) | ||||
reactor.callLater(2.0, reactor.stop) | ||||
signal.signal(signal.SIGINT,shutdown) | ||||
d = eset.start() | ||||
return d | ||||
Brian Granger
|
r1944 | config = kernel_config_manager.get_config_obj() | ||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1770 | def main_pbs(args): | ||
Brian Granger
|
r1772 | cont_args = [] | ||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
Brian Granger
|
r1883 | |||
Administrator
|
r1815 | # Check security settings before proceeding | ||
Brian Granger
|
r1826 | if not check_security(args, cont_args): | ||
return | ||||
Brian Granger
|
r1883 | |||
# See if we are reusing FURL files | ||||
if not check_reuse(args, cont_args): | ||||
return | ||||
Justin Riley
|
r2678 | |||
if args.pbsscript and not os.path.isfile(args.pbsscript): | ||||
log.err('PBS script does not exist: %s' % args.pbsscript) | ||||
return | ||||
Brian Granger
|
r1883 | |||
Brian Granger
|
r1772 | cl = ControllerLauncher(extra_args=cont_args) | ||
Brian Granger
|
r1770 | dstart = cl.start() | ||
def start_engines(r): | ||||
Justin Riley
|
r2679 | pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue) | ||
Brian Granger
|
r1773 | def shutdown(signum, frame): | ||
Justin Riley
|
r2678 | log.msg('Stopping PBS cluster') | ||
Brian Granger
|
r1773 | d = pbs_set.kill() | ||
d.addBoth(lambda _: cl.interrupt_then_kill(1.0)) | ||||
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop)) | ||||
signal.signal(signal.SIGINT,shutdown) | ||||
Brian Granger
|
r1770 | d = pbs_set.start(args.n) | ||
return d | ||||
Brian Granger
|
r1944 | config = kernel_config_manager.get_config_obj() | ||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Brian Granger
|
r1770 | |||
Satrajit Ghosh
|
r2621 | def main_sge(args): | ||
cont_args = [] | ||||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
# Check security settings before proceeding | ||||
if not check_security(args, cont_args): | ||||
return | ||||
# See if we are reusing FURL files | ||||
if not check_reuse(args, cont_args): | ||||
return | ||||
Justin Riley
|
r2675 | |||
if args.sgescript and not os.path.isfile(args.sgescript): | ||||
log.err('SGE script does not exist: %s' % args.sgescript) | ||||
return | ||||
Satrajit Ghosh
|
r2621 | |||
cl = ControllerLauncher(extra_args=cont_args) | ||||
dstart = cl.start() | ||||
def start_engines(r): | ||||
Justin Riley
|
r2679 | sge_set = SGEEngineSet(args.sgescript, args.sgequeue) | ||
Satrajit Ghosh
|
r2621 | def shutdown(signum, frame): | ||
log.msg('Stopping sge cluster') | ||||
d = sge_set.kill() | ||||
d.addBoth(lambda _: cl.interrupt_then_kill(1.0)) | ||||
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop)) | ||||
signal.signal(signal.SIGINT,shutdown) | ||||
d = sge_set.start(args.n) | ||||
return d | ||||
config = kernel_config_manager.get_config_obj() | ||||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Justin Riley
|
r2678 | def main_lsf(args): | ||
cont_args = [] | ||||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
# Check security settings before proceeding | ||||
if not check_security(args, cont_args): | ||||
return | ||||
# See if we are reusing FURL files | ||||
if not check_reuse(args, cont_args): | ||||
return | ||||
if args.lsfscript and not os.path.isfile(args.lsfscript): | ||||
log.err('LSF script does not exist: %s' % args.lsfscript) | ||||
return | ||||
cl = ControllerLauncher(extra_args=cont_args) | ||||
dstart = cl.start() | ||||
def start_engines(r): | ||||
Justin Riley
|
r2679 | lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue) | ||
Justin Riley
|
r2678 | def shutdown(signum, frame): | ||
log.msg('Stopping LSF cluster') | ||||
d = lsf_set.kill() | ||||
d.addBoth(lambda _: cl.interrupt_then_kill(1.0)) | ||||
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop)) | ||||
signal.signal(signal.SIGINT,shutdown) | ||||
d = lsf_set.start(args.n) | ||||
return d | ||||
config = kernel_config_manager.get_config_obj() | ||||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    # Evaluate the user's cluster description file into a fresh namespace.
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # has_key() is deprecated; use the `in` operator instead.
    if 'send_furl' not in clusterfile:
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx,
                               copyenvs=args.copyenvs)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            # Chain the controller shutdown and reactor stop on the
            # engine-kill Deferred so the engines are taken down first,
            # matching the PBS/SGE/LSF shutdown sequence.
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)

        d = ssh_set.start(clusterfile['send_furl'])
        return d

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
Brian Granger
|
r1830 | |||
Brian Granger
|
def get_args():
    """Build the ipcluster command-line parser and parse sys.argv.

    One subcommand is registered per cluster type (local, mpirun, mpiexec,
    pbs, sge, lsf, ssh); each stores its entry point in ``args.func`` via
    ``set_defaults`` so ``main`` can dispatch on it.

    Returns the parsed argparse namespace.
    """
    # Options shared by every subcommand.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    # mpirun and mpiexec differ only in the command used to launch the
    # engines; build both subparsers from the same template.
    for mpi_cmd, mpi_other in (('mpirun', 'mpiexec'), ('mpiexec', 'mpirun')):
        parser_mpi = subparsers.add_parser(
            mpi_cmd,
            help='run a cluster using %s (%s also works)' % (mpi_cmd, mpi_other),
            parents=[base_parser]
        )
        parser_mpi.add_argument(
            "--mpi",
            type=str,
            dest="mpi",  # Don't put a default here to allow no MPI support
            help="how to call MPI_Init (default=mpi4py)"
        )
        parser_mpi.set_defaults(func=main_mpi, cmd=mpi_cmd)

    def add_batch_parser(name, display_name, article, func):
        """Register a subcommand for a batch queue system (PBS/SGE/LSF).

        name         -- subcommand name and option/dest prefix (e.g. 'pbs')
        display_name -- upper-case name used in help text (e.g. 'PBS')
        article      -- 'a' or 'an', to keep the help text grammatical
        func         -- the main_* entry point for this cluster type
        """
        batch_parser = subparsers.add_parser(
            name,
            help='run %s %s cluster' % (article, name),
            parents=[base_parser]
        )
        batch_parser.add_argument(
            '-s',
            '--%s-script' % name,
            type=str,
            dest='%sscript' % name,
            help='%s script template' % display_name,
            default=''  # The engine set will create one if not specified
        )
        batch_parser.add_argument(
            '-q',
            '--queue',
            type=str,
            dest='%squeue' % name,
            help='%s queue to use when starting the engines' % display_name,
            default=None,
        )
        batch_parser.set_defaults(func=func)

    add_batch_parser('pbs', 'PBS', 'a', main_pbs)
    add_batch_parser('sge', 'SGE', 'an', main_sge)
    add_batch_parser('lsf', 'LSF', 'an', main_lsf)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '-e',
        '--copyenvs',
        action='store_true',
        dest='copyenvs',
        help='Copy current shell environment to remote location',
        default=False,
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
Brian E Granger
|
r1234 | |||
Brian Granger
|
def main():
    """Parse the command line, schedule the chosen cluster starter on the
    Twisted reactor, and run the reactor until it is stopped."""
    options = get_args()
    # options.func is the main_* entry point selected by the subcommand.
    reactor.callWhenRunning(options.func, options)
    log.startLogging(sys.stdout)
    reactor.run()


if __name__ == '__main__':
    main()