dependencies.py
128 lines
| 3.0 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3666 | from IPython.parallel import * | ||
MinRK
|
r3609 | |||
# Connect to the cluster; no arguments are passed, so Client's defaults apply.
client = Client()
# this will only run on machines that can import numpy:
@require('numpy')
def norm(A):
    """Return the 2-norm of ``A``, i.e. ``numpy.linalg.norm(A, 2)``."""
    # Imported inside the body (presumably so the import happens wherever the
    # task executes), and aliased so it does not shadow this function's name.
    from numpy.linalg import norm as numpy_norm

    return numpy_norm(A, 2)
def checkpid(pid):
    """Return True if the pid of the executing process equals ``pid``."""
    import os

    return os.getpid() == pid
def checkhostname(host):
    """Return True if the executing machine's hostname equals ``host``."""
    import socket

    return socket.gethostname() == host
def getpid():
    """Return the pid of the process that executes this function."""
    import os

    return os.getpid()
# Run getpid synchronously on engine 0 and keep the pid it reports.
pid0 = client[0].apply_sync(getpid)
# this will depend on the pid being that of target 0:
@depend(checkpid, pid0)
def getpid2():
    """Return the pid of the executing process; gated on ``checkpid(pid0)``."""
    import os

    return os.getpid()
MinRK
|
view = client.load_balanced_view()
view.block = True

# will run on anything:
# (one submission per engine id; the id itself is not used)
pids1 = [view.apply(getpid) for _ in client.ids]
print(pids1)

# will only run on e0:
pids2 = [view.apply(getpid2) for _ in client.ids]
print(pids2)

print("now test some dependency behaviors")
MinRK
|
r3609 | |||
def wait(t):
    """Sleep for ``t`` seconds, then return ``t``."""
    import time

    time.sleep(t)
    return t
# fail after some time:
def wait_and_fail(t):
    """Sleep for ``t`` seconds, then fail.

    Raises:
        ZeroDivisionError: always, after the sleep completes.
    """
    import time

    time.sleep(t)
    # Deliberate division by zero: this task exists to produce a failure.
    return 1 / 0
# Submit one succeeding and one failing task per engine id, keeping only the
# first message id of each submission so they can be used as dependencies.
successes = [view.apply_async(wait, 1).msg_ids[0] for _ in client.ids]
failures = [view.apply_async(wait_and_fail, 1).msg_ids[0] for _ in client.ids]

# One failing and one succeeding task.
mixed = [failures[0], successes[0]]

# The trailing notes are the original author's expectations; they appear to
# say whether the dependency can be met ("yes"/"no"), and for some, that it
# can be met when used as `after` but not as `follow`.
d1a = Dependency(mixed, all=False, failure=True)      # yes
d1b = Dependency(mixed, all=False)                    # yes
d2a = Dependency(mixed, all=True, failure=True)       # yes after / no follow
d2b = Dependency(mixed, all=True)                     # no
d3 = Dependency(failures, all=False)                  # no
d4 = Dependency(failures, all=False, failure=True)    # yes
d5 = Dependency(failures, all=True, failure=True)     # yes after / no follow
d6 = Dependency(successes, all=True, failure=True)    # yes after / no follow
# Submit without blocking; the results are collected later via .get().
view.block = False
flags = view.temp_flags

# Each submission runs getpid under a different after/follow combination,
# applied only for the duration of its `with` block.
with flags(after=d1a):
    r1a = view.apply(getpid)

with flags(follow=d1b):
    r1b = view.apply(getpid)

with flags(after=d2b, follow=d2a):
    r2a = view.apply(getpid)

with flags(after=d2a, follow=d2b):
    r2b = view.apply(getpid)

with flags(after=d3):
    r3 = view.apply(getpid)

with flags(after=d4):
    r4a = view.apply(getpid)

with flags(follow=d4):
    r4b = view.apply(getpid)

with flags(after=d3, follow=d4):
    r4c = view.apply(getpid)

with flags(after=d5):
    r5 = view.apply(getpid)

with flags(follow=d5, after=d3):
    r5b = view.apply(getpid)

with flags(follow=d6):
    r6 = view.apply(getpid)

with flags(after=d6, follow=d2b):
    r6b = view.apply(getpid)
MinRK
|
r3609 | |||
def should_fail(f):
    """Call ``f()`` and expect it to raise ``error.KernelError``.

    An ``error.KernelError`` is the expected outcome and is swallowed. If
    ``f()`` returns normally, a message is printed rather than an exception
    raised, so the caller keeps running. Any other exception propagates.
    """
    try:
        f()
    except error.KernelError:
        # Expected outcome: nothing to report.
        pass
    else:
        print('should have raised')
Thomas Kluyver
|
# Collect every result in submission order. A bare .get() is expected to
# succeed; calls wrapped in should_fail are expected to raise KernelError.
# To debug a particular task, print its `msg_ids` before calling .get().
r1a.get()
r1b.get()
should_fail(r2a.get)
should_fail(r2b.get)
should_fail(r3.get)
r4a.get()
r4b.get()
should_fail(r4c.get)
r5.get()
should_fail(r5b.get)
should_fail(r6.get)  # assuming > 1 engine
should_fail(r6b.get)
print('done')