##// END OF EJS Templates
Update PBS/SGE launchers with 0.10.1 options and defaults
Update PBS/SGE launchers with 0.10.1 options and defaults

File last commit:

r3658:8fb951e7
r3659:42860010
Show More
clienttest.py
104 lines | 3.2 KiB | text/x-python | PythonLexer
MinRK
update connections and diagrams for reduced sockets
r3658 import sys
import tempfile
MinRK
added preliminary tests for zmq.parallel
r3595 import time
from signal import SIGINT
from multiprocessing import Process
MinRK
some initial tests for newparallel
r3637 from nose import SkipTest
MinRK
added preliminary tests for zmq.parallel
r3595 from zmq.tests import BaseZMQTestCase
MinRK
some initial tests for newparallel
r3637 from IPython.external.decorator import decorator
MinRK
fix/test pushed function globals
r3638 from IPython.zmq.parallel import error
from IPython.zmq.parallel.client import Client
MinRK
added preliminary tests for zmq.parallel
r3595 from IPython.zmq.parallel.ipcluster import launch_process
from IPython.zmq.parallel.entry_point import select_random_ports
MinRK
some initial tests for newparallel
r3637 from IPython.zmq.parallel.tests import processes,add_engine
# simple tasks for use in apply tests
def segfault():
    """this will segfault

    Deliberately crashes the calling process by asking ``ctypes.memset``
    to write a single zero byte at address ``-1``.  Nothing is returned.
    """
    # Local import keeps the function self-contained -- presumably because
    # it is shipped to remote engines by the apply tests (confirm).
    import ctypes
    ctypes.memset(-1, 0, 1)
def wait(n):
    """sleep for a time

    Blocks for ``n`` seconds via ``time.sleep`` and then returns ``n``
    unchanged, so a caller can check which call produced a given result.
    """
    # Local import keeps the function self-contained -- presumably because
    # it is shipped to remote engines by the apply tests (confirm).
    import time
    time.sleep(n)
    return n
def raiser(eclass):
    """raise an exception

    ``eclass`` is called with no arguments and the resulting instance is
    raised; the function never returns normally.
    """
    raise eclass()
# test decorator for skipping tests when libraries are unavailable
def skip_without(*names):
    """skip a test if some names are not importable

    Returns a decorator.  Before the wrapped test runs, each entry of
    ``names`` is passed to ``__import__``; the first one that raises
    ``ImportError`` causes the test to be skipped with ``SkipTest``.
    When every import succeeds the test is called with its original
    arguments and its return value is passed through.
    """
    @decorator
    def skip_without_names(f, *args, **kwargs):
        """skip the wrapped test when any required name cannot be imported."""
        for name in names:
            try:
                __import__(name)
            except ImportError:
                # Name the missing module so the skip reason is visible in
                # the test report.
                raise SkipTest("%s is not importable" % name)
        return f(*args, **kwargs)
    return skip_without_names
MinRK
added preliminary tests for zmq.parallel
r3595
class ClusterTestCase(BaseZMQTestCase):
    """Base test case that talks to a running cluster through a ``Client``.

    ``setUp`` connects a client and records how many engines were already
    registered; engines started through ``add_engines`` are kept in
    ``self.engines``.
    """

    def add_engines(self, n=1, block=True):
        """add multiple engines to our cluster

        Starts ``n`` engines with ``add_engine()`` and, when ``block`` is
        true, waits until the client reports them via ``wait_on_engines``.
        """
        self.engines.extend(add_engine() for _ in range(n))
        if block:
            self.wait_on_engines()

    def wait_on_engines(self, timeout=5):
        """wait for our engines to connect.

        Polls ``self.client.ids`` every 0.1s until it holds at least the
        pre-existing engine count plus the engines started by this test,
        or until ``timeout`` seconds have elapsed.  Raises
        ``AssertionError`` if the engines did not all show up.
        """
        n = len(self.engines) + self.base_engine_count
        tic = time.time()
        while time.time() - tic < timeout and len(self.client.ids) < n:
            time.sleep(0.1)

        assert not len(self.client.ids) < n, "waiting for engines timed out"

    def connect_client(self):
        """connect a client with my Context, and track its sockets for cleanup

        Every attribute of the new client whose name ends in ``socket`` is
        appended to ``self.sockets``.  Returns the client.
        """
        c = Client(profile='iptest', context=self.context)
        self.sockets.extend(
            getattr(c, name) for name in dir(c) if name.endswith('socket')
        )
        return c

    def assertRaisesRemote(self, etype, f, *args, **kwargs):
        """Assert that ``f(*args, **kwargs)`` raises a RemoteError for ``etype``.

        A ``CompositeError`` is unwrapped with ``raise_exception()`` first.
        The check compares ``etype.__name__`` with the remote error's
        ``ename``; the test fails if no ``RemoteError`` is raised at all.
        """
        try:
            try:
                f(*args, **kwargs)
            except error.CompositeError as e:
                e.raise_exception()
        except error.RemoteError as e:
            # Expected name first, actual name second, matching the wording
            # of the message.
            self.assertEqual(
                etype.__name__, e.ename,
                "Should have raised %r, but raised %r" % (etype.__name__, e.ename),
            )
        else:
            self.fail("should have raised a RemoteError")

    def setUp(self):
        """Connect a client and record the number of engines already present."""
        BaseZMQTestCase.setUp(self)
        self.client = self.connect_client()
        self.base_engine_count = len(self.client.ids)
        self.engines = []

    def tearDown(self):
        """Drop finished processes, close the client and terminate the context."""
        # close fds:
        # Snapshot the finished processes first so that ``processes`` is not
        # mutated while it is being iterated.
        finished = [p for p in processes if p.poll() is not None]
        for p in finished:
            processes.remove(p)
        self.client.close()
        BaseZMQTestCase.tearDown(self)
        # this will be superfluous when pyzmq merges PR #88
        self.context.term()
        # Emit the fd number of a fresh temporary file -- presumably a probe
        # for leaked file descriptors between tests (confirm).
        with tempfile.TemporaryFile() as probe:
            sys.stdout.write("%i " % probe.fileno())
        sys.stdout.flush()
MinRK
added preliminary tests for zmq.parallel
r3595