From e2a7b4363cc40985befa5a911a1c3048a8c84285 2011-04-08 00:38:16
From: Fernando Perez
Date: 2011-04-08 00:38:16
Subject: [PATCH] Add little soma workflow example

---

diff --git a/examples/workflow/client.py b/examples/workflow/client.py
new file mode 100644
index 0000000..5346b07
--- /dev/null
+++ b/examples/workflow/client.py
@@ -0,0 +1,3 @@
+from IPython.zmq.parallel.client import Client
+
+client = Client()
diff --git a/examples/workflow/job_wrapper.py b/examples/workflow/job_wrapper.py
new file mode 100755
index 0000000..55448ca
--- /dev/null
+++ b/examples/workflow/job_wrapper.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python
+"""Python wrapper around a submitted workflow job.
+
+In reality this would be a more sophisticated script, here we only illustrate
+the basic idea by considering that a submitted 'job' is a Python string to be
+executed.
+"""
+
+import sys
+
+from IPython.zmq.parallel.engine import main
+
+if len(sys.argv) < 2:
+    sys.exit('usage: job_wrapper.py PYTHON_SOURCE')
+
+# namespace the job runs in; handed to the engine below
+ns = {}
+
+# job -- NOTE: this executes arbitrary code taken from the command line, so
+# only trusted job strings may be submitted.  The call form of exec works on
+# both Python 2 and Python 3.
+exec(sys.argv[1], ns)
+
+# start engine with job namespace
+main([], user_ns=ns)
diff --git a/examples/workflow/wmanager.py b/examples/workflow/wmanager.py
new file mode 100644
index 0000000..a6ce80c
--- /dev/null
+++ b/examples/workflow/wmanager.py
@@ -0,0 +1,72 @@
+"""Mock workflow manager.
+
+This is a mock work manager whose submitted 'jobs' simply consist of executing
+a python string. What we want is to see the implementation of the ipython
+controller part.
+"""
+
+from __future__ import print_function
+
+import atexit
+import sys
+
+from subprocess import Popen
+
+
+def _signal_if_running(proc, sig):
+    """Send signal `sig` to `proc` unless it has already exited."""
+    if proc.poll() is not None:
+        return
+    try:
+        proc.send_signal(sig)
+    except OSError:
+        # The process exited between poll() and send_signal().
+        pass
+
+
+def cleanup(controller, engines):
+    """Cleanup routine to shut down all subprocesses we opened.
+
+    Every engine is sent SIGINT first.  The controller is then sent SIGINT,
+    given 0.1 s to shut down its queues, and killed if it is still running.
+    Processes that have already exited are skipped.
+
+    Parameters
+    ----------
+    controller : subprocess.Popen
+        Handle of the controller process.
+    engines : iterable of subprocess.Popen
+        Handles of the engine processes.
+    """
+    import signal
+    import time
+
+    print('Starting cleanup')
+    print('Stopping engines...')
+    for e in engines:
+        _signal_if_running(e, signal.SIGINT)
+    print('Stopping controller...')
+    # so it can shut down its queues
+    _signal_if_running(controller, signal.SIGINT)
+    time.sleep(0.1)
+    print('Killing controller...')
+    if controller.poll() is None:
+        controller.kill()
+    print('Cleanup done')
+
+
+if __name__ == '__main__':
+
+    # Start controller in separate process
+    cont = Popen([sys.executable, '-m', 'IPython.zmq.parallel.controller'])
+    print('Started controller')
+
+    # Ensure that all subprocesses are shut down when we exit.  This is
+    # registered before the engines start so that a failure while launching
+    # them still stops the controller; `eng` is filled in place below.
+    eng = []
+    atexit.register(cleanup, cont, eng)
+
+    # "Submit jobs"
+    for i in range(4):
+        eng.append(Popen([sys.executable, 'job_wrapper.py', 'x=%s' % i]))