##// END OF EJS Templates
some initial tests for newparallel
some initial tests for newparallel

File last commit:

r3637:e0abb035
r3637:e0abb035
Show More
clienttest.py
84 lines | 2.5 KiB | text/x-python | PythonLexer
import time
from signal import SIGINT
from multiprocessing import Process
from nose import SkipTest
from zmq.tests import BaseZMQTestCase
from IPython.external.decorator import decorator
from IPython.zmq.parallel.ipcluster import launch_process
from IPython.zmq.parallel.entry_point import select_random_ports
from IPython.zmq.parallel.client import Client
from IPython.zmq.parallel.tests import processes,add_engine
# simple tasks for use in apply tests
def segfault():
    """Write one zero byte to address -1 via ctypes, to crash the calling process."""
    # Imported inside the function so the body is self-contained.
    from ctypes import memset
    memset(-1, 0, 1)
def wait(n):
    """Sleep for ``n`` seconds, then return ``n`` unchanged."""
    # Imported inside the function so the body is self-contained.
    from time import sleep
    sleep(n)
    return n
def raiser(eclass):
    """Instantiate ``eclass`` with no arguments and raise the resulting exception."""
    error = eclass()
    raise error
# test decorator for skipping tests when libraries are unavailable
def skip_without(*names):
    """Build a test decorator that skips the test when a required module is missing.

    ``names`` are module names.  Each one is passed to ``__import__`` every
    time the decorated test is called (not at decoration time).  The first
    name that raises ``ImportError`` causes ``SkipTest`` to be raised, with a
    message naming the module that could not be imported.
    """
    @decorator
    def skip_without_names(f, *args, **kwargs):
        """Call ``f(*args, **kwargs)`` unless one of ``names`` is not importable."""
        for name in names:
            try:
                __import__(name)
            except ImportError:
                # Give the skip a reason so the test report says what is missing.
                raise SkipTest("could not import %r" % (name,))
        return f(*args, **kwargs)
    return skip_without_names
class ClusterTestCase(BaseZMQTestCase):
    """Test case base that connects a ``Client`` and tracks engines added per test.

    ``setUp`` connects a client and records how many engine ids it already
    reports; ``tearDown`` terminates any engines the test added that are
    still running.
    """

    def add_engines(self, n=1, block=True):
        """Add ``n`` engines to the cluster and record them in ``self.engines``.

        If ``block`` is true, wait (see ``wait_on_engines``) until the client
        reports the new engines before returning.
        """
        for _ in range(n):
            self.engines.append(add_engine())
        if block:
            self.wait_on_engines()

    def wait_on_engines(self, timeout=5):
        """Wait for our engines to connect.

        Polls ``self.client.ids`` every 0.1 seconds until it holds at least
        ``len(self.engines) + self.base_engine_count`` entries or ``timeout``
        seconds have passed.

        Raises ``AssertionError`` if the engines have not all appeared in time.
        """
        expected = len(self.engines) + self.base_engine_count
        tic = time.time()
        while time.time() - tic < timeout and len(self.client.ids) < expected:
            time.sleep(0.1)
        # Compare the *count* of ids with the expected count.  Comparing the
        # ids list itself to an int never fails on Python 2 and raises
        # TypeError on Python 3, so a timeout would go undetected.
        assert not len(self.client.ids) < expected, "waiting for engines timed out"

    def connect_client(self):
        """Connect a client with my Context, and track its sockets for cleanup.

        Every attribute of the client whose name ends in ``'socket'`` is
        appended to ``self.sockets``.  Returns the new client.
        """
        c = Client(profile='iptest', context=self.context)
        for name in dir(c):
            if name.endswith('socket'):
                self.sockets.append(getattr(c, name))
        return c

    def setUp(self):
        """Run the base setUp, connect a client, and start with no added engines."""
        BaseZMQTestCase.setUp(self)
        self.client = self.connect_client()
        # Engines already registered before this test adds any of its own.
        self.base_engine_count = len(self.client.ids)
        self.engines = []

    def tearDown(self):
        """Terminate the engines this test added, then run the base tearDown."""
        for engine in self.engines:
            # poll() returning None presumably means the process has not exited.
            if engine.poll() is None:
                engine.terminate()
        # Disabled: waiting for the terminated engines to drop out of client.ids.
        # while len(self.client.ids) > self.base_engine_count:
        #     time.sleep(.1)
        del self.engines
        BaseZMQTestCase.tearDown(self)