clienttest.py
212 lines
| 6.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """base class for parallel client tests | |
Authors: | |||
* Min RK | |||
""" | |||
MinRK
|
r3664 | ||
#------------------------------------------------------------------------------- | |||
# Copyright (C) 2011 The IPython Development Team | |||
# | |||
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#------------------------------------------------------------------------------- | |||
MinRK
|
r3658 | import sys | |
import tempfile | |||
MinRK
|
r3595 | import time | |
MinRK
|
r7054 | from StringIO import StringIO | |
MinRK
|
r3595 | ||
MinRK
|
r3637 | from nose import SkipTest | |
MinRK
|
r3664 | import zmq | |
MinRK
|
r3595 | from zmq.tests import BaseZMQTestCase | |
MinRK
|
r3637 | from IPython.external.decorator import decorator | |
MinRK
|
r3666 | from IPython.parallel import error | |
MinRK
|
r3673 | from IPython.parallel import Client | |
MinRK
|
r3778 | ||
from IPython.parallel.tests import launchers, add_engines | |||
MinRK
|
r3637 | ||
# simple tasks for use in apply tests | |||
def segfault():
    """Task for apply tests: this will segfault the process that runs it."""
    # imported inside the function so the task carries its own dependency
    from ctypes import memset
    # set one byte at address -1
    memset(-1, 0, 1)
MinRK
|
def crash():
    """from stdlib crashers in the test suite

    Hand-builds a code object with ``types.CodeType`` from the literal
    argument list below and executes it with ``exec``.
    """
    import types
    if sys.platform.startswith('win'):
        import ctypes
        # NOTE(review): 0x0002 is presumably SEM_NOGPFAULTERRORBOX (suppresses
        # the Windows crash dialog) -- confirm against the SetErrorMode docs.
        ctypes.windll.kernel32.SetErrorMode(0x0002);
    # positional arguments for types.CodeType; the bytes literal is presumably
    # the raw bytecode that triggers the crash -- verify before touching it
    args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
    if sys.version_info[0] >= 3:
        # Python3 adds 'kwonlyargcount' as the second argument to Code
        args.insert(1, 0)
    co = types.CodeType(*args)
    exec(co)
MinRK
|
def wait(n):
    """Block for ``n`` seconds, then return ``n`` unchanged."""
    # imported inside the function so the task carries its own dependency
    from time import sleep
    sleep(n)
    return n
def raiser(eclass):
    """Instantiate ``eclass`` with no arguments and raise the result."""
    exc = eclass()
    raise exc
MinRK
|
def generate_output():
    """function for testing output

    publishes two outputs of each type (stdout, stderr, rich display),
    and returns a rich displayable object (``Math("42")``).
    """
    import sys
    from IPython.core.display import display, HTML, Math

    # write() rather than the print statement: ``print "x"`` and
    # ``print >> f, "x"`` are syntax errors on Python 3, while write()
    # emits the same text on both Python 2 and Python 3.
    sys.stdout.write("stdout\n")
    sys.stderr.write("stderr\n")
    display(HTML("<b>HTML</b>"))

    sys.stdout.write("stdout2\n")
    sys.stderr.write("stderr2\n")
    display(Math(r"\alpha=\beta"))

    return Math("42")
MinRK
|
r3637 | # test decorator for skipping tests when libraries are unavailable | |
def skip_without(*names):
    """Build a test decorator that skips the test if any of ``names`` is not importable."""

    def _require_all(f, *args, **kwargs):
        """Try to import every required name; skip on the first failure, else run ``f``."""
        for module_name in names:
            try:
                __import__(module_name)
            except ImportError:
                raise SkipTest
        return f(*args, **kwargs)

    # keep the name the original inner function had before wrapping it
    _require_all.__name__ = 'skip_without_names'
    skip_without_names = decorator(_require_all)
    return skip_without_names
MinRK
|
r3595 | ||
MinRK
|
r7054 | #------------------------------------------------------------------------------- | |
# Classes | |||
#------------------------------------------------------------------------------- | |||
class CapturedIO(object):
    """Holder for a pair of captured stdout/stderr buffer objects.

    The buffers are kept as ``stdout_io`` / ``stderr_io``; the ``stdout`` and
    ``stderr`` properties return each buffer's current ``getvalue()``.
    """

    def __init__(self, stdout, stderr):
        self.stdout_io, self.stderr_io = stdout, stderr

    @property
    def stdout(self):
        """Text currently held in the stdout buffer."""
        text = self.stdout_io.getvalue()
        return text

    @property
    def stderr(self):
        """Text currently held in the stderr buffer."""
        text = self.stderr_io.getvalue()
        return text
class capture_output(object):
    """Context manager that redirects sys.stdout/sys.stderr into StringIO buffers.

    Entering returns a ``CapturedIO`` wrapping the two buffers; leaving puts
    the streams that were active on entry back in place.
    """

    def __enter__(self):
        # remember the active streams so __exit__ can restore them
        self.sys_stdout, self.sys_stderr = sys.stdout, sys.stderr
        out_buffer = StringIO()
        sys.stdout = out_buffer
        err_buffer = StringIO()
        sys.stderr = err_buffer
        return CapturedIO(out_buffer, err_buffer)

    def __exit__(self, exc_type, exc_value, traceback):
        # returns None, so any exception raised inside the block propagates
        sys.stdout, sys.stderr = self.sys_stdout, self.sys_stderr
MinRK
|
class ClusterTestCase(BaseZMQTestCase):
    """Base test case for tests that talk to a parallel cluster.

    ``setUp`` connects a client (``self.client``), clears the engine
    namespaces, records how many engines are already registered
    (``self.base_engine_count``) and starts an empty ``self.engines`` list
    that the engine-adding helpers below extend.
    """

    def add_engines(self, n=1, block=True):
        """add multiple engines to our cluster

        Whatever the module-level ``add_engines(n)`` helper returns is
        appended to ``self.engines``.  If ``block`` is true, wait for the
        engines to connect before returning.
        """
        self.engines.extend(add_engines(n))
        if block:
            self.wait_on_engines()

    def minimum_engines(self, n=1, block=True):
        """add engines until there are at least n connected

        Same as :meth:`add_engines`, but calls the module-level helper with
        ``total=True``.
        """
        self.engines.extend(add_engines(n, total=True))
        if block:
            self.wait_on_engines()

    def wait_on_engines(self, timeout=5):
        """wait for our engines to connect.

        Polls ``self.client.ids`` every 0.1s until it holds at least
        ``len(self.engines) + self.base_engine_count`` entries or ``timeout``
        seconds have elapsed; fails the test if the count is still short.
        """
        n = len(self.engines) + self.base_engine_count
        tic = time.time()
        while time.time() - tic < timeout and len(self.client.ids) < n:
            time.sleep(0.1)

        # self.fail rather than a bare assert, so the check survives ``-O``
        if len(self.client.ids) < n:
            self.fail("waiting for engines timed out")

    def connect_client(self):
        """connect a client with my Context, and track its sockets for cleanup"""
        c = Client(profile='iptest', context=self.context)
        # every attribute whose name ends in 'socket' is treated as a socket
        socket_names = [name for name in dir(c) if name.endswith('socket')]
        for name in socket_names:
            s = getattr(c, name)
            s.setsockopt(zmq.LINGER, 0)
            self.sockets.append(s)
        return c

    def assertRaisesRemote(self, etype, f, *args, **kwargs):
        """Assert that ``f(*args, **kwargs)`` raises a RemoteError for ``etype``.

        A ``CompositeError`` is unwrapped through its ``raise_exception()``
        method first.  The test fails if no ``RemoteError`` is raised, or if
        the error's ``ename`` differs from ``etype.__name__``.
        """
        try:
            try:
                f(*args, **kwargs)
            except error.CompositeError as e:
                e.raise_exception()
        except error.RemoteError as e:
            # assertEqual: assertEquals is a deprecated alias
            self.assertEqual(
                etype.__name__, e.ename,
                "Should have raised %r, but raised %r" % (etype.__name__, e.ename),
            )
        else:
            self.fail("should have raised a RemoteError")

    def _wait_for(self, f, timeout=10):
        """wait for a condition

        Calls ``f()`` roughly every 0.1s, spinning the client in between,
        and returns as soon as it is true.  If it never becomes true within
        ``timeout`` seconds a warning is printed; no exception is raised.
        """
        tic = time.time()
        while time.time() <= tic + timeout:
            if f():
                return
            time.sleep(0.1)
            self.client.spin()
        if not f():
            # parenthesised single argument prints identically on Python 2 and 3
            print("Warning: Awaited condition never arrived")

    def setUp(self):
        """Connect a client and record the starting engine count."""
        BaseZMQTestCase.setUp(self)
        self.client = self.connect_client()
        # start every test with clean engine namespaces:
        self.client.clear(block=True)
        self.base_engine_count = len(self.client.ids)
        self.engines = []

    def tearDown(self):
        """Drop finished launchers, flush the client, and close it."""
        # self.client.clear(block=True)
        # close fds:
        # Collect the finished launchers first: on Python 3 ``filter`` is
        # lazy, and removing from ``launchers`` while it is being iterated
        # would skip entries.
        finished = [e for e in launchers if e.poll() is not None]
        for e in finished:
            launchers.remove(e)

        # allow flushing of incoming messages to prevent crash on socket close
        self.client.wait(timeout=2)
        # time.sleep(2)
        self.client.spin()
        self.client.close()
        BaseZMQTestCase.tearDown(self)
        # this will be redundant when pyzmq merges PR #88
        # self.context.term()
        # print tempfile.TemporaryFile().fileno(),
        # sys.stdout.flush()
MinRK
|
r3595 |