##// END OF EJS Templates
API update involving map and load-balancing
API update involving map and load-balancing

File last commit:

r3631:d72427c6
r3635:498a93f1
Show More
ipcluster.py
89 lines | 2.5 KiB | text/x-python | PythonLexer
MinRK
added simple cluster entry point
r3552 #!/usr/bin/env python
from __future__ import print_function
MinRK
resort imports in a cleaner order
r3631
import os
import sys
MinRK
ipclusterz notices if controller fails to start
r3554 import time
MinRK
added simple cluster entry point
r3552 from subprocess import Popen, PIPE
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.external.argparse import ArgumentParser, SUPPRESS
MinRK
added simple cluster entry point
r3552
def _filter_arg(flag, args):
filtered = []
if flag in args:
filtered.append(flag)
idx = args.index(flag)
if len(args) > idx+1:
if not args[idx+1].startswith('-'):
filtered.append(args[idx+1])
return filtered
def filter_args(flags, args=sys.argv[1:]):
filtered = []
for flag in flags:
if isinstance(flag, (list,tuple)):
for f in flag:
filtered.extend(_filter_arg(f, args))
else:
filtered.extend(_filter_arg(flag, args))
return filtered
def _strip_arg(flag, args):
while flag in args:
idx = args.index(flag)
args.pop(idx)
if len(args) > idx:
if not args[idx].startswith('-'):
args.pop(idx)
def strip_args(flags, args=sys.argv[1:]):
args = list(args)
for flag in flags:
if isinstance(flag, (list,tuple)):
for f in flag:
_strip_arg(f, args)
else:
_strip_arg(flag, args)
return args
def launch_process(mod, args):
"""Launch a controller or engine in a subprocess."""
MinRK
Refactor newparallel to use Config system...
r3604 code = "from IPython.zmq.parallel.%s import launch_new_instance;launch_new_instance()"%mod
MinRK
added simple cluster entry point
r3552 arguments = [ sys.executable, '-c', code ] + args
blackholew = file(os.devnull, 'w')
blackholer = file(os.devnull, 'r')
MinRK
ipclusterz notices if controller fails to start
r3554 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=PIPE)
MinRK
added simple cluster entry point
r3552 return proc
def main():
MinRK
Refactor newparallel to use Config system...
r3604 parser = ArgumentParser(argument_default=SUPPRESS)
MinRK
added simple cluster entry point
r3552 parser.add_argument('--n', '-n', type=int, default=1,
help="The number of engines to start.")
MinRK
Refactor newparallel to use Config system...
r3604 ns,args = parser.parse_known_args()
n = ns.n
MinRK
added simple cluster entry point
r3552
MinRK
Refactor newparallel to use Config system...
r3604 controller = launch_process('ipcontrollerapp', args)
MinRK
ipclusterz notices if controller fails to start
r3554 for i in range(10):
time.sleep(.1)
if controller.poll() is not None:
print("Controller failed to launch:")
print (controller.stderr.read())
sys.exit(255)
MinRK
Refactor newparallel to use Config system...
r3604 print("Launched Controller")
engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
print("%i Engines started"%n)
MinRK
added simple cluster entry point
r3552
def wait_quietly(p):
try:
p.wait()
except KeyboardInterrupt:
pass
MinRK
ipclusterz notices if controller fails to start
r3554
MinRK
added simple cluster entry point
r3552 wait_quietly(controller)
map(wait_quietly, engines)
MinRK
ipclusterz notices if controller fails to start
r3554 print ("Engines cleaned up.")
MinRK
added simple cluster entry point
r3552
if __name__ == '__main__':
main()