##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
tweak dagdeps for new AsyncResult objects

File last commit:

r3604:2c044319
r3606:9f1a03ab
Show More
ipcluster.py
87 lines | 2.5 KiB | text/x-python | PythonLexer
MinRK
added simple cluster entry point
r3552 #!/usr/bin/env python
from __future__ import print_function
import sys,os
MinRK
ipclusterz notices if controller fails to start
r3554 import time
MinRK
added simple cluster entry point
r3552 from subprocess import Popen, PIPE
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.external.argparse import ArgumentParser, SUPPRESS
MinRK
added simple cluster entry point
r3552
def _filter_arg(flag, args):
filtered = []
if flag in args:
filtered.append(flag)
idx = args.index(flag)
if len(args) > idx+1:
if not args[idx+1].startswith('-'):
filtered.append(args[idx+1])
return filtered
def filter_args(flags, args=sys.argv[1:]):
filtered = []
for flag in flags:
if isinstance(flag, (list,tuple)):
for f in flag:
filtered.extend(_filter_arg(f, args))
else:
filtered.extend(_filter_arg(flag, args))
return filtered
def _strip_arg(flag, args):
while flag in args:
idx = args.index(flag)
args.pop(idx)
if len(args) > idx:
if not args[idx].startswith('-'):
args.pop(idx)
def strip_args(flags, args=sys.argv[1:]):
args = list(args)
for flag in flags:
if isinstance(flag, (list,tuple)):
for f in flag:
_strip_arg(f, args)
else:
_strip_arg(flag, args)
return args
def launch_process(mod, args):
"""Launch a controller or engine in a subprocess."""
MinRK
Refactor newparallel to use Config system...
r3604 code = "from IPython.zmq.parallel.%s import launch_new_instance;launch_new_instance()"%mod
MinRK
added simple cluster entry point
r3552 arguments = [ sys.executable, '-c', code ] + args
blackholew = file(os.devnull, 'w')
blackholer = file(os.devnull, 'r')
MinRK
ipclusterz notices if controller fails to start
r3554 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=PIPE)
MinRK
added simple cluster entry point
r3552 return proc
def main():
MinRK
Refactor newparallel to use Config system...
r3604 parser = ArgumentParser(argument_default=SUPPRESS)
MinRK
added simple cluster entry point
r3552 parser.add_argument('--n', '-n', type=int, default=1,
help="The number of engines to start.")
MinRK
Refactor newparallel to use Config system...
r3604 ns,args = parser.parse_known_args()
n = ns.n
MinRK
added simple cluster entry point
r3552
MinRK
Refactor newparallel to use Config system...
r3604 controller = launch_process('ipcontrollerapp', args)
MinRK
ipclusterz notices if controller fails to start
r3554 for i in range(10):
time.sleep(.1)
if controller.poll() is not None:
print("Controller failed to launch:")
print (controller.stderr.read())
sys.exit(255)
MinRK
Refactor newparallel to use Config system...
r3604 print("Launched Controller")
engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
print("%i Engines started"%n)
MinRK
added simple cluster entry point
r3552
def wait_quietly(p):
try:
p.wait()
except KeyboardInterrupt:
pass
MinRK
ipclusterz notices if controller fails to start
r3554
MinRK
added simple cluster entry point
r3552 wait_quietly(controller)
map(wait_quietly, engines)
MinRK
ipclusterz notices if controller fails to start
r3554 print ("Engines cleaned up.")
MinRK
added simple cluster entry point
r3552
if __name__ == '__main__':
main()