diff --git a/examples/demo/dag/dagdeps.py b/docs/examples/newparallel/demo/dag/dagdeps.py
similarity index 88%
rename from examples/demo/dag/dagdeps.py
rename to docs/examples/newparallel/demo/dag/dagdeps.py
index 8291962..c3b6837 100644
--- a/examples/demo/dag/dagdeps.py
+++ b/docs/examples/newparallel/demo/dag/dagdeps.py
@@ -61,13 +61,13 @@ def submit_jobs(client, G, jobs):
results[node] = client.apply(jobs[node], after=deps)
return results
-def validate_tree(G, times):
+def validate_tree(G, results):
"""Validate that jobs executed after their dependencies."""
for node in G:
- t = times[node]
+ started = results[node].metadata.started
for parent in G.predecessors(node):
- pt = times[parent]
- assert t > pt, "%s should have happened after %s"%(node, parent)
+ finished = results[parent].metadata.completed
+ assert started > finished, "%s should have happened after %s"%(node, parent)
def main(nodes, edges):
"""Generate a random graph, submit jobs, then validate that the
@@ -76,28 +76,28 @@ def main(nodes, edges):
in-degree on the y (just for spread). All arrows must
point at least slightly to the right if the graph is valid.
"""
+ from matplotlib.dates import date2num
print "building DAG"
G = random_dag(nodes, edges)
jobs = {}
- msg_ids = {}
- times = {}
pos = {}
for node in G:
jobs[node] = randomwait
- client = cmod.Client('tcp://127.0.0.1:10101')
+ client = cmod.Client()
print "submitting tasks"
results = submit_jobs(client, G, jobs)
print "waiting for results"
client.barrier()
print "done"
for node in G:
- times[node] = results[node].get()
- pos[node] = (times[node], G.in_degree(node)+random())
+ # times[node] = results[node].get()
+ t = date2num(results[node].metadata.started)
+ pos[node] = (t, G.in_degree(node)+random())
- validate_tree(G, times)
+ validate_tree(G, results)
nx.draw(G, pos)
- return G,times,msg_ids
+ return G,results
if __name__ == '__main__':
import pylab
diff --git a/docs/examples/newparallel/demo/dependencies.py b/docs/examples/newparallel/demo/dependencies.py
new file mode 100644
index 0000000..492690d
--- /dev/null
+++ b/docs/examples/newparallel/demo/dependencies.py
@@ -0,0 +1,118 @@
+from IPython.zmq.parallel import error
+from IPython.zmq.parallel.dependency import Dependency
+from IPython.zmq.parallel.client import *
+
+client = Client()
+
+# this will only run on machines that can import numpy:
+@require('numpy')
+def norm(A):
+ from numpy.linalg import norm
+ return norm(A,2)
+
+def checkpid(pid):
+ """return the pid of the engine"""
+ import os
+ return os.getpid() == pid
+
+def checkhostname(host):
+ import socket
+ return socket.gethostname() == host
+
+def getpid():
+ import os
+ return os.getpid()
+
+pid0 = client[0].apply_sync(getpid)
+
+# this will depend on the pid being that of target 0:
+@depend(checkpid, pid0)
+def getpid2():
+ import os
+ return os.getpid()
+
+view = client[None]
+view.block=True
+
+# will run on anything:
+pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ]
+print pids1
+# will only run on e0:
+pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ]
+print pids2
+
+print "now test some dependency behaviors"
+
+def wait(t):
+ import time
+ time.sleep(t)
+ return t
+
+# fail after some time:
+def wait_and_fail(t):
+ import time
+ time.sleep(t)
+ return 1/0
+
+successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids)) ]
+failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]
+
+mixed = [failures[0],successes[0]]
+d1a = Dependency(mixed, mode='any', success_only=False) # yes
+d1b = Dependency(mixed, mode='any', success_only=True) # yes
+d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow
+d2b = Dependency(mixed, mode='all', success_only=True) # no
+d3 = Dependency(failures, mode='any', success_only=True) # no
+d4 = Dependency(failures, mode='any', success_only=False) # yes
+d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow
+d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow
+
+client.block = False
+
+r1a = client.apply(getpid, after=d1a)
+r1b = client.apply(getpid, follow=d1b)
+r2a = client.apply(getpid, after=d2b, follow=d2a)
+r2b = client.apply(getpid, after=d2a, follow=d2b)
+r3 = client.apply(getpid, after=d3)
+r4a = client.apply(getpid, after=d4)
+r4b = client.apply(getpid, follow=d4)
+r4c = client.apply(getpid, after=d3, follow=d4)
+r5 = client.apply(getpid, after=d5)
+r5b = client.apply(getpid, follow=d5, after=d3)
+r6 = client.apply(getpid, follow=d6)
+r6b = client.apply(getpid, after=d6, follow=d2b)
+
+def should_fail(f):
+ try:
+ f()
+ except error.KernelError:
+ pass
+ else:
+ print 'should have raised'
+ # raise Exception("should have raised")
+
+# print r1a.msg_ids
+r1a.get()
+# print r1b.msg_ids
+r1b.get()
+# print r2a.msg_ids
+should_fail(r2a.get)
+# print r2b.msg_ids
+should_fail(r2b.get)
+# print r3.msg_ids
+should_fail(r3.get)
+# print r4a.msg_ids
+r4a.get()
+# print r4b.msg_ids
+r4b.get()
+# print r4c.msg_ids
+should_fail(r4c.get)
+# print r5.msg_ids
+r5.get()
+# print r5b.msg_ids
+should_fail(r5b.get)
+# print r6.msg_ids
+should_fail(r6.get) # assuming > 1 engine
+# print r6b.msg_ids
+should_fail(r6b.get)
+print 'done'
diff --git a/docs/examples/newparallel/demo/map.py b/docs/examples/newparallel/demo/map.py
new file mode 100644
index 0000000..ccaa070
--- /dev/null
+++ b/docs/examples/newparallel/demo/map.py
@@ -0,0 +1,36 @@
+from IPython.zmq.parallel.client import *
+
+client = Client()
+
+@remote(client, block=True)
+def square(a):
+ """return square of a number"""
+ return a*a
+
+squares = map(square, range(42))
+
+# but that blocked between each result; not exactly useful
+
+square.block = False
+
+arlist = map(square, range(42))
+# submitted very fast
+
+# wait for the results:
+squares2 = [ r.get() for r in arlist ]
+
+# now the more convenient @parallel decorator, which has a map method:
+
+@parallel(client, block=False)
+def psquare(a):
+ """return square of a number"""
+ return a*a
+
+# this chunks the data into n-negines jobs, not 42 jobs:
+ar = psquare.map(range(42))
+
+# wait for the results to be done:
+squares3 = ar.get()
+
+print squares == squares2, squares3==squares
+# True
\ No newline at end of file
diff --git a/examples/demo/noncopying.py b/docs/examples/newparallel/demo/noncopying.py
similarity index 100%
rename from examples/demo/noncopying.py
rename to docs/examples/newparallel/demo/noncopying.py
diff --git a/examples/demo/remotefunction.py b/docs/examples/newparallel/demo/remotefunction.py
similarity index 94%
rename from examples/demo/remotefunction.py
rename to docs/examples/newparallel/demo/remotefunction.py
index 059ca55..84a360f 100644
--- a/examples/demo/remotefunction.py
+++ b/docs/examples/newparallel/demo/remotefunction.py
@@ -1,6 +1,6 @@
from IPython.zmq.parallel.client import *
-client = Client('tcp://127.0.0.1:10101')
+client = Client()
@remote(client, bound=True)
def getkey(name):
diff --git a/examples/demo/throughput.py b/docs/examples/newparallel/demo/throughput.py
similarity index 97%
rename from examples/demo/throughput.py
rename to docs/examples/newparallel/demo/throughput.py
index 787284d..57a85f3 100644
--- a/examples/demo/throughput.py
+++ b/docs/examples/newparallel/demo/throughput.py
@@ -14,7 +14,7 @@ def echo(s=''):
return s
def time_throughput(nmessages, t=0, f=wait):
- client = clientmod.Client('tcp://127.0.0.1:10101')
+ client = clientmod.Client()
view = client[None]
# do one ping before starting timing
if f is echo:
@@ -83,7 +83,4 @@ def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
A[i] = n/A[i]
print t,A[i]
return A
-
-def start_cluster(n, scheduler):
- pass
\ No newline at end of file
diff --git a/examples/demo/views.py b/docs/examples/newparallel/demo/views.py
similarity index 88%
rename from examples/demo/views.py
rename to docs/examples/newparallel/demo/views.py
index ada4504..f5bb4c1 100644
--- a/examples/demo/views.py
+++ b/docs/examples/newparallel/demo/views.py
@@ -1,6 +1,6 @@
from IPython.zmq.parallel.client import *
-client = Client('tcp://127.0.0.1:10101')
+client = Client()
for id in client.ids:
client.push(dict(ids=id*id), targets=id)
diff --git a/examples/workflow/client.py b/docs/examples/newparallel/workflow/client.py
similarity index 100%
rename from examples/workflow/client.py
rename to docs/examples/newparallel/workflow/client.py
diff --git a/examples/workflow/job_wrapper.py b/docs/examples/newparallel/workflow/job_wrapper.py
similarity index 77%
rename from examples/workflow/job_wrapper.py
rename to docs/examples/newparallel/workflow/job_wrapper.py
index 55448ca..933611d 100755
--- a/examples/workflow/job_wrapper.py
+++ b/docs/examples/newparallel/workflow/job_wrapper.py
@@ -10,12 +10,16 @@ import sys
argv = sys.argv
-from IPython.zmq.parallel.engine import main
+from IPython.zmq.parallel.engine import EngineFactory
+from IPython.zmq.parallel.ipengineapp import launch_new_instance
ns = {}
# job
exec sys.argv[1] in ns
+# this should really use Config:
+EngineFactory.user_ns = ns
+
# start engine with job namespace
-main([], user_ns=ns)
+launch_new_instance()
diff --git a/examples/workflow/wmanager.py b/docs/examples/newparallel/workflow/wmanager.py
similarity index 96%
rename from examples/workflow/wmanager.py
rename to docs/examples/newparallel/workflow/wmanager.py
index a6ce80c..6565920 100644
--- a/examples/workflow/wmanager.py
+++ b/docs/examples/newparallel/workflow/wmanager.py
@@ -32,7 +32,7 @@ def cleanup(controller, engines):
if __name__ == '__main__':
# Start controller in separate process
- cont = Popen(['python', '-m', 'IPython.zmq.parallel.controller'])
+ cont = Popen(['python', '-m', 'IPython.zmq.parallel.ipcontrollerapp'])
print('Started controller')
# "Submit jobs"
diff --git a/examples/demo/dependencies.py b/examples/demo/dependencies.py
deleted file mode 100644
index a8fcaaf..0000000
--- a/examples/demo/dependencies.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from IPython.zmq.parallel.client import *
-
-client = Client('tcp://127.0.0.1:10101')
-
-@require('numpy')
-def norm(A):
- from numpy.linalg import norm
- return norm(A,2)
-
-def checkpid(pid):
- import os
- return os.getpid() == pid
-
-def checkhostname(host):
- import socket
- return socket.gethostname() == host
-
-def getpid():
- import os
- return os.getpid()
-
-pid0 = client.apply(getpid, targets=0, block=True)
-
-@depend(checkpid, pid0)
-def getpid2():
- import os
- return os.getpid()
-
-rns = client[None]
-rns.block=True
-
-pids1 = [ rns.apply(getpid) for i in range(len(client.ids)) ]
-pids2 = [ rns.apply(getpid2) for i in range(len(client.ids)) ]
-print pids1
-print pids2
diff --git a/examples/demo/map.py b/examples/demo/map.py
deleted file mode 100644
index 2a9c09a..0000000
--- a/examples/demo/map.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from IPython.zmq.parallel.client import *
-
-client = Client('tcp://127.0.0.1:10101')
-
-@remote(client, block=True)
-def square(a):
- """return square of a number"""
- return a*a
-
-squares = map(square, range(42))
-
-# but that blocked between each result, not exactly useful
-square.block=False
-msg_ids = map(square, range(42))
-# submitted very fast
-# wait for them to be done:
-client.barrier(msg_id)
-squares2 = map(client.results.get, msg_ids)
-print squares == squares2
-# True
\ No newline at end of file
diff --git a/examples/zmqontroller/config.py b/examples/zmqontroller/config.py
deleted file mode 100644
index 1e1a1e2..0000000
--- a/examples/zmqontroller/config.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""setup the ports"""
-config = {
- 'interface': 'tcp://127.0.0.1',
- 'regport': 10101,
- 'heartport': 10102,
-
- 'cqueueport': 10211,
- 'equeueport': 10111,
-
- 'ctaskport': 10221,
- 'etaskport': 10121,
-
- 'ccontrolport': 10231,
- 'econtrolport': 10131,
-
- 'clientport': 10201,
- 'notifierport': 10202,
-
- 'logport': 20202
-}
-
-
-
diff --git a/examples/zmqontroller/controller.py b/examples/zmqontroller/controller.py
deleted file mode 100644
index f890d46..0000000
--- a/examples/zmqontroller/controller.py
+++ /dev/null
@@ -1,139 +0,0 @@
-#!/usr/bin/env python
-"""A script to launch a controller with all its queues and connect it to a logger"""
-
-import time
-import logging
-
-import zmq
-from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
-from zmq.eventloop import ioloop
-from zmq.eventloop.zmqstream import ZMQStream
-from zmq.log import handlers
-
-from IPython.zmq import log
-from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
-
-
-
-
-def setup():
- """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat"""
- ctx = zmq.Context()
- loop = ioloop.IOLoop.instance()
-
- # port config
- # config={}
- execfile('config.py', globals())
- iface = config['interface']
- logport = config['logport']
- rport = config['regport']
- cport = config['clientport']
- cqport = config['cqueueport']
- eqport = config['equeueport']
- ctport = config['ctaskport']
- etport = config['etaskport']
- ccport = config['ccontrolport']
- ecport = config['econtrolport']
- hport = config['heartport']
- nport = config['notifierport']
-
- # setup logging
- lsock = ctx.socket(zmq.PUB)
- lsock.connect('%s:%i'%(iface,logport))
- # connected=False
- # while not connected:
- # try:
- # except:
- # logport = logport + 1
- # else:
- # connected=True
- #
- handler = handlers.PUBHandler(lsock)
- handler.setLevel(logging.DEBUG)
- handler.root_topic = "controller"
- log.logger.addHandler(handler)
- time.sleep(.5)
-
- ### Engine connections ###
-
- # Engine registrar socket
- reg = ZMQStream(ctx.socket(zmq.XREP), loop)
- reg.bind("%s:%i"%(iface, rport))
-
- # heartbeat
- hpub = ctx.socket(zmq.PUB)
- hpub.bind("%s:%i"%(iface, hport))
- hrep = ctx.socket(zmq.XREP)
- hrep.bind("%s:%i"%(iface, hport+1))
-
- hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500)
- hb.start()
-
- ### Client connections ###
- # Clientele socket
- c = ZMQStream(ctx.socket(zmq.XREP), loop)
- c.bind("%s:%i"%(iface, cport))
-
- n = ZMQStream(ctx.socket(zmq.PUB), loop)
- n.bind("%s:%i"%(iface, nport))
-
- thesession = session.StreamSession(username="controller")
-
-
-
- # build and launch the queue
- sub = ctx.socket(zmq.SUB)
- sub.setsockopt(zmq.SUBSCRIBE, "")
- monport = sub.bind_to_random_port(iface)
- sub = ZMQStream(sub, loop)
-
- # Multiplexer Queue (in a Process)
- q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
- q.bind_in("%s:%i"%(iface, cqport))
- q.bind_out("%s:%i"%(iface, eqport))
- q.connect_mon("%s:%i"%(iface, monport))
- q.daemon=True
- q.start()
-
- # Control Queue (in a Process)
- q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
- q.bind_in("%s:%i"%(iface, ccport))
- q.bind_out("%s:%i"%(iface, ecport))
- q.connect_mon("%s:%i"%(iface, monport))
- q.daemon=True
- q.start()
-
- # Task Queue (in a Process)
- q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
- q.bind_in("%s:%i"%(iface, ctport))
- q.bind_out("%s:%i"%(iface, etport))
- q.connect_mon("%s:%i"%(iface, monport))
- q.daemon=True
- q.start()
-
- time.sleep(.25)
-
- # build connection dicts
- engine_addrs = {
- 'control' : "%s:%i"%(iface, ecport),
- 'queue': "%s:%i"%(iface, eqport),
- 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)),
- 'task' : "%s:%i"%(iface, etport),
- 'monitor' : "%s:%i"%(iface, monport),
- }
-
- client_addrs = {
- 'control' : "%s:%i"%(iface, ccport),
- 'query': "%s:%i"%(iface, cport),
- 'queue': "%s:%i"%(iface, cqport),
- 'task' : "%s:%i"%(iface, ctport),
- 'notification': "%s:%i"%(iface, nport)
- }
- con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs)
-
- return loop
-
-
-if __name__ == '__main__':
- loop = setup()
- loop.start()
\ No newline at end of file
diff --git a/examples/zmqontroller/floodclient.py b/examples/zmqontroller/floodclient.py
deleted file mode 100644
index b7dd578..0000000
--- a/examples/zmqontroller/floodclient.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#!/usr/bin/env python
-import time
-import zmq
-from zmq.eventloop import ioloop
-from zmq.eventloop.zmqstream import ZMQStream
-from IPython.zmq import streamsession as session
-Message = session.Message
-# from IPython.zmq.messages import send_message_pickle as send_message
-import uuid
-
-thesession = session.StreamSession()
-
-max_messages=10000
-printstep=1000
-
-counter = dict(count=0, engines=1)
-
-def poit(msg):
- print "POIT"
- print msg
-
-def count(msg):
- count = counter["count"] = counter["count"]+1
- if not count % printstep:
- print "#########################"
- print count, time.time()-counter['tic']
-
-def unpack_and_print(msg):
- global msg_counter
- msg_counter += 1
- print msg
- try:
- msg = thesession.unpack_message(msg[-3:])
- except Exception, e:
- print e
- # pass
- print msg
-
-
-ctx = zmq.Context()
-
-loop = ioloop.IOLoop()
-sock = ctx.socket(zmq.XREQ)
-queue = ZMQStream(ctx.socket(zmq.XREQ), loop)
-client = ZMQStream(sock, loop)
-client.on_send(poit)
-def check_engines(msg):
- # client.on_recv(unpack_and_print)
- queue.on_recv(count)
- idents = msg[:-3]
- msg = thesession.unpack_message(msg[-3:])
- msg = Message(msg)
- print msg
- queue.connect(str(msg.content.queue))
- engines = dict(msg.content.engines)
- # global tic
- N=max_messages
- if engines:
- tic = time.time()
- counter['tic']= tic
- for i in xrange(N/len(engines)):
- for eid,key in engines.iteritems():
- thesession.send(queue, "execute_request", dict(code='id=%i'%(int(eid)+i)),ident=str(key))
- toc = time.time()
- print "#####################################"
- print N, toc-tic
- print "#####################################"
-
-
-
-
-client.on_recv(check_engines)
-
-sock.connect('tcp://127.0.0.1:10102')
-sock.setsockopt(zmq.IDENTITY, thesession.username)
-# stream = ZMQStream()
-# header = dict(msg_id = uuid.uuid4().bytes, msg_type='relay', id=0)
-parent = dict(targets=2)
-# content = "GARBAGE"
-thesession.send(client, "connection_request")
-
-# send_message(client, (header, content))
-# print thesession.recv(client, 0)
-
-loop.start()
diff --git a/examples/zmqontroller/logwatcher.py b/examples/zmqontroller/logwatcher.py
deleted file mode 100644
index 7887818..0000000
--- a/examples/zmqontroller/logwatcher.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#!/usr/bin/env python
-"""A simple log process that prints messages incoming from"""
-
-#
-# Copyright (c) 2010 Min Ragan-Kelley
-#
-# This file is part of pyzmq.
-#
-# pyzmq is free software; you can redistribute it and/or modify it under
-# the terms of the Lesser GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# pyzmq is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Lesser GNU General Public License for more details.
-#
-# You should have received a copy of the Lesser GNU General Public License
-# along with this program. If not, see