clienttest.py
104 lines
| 3.2 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3658 | import sys | |
import tempfile | |||
MinRK
|
r3595 | import time | |
from signal import SIGINT | |||
from multiprocessing import Process | |||
MinRK
|
r3637 | from nose import SkipTest | |
MinRK
|
r3595 | from zmq.tests import BaseZMQTestCase | |
MinRK
|
r3637 | from IPython.external.decorator import decorator | |
MinRK
|
r3638 | from IPython.zmq.parallel import error | |
from IPython.zmq.parallel.client import Client | |||
MinRK
|
r3595 | from IPython.zmq.parallel.ipcluster import launch_process | |
from IPython.zmq.parallel.entry_point import select_random_ports | |||
MinRK
|
r3637 | from IPython.zmq.parallel.tests import processes,add_engine | |
# simple tasks for use in apply tests | |||
def segfault(): | |||
MinRK
|
r3641 | """this will segfault""" | |
MinRK
|
r3637 | import ctypes | |
ctypes.memset(-1,0,1) | |||
def wait(n): | |||
"""sleep for a time""" | |||
import time | |||
time.sleep(n) | |||
return n | |||
def raiser(eclass): | |||
"""raise an exception""" | |||
raise eclass() | |||
# test decorator for skipping tests when libraries are unavailable | |||
def skip_without(*names): | |||
"""skip a test if some names are not importable""" | |||
@decorator | |||
def skip_without_names(f, *args, **kwargs): | |||
"""decorator to skip tests in the absence of numpy.""" | |||
for name in names: | |||
try: | |||
__import__(name) | |||
except ImportError: | |||
raise SkipTest | |||
return f(*args, **kwargs) | |||
return skip_without_names | |||
MinRK
|
r3595 | ||
class ClusterTestCase(BaseZMQTestCase): | |||
MinRK
|
r3637 | def add_engines(self, n=1, block=True): | |
MinRK
|
r3595 | """add multiple engines to our cluster""" | |
for i in range(n): | |||
self.engines.append(add_engine()) | |||
MinRK
|
r3637 | if block: | |
self.wait_on_engines() | |||
MinRK
|
r3595 | ||
MinRK
|
r3637 | def wait_on_engines(self, timeout=5): | |
MinRK
|
r3595 | """wait for our engines to connect.""" | |
MinRK
|
r3637 | n = len(self.engines)+self.base_engine_count | |
tic = time.time() | |||
while time.time()-tic < timeout and len(self.client.ids) < n: | |||
MinRK
|
r3595 | time.sleep(0.1) | |
MinRK
|
r3637 | ||
MinRK
|
r3658 | assert not len(self.client.ids) < n, "waiting for engines timed out" | |
MinRK
|
r3595 | ||
MinRK
|
r3637 | def connect_client(self): | |
MinRK
|
r3595 | """connect a client with my Context, and track its sockets for cleanup""" | |
MinRK
|
r3637 | c = Client(profile='iptest',context=self.context) | |
MinRK
|
r3595 | for name in filter(lambda n:n.endswith('socket'), dir(c)): | |
self.sockets.append(getattr(c, name)) | |||
return c | |||
MinRK
|
r3638 | def assertRaisesRemote(self, etype, f, *args, **kwargs): | |
try: | |||
MinRK
|
r3641 | try: | |
f(*args, **kwargs) | |||
except error.CompositeError as e: | |||
e.raise_exception() | |||
MinRK
|
r3638 | except error.RemoteError as e: | |
self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__)) | |||
else: | |||
self.fail("should have raised a RemoteError") | |||
MinRK
|
r3595 | def setUp(self): | |
BaseZMQTestCase.setUp(self) | |||
self.client = self.connect_client() | |||
self.base_engine_count=len(self.client.ids) | |||
self.engines=[] | |||
MinRK
|
r3654 | def tearDown(self): | |
MinRK
|
r3658 | ||
# close fds: | |||
for e in filter(lambda e: e.poll() is not None, processes): | |||
processes.remove(e) | |||
MinRK
|
r3654 | self.client.close() | |
BaseZMQTestCase.tearDown(self) | |||
MinRK
|
r3658 | # this will be superfluous when pyzmq merges PR #88 | |
self.context.term() | |||
print tempfile.TemporaryFile().fileno(), | |||
sys.stdout.flush() | |||
MinRK
|
r3595 |