##// END OF EJS Templates
some initial tests for newparallel
some initial tests for newparallel

File last commit:

r3637:e0abb035
r3637:e0abb035
Show More
clienttest.py
84 lines | 2.5 KiB | text/x-python | PythonLexer
MinRK
added preliminary tests for zmq.parallel
r3595 import time
from signal import SIGINT
from multiprocessing import Process
MinRK
some initial tests for newparallel
r3637 from nose import SkipTest
MinRK
added preliminary tests for zmq.parallel
r3595 from zmq.tests import BaseZMQTestCase
MinRK
some initial tests for newparallel
r3637 from IPython.external.decorator import decorator
MinRK
added preliminary tests for zmq.parallel
r3595 from IPython.zmq.parallel.ipcluster import launch_process
from IPython.zmq.parallel.entry_point import select_random_ports
from IPython.zmq.parallel.client import Client
MinRK
some initial tests for newparallel
r3637 from IPython.zmq.parallel.tests import processes,add_engine
# simple tasks for use in apply tests
def segfault():
""""""
import ctypes
ctypes.memset(-1,0,1)
def wait(n):
"""sleep for a time"""
import time
time.sleep(n)
return n
def raiser(eclass):
"""raise an exception"""
raise eclass()
# test decorator for skipping tests when libraries are unavailable
def skip_without(*names):
"""skip a test if some names are not importable"""
@decorator
def skip_without_names(f, *args, **kwargs):
"""decorator to skip tests in the absence of numpy."""
for name in names:
try:
__import__(name)
except ImportError:
raise SkipTest
return f(*args, **kwargs)
return skip_without_names
MinRK
added preliminary tests for zmq.parallel
r3595
class ClusterTestCase(BaseZMQTestCase):
MinRK
some initial tests for newparallel
r3637 def add_engines(self, n=1, block=True):
MinRK
added preliminary tests for zmq.parallel
r3595 """add multiple engines to our cluster"""
for i in range(n):
self.engines.append(add_engine())
MinRK
some initial tests for newparallel
r3637 if block:
self.wait_on_engines()
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
some initial tests for newparallel
r3637 def wait_on_engines(self, timeout=5):
MinRK
added preliminary tests for zmq.parallel
r3595 """wait for our engines to connect."""
MinRK
some initial tests for newparallel
r3637 n = len(self.engines)+self.base_engine_count
tic = time.time()
while time.time()-tic < timeout and len(self.client.ids) < n:
MinRK
added preliminary tests for zmq.parallel
r3595 time.sleep(0.1)
MinRK
some initial tests for newparallel
r3637
assert not self.client.ids < n, "waiting for engines timed out"
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
some initial tests for newparallel
r3637 def connect_client(self):
MinRK
added preliminary tests for zmq.parallel
r3595 """connect a client with my Context, and track its sockets for cleanup"""
MinRK
some initial tests for newparallel
r3637 c = Client(profile='iptest',context=self.context)
MinRK
added preliminary tests for zmq.parallel
r3595 for name in filter(lambda n:n.endswith('socket'), dir(c)):
self.sockets.append(getattr(c, name))
return c
def setUp(self):
BaseZMQTestCase.setUp(self)
self.client = self.connect_client()
self.base_engine_count=len(self.client.ids)
self.engines=[]
def tearDown(self):
[ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
# while len(self.client.ids) > self.base_engine_count:
# time.sleep(.1)
del self.engines
BaseZMQTestCase.tearDown(self)