##// END OF EJS Templates
general parallel code cleanup
general parallel code cleanup

File last commit:

r3554:97154617
r3556:b8dd49c8
Show More
ipcluster.py
92 lines | 2.7 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
from __future__ import print_function
import sys,os
import time
from subprocess import Popen, PIPE
from entry_point import parse_url
from controller import make_argument_parser
def _filter_arg(flag, args):
filtered = []
if flag in args:
filtered.append(flag)
idx = args.index(flag)
if len(args) > idx+1:
if not args[idx+1].startswith('-'):
filtered.append(args[idx+1])
return filtered
def filter_args(flags, args=sys.argv[1:]):
filtered = []
for flag in flags:
if isinstance(flag, (list,tuple)):
for f in flag:
filtered.extend(_filter_arg(f, args))
else:
filtered.extend(_filter_arg(flag, args))
return filtered
def _strip_arg(flag, args):
while flag in args:
idx = args.index(flag)
args.pop(idx)
if len(args) > idx:
if not args[idx].startswith('-'):
args.pop(idx)
def strip_args(flags, args=sys.argv[1:]):
args = list(args)
for flag in flags:
if isinstance(flag, (list,tuple)):
for f in flag:
_strip_arg(f, args)
else:
_strip_arg(flag, args)
return args
def launch_process(mod, args):
"""Launch a controller or engine in a subprocess."""
code = "from IPython.zmq.parallel.%s import main;main()"%mod
arguments = [ sys.executable, '-c', code ] + args
blackholew = file(os.devnull, 'w')
blackholer = file(os.devnull, 'r')
proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=PIPE)
return proc
def main():
parser = make_argument_parser()
parser.add_argument('--n', '-n', type=int, default=1,
help="The number of engines to start.")
args = parser.parse_args()
parse_url(args)
controller_args = strip_args([('--n','-n')])
engine_args = filter_args(['--url', '--regport', '--logport', '--ip',
'--transport','--loglevel','--packer'])+['--ident']
controller = launch_process('controller', controller_args)
for i in range(10):
time.sleep(.1)
if controller.poll() is not None:
print("Controller failed to launch:")
print (controller.stderr.read())
sys.exit(255)
print("Launched Controller at %s"%args.url)
engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ]
print("%i Engines started"%args.n)
def wait_quietly(p):
try:
p.wait()
except KeyboardInterrupt:
pass
wait_quietly(controller)
map(wait_quietly, engines)
print ("Engines cleaned up.")
if __name__ == '__main__':
main()