From 8d078bccd24ac1fdbc629ad56b518726cc7a1fc3 2011-04-08 00:38:16 From: MinRK Date: 2011-04-08 00:38:16 Subject: [PATCH] updated newparallel examples, moved into docs --- diff --git a/examples/demo/dag/dagdeps.py b/docs/examples/newparallel/demo/dag/dagdeps.py similarity index 88% rename from examples/demo/dag/dagdeps.py rename to docs/examples/newparallel/demo/dag/dagdeps.py index 8291962..c3b6837 100644 --- a/examples/demo/dag/dagdeps.py +++ b/docs/examples/newparallel/demo/dag/dagdeps.py @@ -61,13 +61,13 @@ def submit_jobs(client, G, jobs): results[node] = client.apply(jobs[node], after=deps) return results -def validate_tree(G, times): +def validate_tree(G, results): """Validate that jobs executed after their dependencies.""" for node in G: - t = times[node] + started = results[node].metadata.started for parent in G.predecessors(node): - pt = times[parent] - assert t > pt, "%s should have happened after %s"%(node, parent) + finished = results[parent].metadata.completed + assert started > finished, "%s should have happened after %s"%(node, parent) def main(nodes, edges): """Generate a random graph, submit jobs, then validate that the @@ -76,28 +76,28 @@ def main(nodes, edges): in-degree on the y (just for spread). All arrows must point at least slightly to the right if the graph is valid. """ + from matplotlib.dates import date2num print "building DAG" G = random_dag(nodes, edges) jobs = {} - msg_ids = {} - times = {} pos = {} for node in G: jobs[node] = randomwait - client = cmod.Client('tcp://127.0.0.1:10101') + client = cmod.Client() print "submitting tasks" results = submit_jobs(client, G, jobs) print "waiting for results" client.barrier() print "done" for node in G: - times[node] = results[node].get() - pos[node] = (times[node], G.in_degree(node)+random()) + # times[node] = results[node].get() + t = date2num(results[node].metadata.started) + pos[node] = (t, G.in_degree(node)+random()) - validate_tree(G, times) + validate_tree(G, results) nx.draw(G, pos) - return G,times,msg_ids + return G,results if __name__ == '__main__': import pylab diff --git a/docs/examples/newparallel/demo/dependencies.py b/docs/examples/newparallel/demo/dependencies.py new file mode 100644 index 0000000..492690d --- /dev/null +++ b/docs/examples/newparallel/demo/dependencies.py @@ -0,0 +1,118 @@ +from IPython.zmq.parallel import error +from IPython.zmq.parallel.dependency import Dependency +from IPython.zmq.parallel.client import * + +client = Client() + +# this will only run on machines that can import numpy: +@require('numpy') +def norm(A): + from numpy.linalg import norm + return norm(A,2) + +def checkpid(pid): + """return the pid of the engine""" + import os + return os.getpid() == pid + +def checkhostname(host): + import socket + return socket.gethostname() == host + +def getpid(): + import os + return os.getpid() + +pid0 = client[0].apply_sync(getpid) + +# this will depend on the pid being that of target 0: +@depend(checkpid, pid0) +def getpid2(): + import os + return os.getpid() + +view = client[None] +view.block=True + +# will run on anything: +pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ] +print pids1 +# will only run on e0: +pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ] +print pids2 + +print "now test some dependency behaviors" + +def wait(t): + import time + time.sleep(t) + return t + +# fail after some time: +def wait_and_fail(t): + import time + time.sleep(t) + return 1/0 + +successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids)) ] +failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ] + +mixed = [failures[0],successes[0]] +d1a = Dependency(mixed, mode='any', success_only=False) # yes +d1b = Dependency(mixed, mode='any', success_only=True) # yes +d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow +d2b = Dependency(mixed, mode='all', success_only=True) # no +d3 = Dependency(failures, mode='any', success_only=True) # no +d4 = Dependency(failures, mode='any', success_only=False) # yes +d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow +d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow + +client.block = False + +r1a = client.apply(getpid, after=d1a) +r1b = client.apply(getpid, follow=d1b) +r2a = client.apply(getpid, after=d2b, follow=d2a) +r2b = client.apply(getpid, after=d2a, follow=d2b) +r3 = client.apply(getpid, after=d3) +r4a = client.apply(getpid, after=d4) +r4b = client.apply(getpid, follow=d4) +r4c = client.apply(getpid, after=d3, follow=d4) +r5 = client.apply(getpid, after=d5) +r5b = client.apply(getpid, follow=d5, after=d3) +r6 = client.apply(getpid, follow=d6) +r6b = client.apply(getpid, after=d6, follow=d2b) + +def should_fail(f): + try: + f() + except error.KernelError: + pass + else: + print 'should have raised' + # raise Exception("should have raised") + +# print r1a.msg_ids +r1a.get() +# print r1b.msg_ids +r1b.get() +# print r2a.msg_ids +should_fail(r2a.get) +# print r2b.msg_ids +should_fail(r2b.get) +# print r3.msg_ids +should_fail(r3.get) +# print r4a.msg_ids +r4a.get() +# print r4b.msg_ids +r4b.get() +# print r4c.msg_ids +should_fail(r4c.get) +# print r5.msg_ids +r5.get() +# print r5b.msg_ids +should_fail(r5b.get) +# print r6.msg_ids +should_fail(r6.get) # assuming > 1 engine +# print r6b.msg_ids +should_fail(r6b.get) +print 'done' diff --git a/docs/examples/newparallel/demo/map.py b/docs/examples/newparallel/demo/map.py new file mode 100644 index 0000000..ccaa070 --- /dev/null +++ b/docs/examples/newparallel/demo/map.py @@ -0,0 +1,36 @@ +from IPython.zmq.parallel.client import * + +client = Client() + +@remote(client, block=True) +def square(a): + """return square of a number""" + return a*a + +squares = map(square, range(42)) + +# but that blocked between each result; not exactly useful + +square.block = False + +arlist = map(square, range(42)) +# submitted very fast + +# wait for the results: +squares2 = [ r.get() for r in arlist ] + +# now the more convenient @parallel decorator, which has a map method: + +@parallel(client, block=False) +def psquare(a): + """return square of a number""" + return a*a + +# this chunks the data into n-negines jobs, not 42 jobs: +ar = psquare.map(range(42)) + +# wait for the results to be done: +squares3 = ar.get() + +print squares == squares2, squares3==squares +# True \ No newline at end of file diff --git a/examples/demo/noncopying.py b/docs/examples/newparallel/demo/noncopying.py similarity index 100% rename from examples/demo/noncopying.py rename to docs/examples/newparallel/demo/noncopying.py diff --git a/examples/demo/remotefunction.py b/docs/examples/newparallel/demo/remotefunction.py similarity index 94% rename from examples/demo/remotefunction.py rename to docs/examples/newparallel/demo/remotefunction.py index 059ca55..84a360f 100644 --- a/examples/demo/remotefunction.py +++ b/docs/examples/newparallel/demo/remotefunction.py @@ -1,6 +1,6 @@ from IPython.zmq.parallel.client import * -client = Client('tcp://127.0.0.1:10101') +client = Client() @remote(client, bound=True) def getkey(name): diff --git a/examples/demo/throughput.py b/docs/examples/newparallel/demo/throughput.py similarity index 97% rename from examples/demo/throughput.py rename to docs/examples/newparallel/demo/throughput.py index 787284d..57a85f3 100644 --- a/examples/demo/throughput.py +++ b/docs/examples/newparallel/demo/throughput.py @@ -14,7 +14,7 @@ def echo(s=''): return s def time_throughput(nmessages, t=0, f=wait): - client = clientmod.Client('tcp://127.0.0.1:10101') + client = clientmod.Client() view = client[None] # do one ping before starting timing if f is echo: @@ -83,7 +83,4 @@ def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput): A[i] = n/A[i] print t,A[i] return A - -def start_cluster(n, scheduler): - pass \ No newline at end of file diff --git a/examples/demo/views.py b/docs/examples/newparallel/demo/views.py similarity index 88% rename from examples/demo/views.py rename to docs/examples/newparallel/demo/views.py index ada4504..f5bb4c1 100644 --- a/examples/demo/views.py +++ b/docs/examples/newparallel/demo/views.py @@ -1,6 +1,6 @@ from IPython.zmq.parallel.client import * -client = Client('tcp://127.0.0.1:10101') +client = Client() for id in client.ids: client.push(dict(ids=id*id), targets=id) diff --git a/examples/workflow/client.py b/docs/examples/newparallel/workflow/client.py similarity index 100% rename from examples/workflow/client.py rename to docs/examples/newparallel/workflow/client.py diff --git a/examples/workflow/job_wrapper.py b/docs/examples/newparallel/workflow/job_wrapper.py similarity index 77% rename from examples/workflow/job_wrapper.py rename to docs/examples/newparallel/workflow/job_wrapper.py index 55448ca..933611d 100755 --- a/examples/workflow/job_wrapper.py +++ b/docs/examples/newparallel/workflow/job_wrapper.py @@ -10,12 +10,16 @@ import sys argv = sys.argv -from IPython.zmq.parallel.engine import main +from IPython.zmq.parallel.engine import EngineFactory +from IPython.zmq.parallel.ipengineapp import launch_new_instance ns = {} # job exec sys.argv[1] in ns +# this should really use Config: +EngineFactory.user_ns = ns + # start engine with job namespace -main([], user_ns=ns) +launch_new_instance() diff --git a/examples/workflow/wmanager.py b/docs/examples/newparallel/workflow/wmanager.py similarity index 96% rename from examples/workflow/wmanager.py rename to docs/examples/newparallel/workflow/wmanager.py index a6ce80c..6565920 100644 --- a/examples/workflow/wmanager.py +++ b/docs/examples/newparallel/workflow/wmanager.py @@ -32,7 +32,7 @@ def cleanup(controller, engines): if __name__ == '__main__': # Start controller in separate process - cont = Popen(['python', '-m', 'IPython.zmq.parallel.controller']) + cont = Popen(['python', '-m', 'IPython.zmq.parallel.ipcontrollerapp']) print('Started controller') # "Submit jobs" diff --git a/examples/demo/dependencies.py b/examples/demo/dependencies.py deleted file mode 100644 index a8fcaaf..0000000 --- a/examples/demo/dependencies.py +++ /dev/null @@ -1,35 +0,0 @@ -from IPython.zmq.parallel.client import * - -client = Client('tcp://127.0.0.1:10101') - -@require('numpy') -def norm(A): - from numpy.linalg import norm - return norm(A,2) - -def checkpid(pid): - import os - return os.getpid() == pid - -def checkhostname(host): - import socket - return socket.gethostname() == host - -def getpid(): - import os - return os.getpid() - -pid0 = client.apply(getpid, targets=0, block=True) - -@depend(checkpid, pid0) -def getpid2(): - import os - return os.getpid() - -rns = client[None] -rns.block=True - -pids1 = [ rns.apply(getpid) for i in range(len(client.ids)) ] -pids2 = [ rns.apply(getpid2) for i in range(len(client.ids)) ] -print pids1 -print pids2 diff --git a/examples/demo/map.py b/examples/demo/map.py deleted file mode 100644 index 2a9c09a..0000000 --- a/examples/demo/map.py +++ /dev/null @@ -1,20 +0,0 @@ -from IPython.zmq.parallel.client import * - -client = Client('tcp://127.0.0.1:10101') - -@remote(client, block=True) -def square(a): - """return square of a number""" - return a*a - -squares = map(square, range(42)) - -# but that blocked between each result, not exactly useful -square.block=False -msg_ids = map(square, range(42)) -# submitted very fast -# wait for them to be done: -client.barrier(msg_id) -squares2 = map(client.results.get, msg_ids) -print squares == squares2 -# True \ No newline at end of file diff --git a/examples/zmqontroller/config.py b/examples/zmqontroller/config.py deleted file mode 100644 index 1e1a1e2..0000000 --- a/examples/zmqontroller/config.py +++ /dev/null @@ -1,23 +0,0 @@ -"""setup the ports""" -config = { - 'interface': 'tcp://127.0.0.1', - 'regport': 10101, - 'heartport': 10102, - - 'cqueueport': 10211, - 'equeueport': 10111, - - 'ctaskport': 10221, - 'etaskport': 10121, - - 'ccontrolport': 10231, - 'econtrolport': 10131, - - 'clientport': 10201, - 'notifierport': 10202, - - 'logport': 20202 -} - - - diff --git a/examples/zmqontroller/controller.py b/examples/zmqontroller/controller.py deleted file mode 100644 index f890d46..0000000 --- a/examples/zmqontroller/controller.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python -"""A script to launch a controller with all its queues and connect it to a logger""" - -import time -import logging - -import zmq -from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue -from zmq.eventloop import ioloop -from zmq.eventloop.zmqstream import ZMQStream -from zmq.log import handlers - -from IPython.zmq import log -from IPython.zmq.parallel import controller, heartmonitor, streamsession as session - - - - -def setup(): - """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat""" - ctx = zmq.Context() - loop = ioloop.IOLoop.instance() - - # port config - # config={} - execfile('config.py', globals()) - iface = config['interface'] - logport = config['logport'] - rport = config['regport'] - cport = config['clientport'] - cqport = config['cqueueport'] - eqport = config['equeueport'] - ctport = config['ctaskport'] - etport = config['etaskport'] - ccport = config['ccontrolport'] - ecport = config['econtrolport'] - hport = config['heartport'] - nport = config['notifierport'] - - # setup logging - lsock = ctx.socket(zmq.PUB) - lsock.connect('%s:%i'%(iface,logport)) - # connected=False - # while not connected: - # try: - # except: - # logport = logport + 1 - # else: - # connected=True - # - handler = handlers.PUBHandler(lsock) - handler.setLevel(logging.DEBUG) - handler.root_topic = "controller" - log.logger.addHandler(handler) - time.sleep(.5) - - ### Engine connections ### - - # Engine registrar socket - reg = ZMQStream(ctx.socket(zmq.XREP), loop) - reg.bind("%s:%i"%(iface, rport)) - - # heartbeat - hpub = ctx.socket(zmq.PUB) - hpub.bind("%s:%i"%(iface, hport)) - hrep = ctx.socket(zmq.XREP) - hrep.bind("%s:%i"%(iface, hport+1)) - - hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500) - hb.start() - - ### Client connections ### - # Clientele socket - c = ZMQStream(ctx.socket(zmq.XREP), loop) - c.bind("%s:%i"%(iface, cport)) - - n = ZMQStream(ctx.socket(zmq.PUB), loop) - n.bind("%s:%i"%(iface, nport)) - - thesession = session.StreamSession(username="controller") - - - - # build and launch the queue - sub = ctx.socket(zmq.SUB) - sub.setsockopt(zmq.SUBSCRIBE, "") - monport = sub.bind_to_random_port(iface) - sub = ZMQStream(sub, loop) - - # Multiplexer Queue (in a Process) - q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') - q.bind_in("%s:%i"%(iface, cqport)) - q.bind_out("%s:%i"%(iface, eqport)) - q.connect_mon("%s:%i"%(iface, monport)) - q.daemon=True - q.start() - - # Control Queue (in a Process) - q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') - q.bind_in("%s:%i"%(iface, ccport)) - q.bind_out("%s:%i"%(iface, ecport)) - q.connect_mon("%s:%i"%(iface, monport)) - q.daemon=True - q.start() - - # Task Queue (in a Process) - q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') - q.bind_in("%s:%i"%(iface, ctport)) - q.bind_out("%s:%i"%(iface, etport)) - q.connect_mon("%s:%i"%(iface, monport)) - q.daemon=True - q.start() - - time.sleep(.25) - - # build connection dicts - engine_addrs = { - 'control' : "%s:%i"%(iface, ecport), - 'queue': "%s:%i"%(iface, eqport), - 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)), - 'task' : "%s:%i"%(iface, etport), - 'monitor' : "%s:%i"%(iface, monport), - } - - client_addrs = { - 'control' : "%s:%i"%(iface, ccport), - 'query': "%s:%i"%(iface, cport), - 'queue': "%s:%i"%(iface, cqport), - 'task' : "%s:%i"%(iface, ctport), - 'notification': "%s:%i"%(iface, nport) - } - con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs) - - return loop - - -if __name__ == '__main__': - loop = setup() - loop.start() \ No newline at end of file diff --git a/examples/zmqontroller/floodclient.py b/examples/zmqontroller/floodclient.py deleted file mode 100644 index b7dd578..0000000 --- a/examples/zmqontroller/floodclient.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python -import time -import zmq -from zmq.eventloop import ioloop -from zmq.eventloop.zmqstream import ZMQStream -from IPython.zmq import streamsession as session -Message = session.Message -# from IPython.zmq.messages import send_message_pickle as send_message -import uuid - -thesession = session.StreamSession() - -max_messages=10000 -printstep=1000 - -counter = dict(count=0, engines=1) - -def poit(msg): - print "POIT" - print msg - -def count(msg): - count = counter["count"] = counter["count"]+1 - if not count % printstep: - print "#########################" - print count, time.time()-counter['tic'] - -def unpack_and_print(msg): - global msg_counter - msg_counter += 1 - print msg - try: - msg = thesession.unpack_message(msg[-3:]) - except Exception, e: - print e - # pass - print msg - - -ctx = zmq.Context() - -loop = ioloop.IOLoop() -sock = ctx.socket(zmq.XREQ) -queue = ZMQStream(ctx.socket(zmq.XREQ), loop) -client = ZMQStream(sock, loop) -client.on_send(poit) -def check_engines(msg): - # client.on_recv(unpack_and_print) - queue.on_recv(count) - idents = msg[:-3] - msg = thesession.unpack_message(msg[-3:]) - msg = Message(msg) - print msg - queue.connect(str(msg.content.queue)) - engines = dict(msg.content.engines) - # global tic - N=max_messages - if engines: - tic = time.time() - counter['tic']= tic - for i in xrange(N/len(engines)): - for eid,key in engines.iteritems(): - thesession.send(queue, "execute_request", dict(code='id=%i'%(int(eid)+i)),ident=str(key)) - toc = time.time() - print "#####################################" - print N, toc-tic - print "#####################################" - - - - -client.on_recv(check_engines) - -sock.connect('tcp://127.0.0.1:10102') -sock.setsockopt(zmq.IDENTITY, thesession.username) -# stream = ZMQStream() -# header = dict(msg_id = uuid.uuid4().bytes, msg_type='relay', id=0) -parent = dict(targets=2) -# content = "GARBAGE" -thesession.send(client, "connection_request") - -# send_message(client, (header, content)) -# print thesession.recv(client, 0) - -loop.start() diff --git a/examples/zmqontroller/logwatcher.py b/examples/zmqontroller/logwatcher.py deleted file mode 100644 index 7887818..0000000 --- a/examples/zmqontroller/logwatcher.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python -"""A simple log process that prints messages incoming from""" - -# -# Copyright (c) 2010 Min Ragan-Kelley -# -# This file is part of pyzmq. -# -# pyzmq is free software; you can redistribute it and/or modify it under -# the terms of the Lesser GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# pyzmq is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Lesser GNU General Public License for more details. -# -# You should have received a copy of the Lesser GNU General Public License -# along with this program. If not, see . - -import sys -import zmq -logport = 20202 -def main(topics, addrs): - - context = zmq.Context() - socket = context.socket(zmq.SUB) - for topic in topics: - print "Subscribing to: %r"%topic - socket.setsockopt(zmq.SUBSCRIBE, topic) - if addrs: - for addr in addrs: - print "Connecting to: ", addr - socket.connect(addr) - else: - socket.bind('tcp://*:%i'%logport) - - while True: - # topic = socket.recv() - # print topic - # print 'tic' - raw = socket.recv_multipart() - if len(raw) != 2: - print "!!! invalid log message: %s"%raw - else: - topic, msg = raw - # don't newline, since log messages always newline: - print "%s | %s" % (topic, msg), - sys.stdout.flush() - -if __name__ == '__main__': - import sys - topics = [] - addrs = [] - for arg in sys.argv[1:]: - if '://' in arg: - addrs.append(arg) - else: - topics.append(arg) - if not topics: - # default to everything - topics = [''] - if len(addrs) < 1: - print "binding instead of connecting" - # addrs = ['tcp://127.0.0.1:%i'%p for p in range(logport,logport+10)] - # print "usage: display.py
[
...]" - # raise SystemExit - - main(topics, addrs)