"""Exercise task dependencies with the IPython.zmq.parallel client.

The script connects with ``Client()``, runs a few functions on the
engines, and then submits tasks guarded by ``Dependency`` objects
(passed as ``after=`` and/or ``follow=``) built from jobs that succeed
and jobs that fail.  Each result is then fetched and compared with the
outcome the original author annotated for it.

It talks to engine 0 directly, so at least one engine must be
available; one of the final checks is annotated as assuming more than
one engine.
"""
from IPython.zmq.parallel import error
from IPython.zmq.parallel.dependency import Dependency
# Client, require and depend are not imported by name anywhere else in
# this script, so they are expected to come from this star import.
from IPython.zmq.parallel.client import *

client = Client()


# ---------------------------------------------------------------------------
# Functions handed to the engines.
#
# Every function imports what it needs inside its own body -- presumably
# because the body is executed on an engine rather than in this process
# (confirm against the client documentation).
# ---------------------------------------------------------------------------

# this will only run on machines that can import numpy:
@require('numpy')
def norm(A):
    """Return the 2-norm of ``A`` as computed by ``numpy.linalg.norm``."""
    from numpy import linalg
    return linalg.norm(A, 2)


def checkpid(pid):
    """Return True when the current process id equals ``pid``."""
    import os
    return os.getpid() == pid


def checkhostname(host):
    """Return True when the current hostname equals ``host``."""
    import socket
    return socket.gethostname() == host


def getpid():
    """Return the id of the process executing this function."""
    import os
    return os.getpid()


def wait(t):
    """Sleep for ``t`` seconds, then return ``t``."""
    from time import sleep
    sleep(t)
    return t


# fail after some time:
def wait_and_fail(t):
    """Sleep for ``t`` seconds, then fail by dividing by zero."""
    from time import sleep
    sleep(t)
    return 1/0


def should_fail(f):
    """Call ``f()`` and expect it to raise ``error.KernelError``.

    When the call returns normally, a notice is printed (nothing is
    raised).  Any exception other than ``error.KernelError`` propagates
    to the caller.
    """
    try:
        f()
    except error.KernelError:
        return
    print('should have raised')


# ---------------------------------------------------------------------------
# Functional dependency on the pid of engine 0.
# ---------------------------------------------------------------------------

pid0 = client[0].apply_sync(getpid)


# this will depend on the pid being that of target 0:
@depend(checkpid, pid0)
def getpid2():
    """Return the id of the process executing this function."""
    import os
    return os.getpid()


# NOTE(review): client[None] looks like the view without an explicit
# target (tasks may land on any engine) -- confirm against the Client API.
view = client[None]
view.block = True

# will run on anything:
pids1 = [view.apply(getpid) for _ in range(len(client.ids))]
print(pids1)
# will only run on e0:
pids2 = [view.apply(getpid2) for _ in range(len(client.ids))]
print(pids2)

print("now test some dependency behaviors")

# ---------------------------------------------------------------------------
# Dependencies built from message ids of succeeding and failing jobs.
# ---------------------------------------------------------------------------

# One succeeding and one failing job is submitted per known engine id;
# only the first message id of each async result is kept.
successes = [view.apply_async(wait, 1).msg_ids[0]
             for _ in range(len(client.ids))]
failures = [view.apply_async(wait_and_fail, 1).msg_ids[0]
            for _ in range(len(client.ids))]

mixed = [failures[0], successes[0]]

# The trailing notes are the original author's expectation of whether a
# task guarded by the dependency can run ("after" / "follow" refer to the
# keyword the dependency is passed as).
d1a = Dependency(mixed, mode='any', success_only=False)     # yes
d1b = Dependency(mixed, mode='any', success_only=True)      # yes
d2a = Dependency(mixed, mode='all', success_only=False)     # yes after / no follow
d2b = Dependency(mixed, mode='all', success_only=True)      # no
d3 = Dependency(failures, mode='any', success_only=True)    # no
d4 = Dependency(failures, mode='any', success_only=False)   # yes
d5 = Dependency(failures, mode='all', success_only=False)   # yes after / no follow
d6 = Dependency(successes, mode='all', success_only=False)  # yes after / no follow

client.block = False

# Submission order is kept exactly as in the original script.
r1a = client.apply(getpid, after=d1a)
r1b = client.apply(getpid, follow=d1b)
r2a = client.apply(getpid, after=d2b, follow=d2a)
r2b = client.apply(getpid, after=d2a, follow=d2b)
r3 = client.apply(getpid, after=d3)
r4a = client.apply(getpid, after=d4)
r4b = client.apply(getpid, follow=d4)
r4c = client.apply(getpid, after=d3, follow=d4)
r5 = client.apply(getpid, after=d5)
r5b = client.apply(getpid, follow=d5, after=d3)
r6 = client.apply(getpid, follow=d6)
r6b = client.apply(getpid, after=d6, follow=d2b)

# ---------------------------------------------------------------------------
# Fetch every result, in submission order, and compare with expectations.
# ---------------------------------------------------------------------------

# (result, expected_to_fail) pairs.  Printing ``result.msg_ids`` inside the
# loop below is a handy way to see which request is being waited on.
expected_outcomes = [
    (r1a, False),
    (r1b, False),
    (r2a, True),
    (r2b, True),
    (r3, True),
    (r4a, False),
    (r4b, False),
    (r4c, True),
    (r5, False),
    (r5b, True),
    (r6, True),   # assuming > 1 engine
    (r6b, True),
]

for result, expected_to_fail in expected_outcomes:
    if expected_to_fail:
        should_fail(result.get)
    else:
        result.get()

print('done')