##// END OF EJS Templates
ignore data_pub in Hub
ignore data_pub in Hub

File last commit:

r4910:0dc49390
r8108:1c717b2f
Show More
multiengine1.ipy
212 lines | 3.6 KiB | text/plain | TextLexer
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import time
import numpy
# import IPython.kernel.magic
from IPython.parallel import Client, Reference
from IPython.parallel import error
rc = Client()
mux = rc[:]
#-------------------------------------------------------------------------------
# Setup
#-------------------------------------------------------------------------------
%load_ext parallelmagic
mux.block = True
mux.clear()
mux.activate()
n = len(rc)
assert n >= 4, "Not Enough Engines: %i, 4 needed for this script"%n
values = [
10,
1.0,
range(100),
('asdf', 1000),
{'a': 10, 'b': 20}
]
keys = ['a','b','c','d','e']
sequences = [
range(100),
numpy.arange(100)
]
#-------------------------------------------------------------------------------
# Blocking execution
#-------------------------------------------------------------------------------
# Execute
with mux.sync_imports():
import math
import numpy
mux.execute('a = 2.0*math.pi')
print mux['a']
for id in mux.targets:
mux.execute('b=%d' % id, targets=id)
try:
mux.execute('b = 10',targets=max(mux.targets)+1)
except IndexError:
print "Caught invalid engine ID OK."
try:
mux.apply(lambda : 1/0)
except error.CompositeError:
print "Caught 1/0 correctly."
%px print a, b
try:
%px 1/0
except error.CompositeError:
print "Caught 1/0 correctly."
# %autopx
%px a = numpy.random.random((4,4))
%px a = a+a.transpose()
# %autopx
print mux.apply(numpy.linalg.eigvals, Reference('a'))
mux.targets = [0,2]
%px a = 5
mux.targets = [1,3]
%px a = 10
mux.targets = rc.ids
print mux['a']
# Push/Pull
# Push several names to all targets at once, then pull two of them back.
mux.push(dict(a=10, b=30, c={'f':range(10)}))
mux.pull(('a', 'b'))
# Push a per-engine value of `a` (the engine's own ID), one target at a time.
for engine_id in mux.targets:
    mux.push(dict(a=engine_id), targets=engine_id)
# Pull `a` back from each engine individually, then from all targets at once.
for engine_id in mux.targets:
    mux.pull('a', targets=engine_id)
mux.pull('a')
# Dictionary-style access on the view: assign `a`, then read it back.
mux['a'] = 100
mux['a']
# get_result/reset/keys
mux.get_result()
%result
mux.apply(lambda : globals().keys())
mux.clear()
mux.apply(lambda : globals().keys())
try:
%result
except error.CompositeError:
print "Caught IndexError ok."
%px a = 5
mux.get_result(-1)
mux.apply(lambda : globals().keys())
# Queue management methods
%px import time
ars = [mux.apply_async(time.sleep, 2.0) for x in range(5)]
mux.queue_status()
time.sleep(3.0)
mux.abort(ars, block=True)
mux.queue_status()
time.sleep(2.0)
mux.queue_status()
mux.wait(ars)
for ar in ars:
try:
ar.r
except error.CompositeError:
print "Caught QueueCleared OK."
# scatter/gather
# Round trip a 10-element sequence through the engines under the name `a`:
# scatter it out, then gather it back.
mux.scatter('a', range(10))
mux.gather('a')
# Same round trip under the name `b`, with a numpy array as the source.
mux.scatter('b', numpy.arange(10))
mux.gather('b')
#-------------------------------------------------------------------------------
# Non-Blocking execution
#-------------------------------------------------------------------------------
mux.block = False
# execute
ar1 = mux.execute('a=5')
with mux.sync_imports():
import sets
ar1.wait()
ar1 = mux.execute('1/0')
ar2 = mux.execute('c = sets.Set()')
mux.wait((ar1, ar2))
try:
ar1.r
except error.CompositeError:
print "Caught ZeroDivisionError OK."
ar = mux.execute("arint 'hi'")
ar.r
ar = mux.apply(lambda : 1/0)
try:
ar.r
except error.CompositeError:
print "Caught ZeroDivisionError OK."
# Make sure we can reraise it!
try:
ar.r
except error.CompositeError:
print "Caught ZeroDivisionError OK."
# push/pull
# mux.block is False in this section, so the outcome of each call is read
# explicitly from the returned object: .get() for the push, .r for the pull.
ar1 = mux.push(dict(a=10))
ar1.get()
ar2 = mux.pull('a')
ar2.r
# This is a command to make sure the end of the file is happy.
print "The tests are done!"