Show More
clienttest.py
192 lines
| 5.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """base class for parallel client tests | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
MinRK
|
r3664 | |||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
Matthias BUSSONNIER
|
r7817 | from __future__ import print_function | ||
MinRK
|
r3664 | |||
MinRK
|
r3658 | import sys | ||
import tempfile | ||||
MinRK
|
r3595 | import time | ||
MinRK
|
r3637 | from nose import SkipTest | ||
MinRK
|
r3664 | import zmq | ||
MinRK
|
r3595 | from zmq.tests import BaseZMQTestCase | ||
MinRK
|
r3637 | from IPython.external.decorator import decorator | ||
MinRK
|
r3666 | from IPython.parallel import error | ||
MinRK
|
r3673 | from IPython.parallel import Client | ||
MinRK
|
r3778 | |||
from IPython.parallel.tests import launchers, add_engines | ||||
MinRK
|
r3637 | |||
# simple tasks for use in apply tests | ||||
def segfault(): | ||||
MinRK
|
r3641 | """this will segfault""" | ||
MinRK
|
r3637 | import ctypes | ||
ctypes.memset(-1,0,1) | ||||
MinRK
|
r3776 | def crash(): | ||
"""from stdlib crashers in the test suite""" | ||||
import types | ||||
if sys.platform.startswith('win'): | ||||
import ctypes | ||||
ctypes.windll.kernel32.SetErrorMode(0x0002); | ||||
MinRK
|
r4155 | args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b''] | ||
if sys.version_info[0] >= 3: | ||||
MinRK
|
r4161 | # Python3 adds 'kwonlyargcount' as the second argument to Code | ||
MinRK
|
r4155 | args.insert(1, 0) | ||
co = types.CodeType(*args) | ||||
MinRK
|
r3776 | exec(co) | ||
MinRK
|
r3637 | def wait(n): | ||
"""sleep for a time""" | ||||
import time | ||||
time.sleep(n) | ||||
return n | ||||
def raiser(eclass): | ||||
"""raise an exception""" | ||||
raise eclass() | ||||
MinRK
|
r7054 | def generate_output(): | ||
"""function for testing output | ||||
publishes two outputs of each type, and returns | ||||
a rich displayable object. | ||||
""" | ||||
import sys | ||||
from IPython.core.display import display, HTML, Math | ||||
Matthias BUSSONNIER
|
r7817 | print("stdout") | ||
print("stderr", file=sys.stderr) | ||||
MinRK
|
r7054 | |||
display(HTML("<b>HTML</b>")) | ||||
Matthias BUSSONNIER
|
r7817 | print("stdout2") | ||
print("stderr2", file=sys.stderr) | ||||
MinRK
|
r7054 | |||
display(Math(r"\alpha=\beta")) | ||||
return Math("42") | ||||
MinRK
|
r3637 | # test decorator for skipping tests when libraries are unavailable | ||
def skip_without(*names): | ||||
"""skip a test if some names are not importable""" | ||||
@decorator | ||||
def skip_without_names(f, *args, **kwargs): | ||||
"""decorator to skip tests in the absence of numpy.""" | ||||
for name in names: | ||||
try: | ||||
__import__(name) | ||||
except ImportError: | ||||
raise SkipTest | ||||
return f(*args, **kwargs) | ||||
return skip_without_names | ||||
MinRK
|
r3595 | |||
MinRK
|
r7054 | #------------------------------------------------------------------------------- | ||
# Classes | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
r3595 | class ClusterTestCase(BaseZMQTestCase): | ||
MinRK
|
r12784 | timeout = 10 | ||
MinRK
|
r3595 | |||
MinRK
|
r3637 | def add_engines(self, n=1, block=True): | ||
MinRK
|
r3595 | """add multiple engines to our cluster""" | ||
MinRK
|
r3664 | self.engines.extend(add_engines(n)) | ||
MinRK
|
r3637 | if block: | ||
self.wait_on_engines() | ||||
MinRK
|
r6162 | |||
def minimum_engines(self, n=1, block=True): | ||||
"""add engines until there are at least n connected""" | ||||
self.engines.extend(add_engines(n, total=True)) | ||||
if block: | ||||
self.wait_on_engines() | ||||
MinRK
|
r3595 | |||
MinRK
|
r3637 | def wait_on_engines(self, timeout=5): | ||
MinRK
|
r3595 | """wait for our engines to connect.""" | ||
MinRK
|
r3637 | n = len(self.engines)+self.base_engine_count | ||
tic = time.time() | ||||
while time.time()-tic < timeout and len(self.client.ids) < n: | ||||
MinRK
|
r3595 | time.sleep(0.1) | ||
MinRK
|
r3637 | |||
MinRK
|
r3658 | assert not len(self.client.ids) < n, "waiting for engines timed out" | ||
MinRK
|
r3595 | |||
MinRK
|
r12784 | def client_wait(self, client, jobs=None, timeout=-1): | ||
"""my wait wrapper, sets a default finite timeout to avoid hangs""" | ||||
if timeout < 0: | ||||
timeout = self.timeout | ||||
return Client.wait(client, jobs, timeout) | ||||
MinRK
|
r3637 | def connect_client(self): | ||
MinRK
|
r3595 | """connect a client with my Context, and track its sockets for cleanup""" | ||
MinRK
|
r3664 | c = Client(profile='iptest', context=self.context) | ||
MinRK
|
r12784 | c.wait = lambda *a, **kw: self.client_wait(c, *a, **kw) | ||
MinRK
|
r3664 | for name in filter(lambda n:n.endswith('socket'), dir(c)): | ||
s = getattr(c, name) | ||||
s.setsockopt(zmq.LINGER, 0) | ||||
self.sockets.append(s) | ||||
MinRK
|
r3595 | return c | ||
MinRK
|
r3638 | def assertRaisesRemote(self, etype, f, *args, **kwargs): | ||
try: | ||||
MinRK
|
r3641 | try: | ||
f(*args, **kwargs) | ||||
except error.CompositeError as e: | ||||
e.raise_exception() | ||||
MinRK
|
r3638 | except error.RemoteError as e: | ||
Bradley M. Froehle
|
r7874 | self.assertEqual(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename)) | ||
MinRK
|
r3638 | else: | ||
self.fail("should have raised a RemoteError") | ||||
MinRK
|
r7054 | def _wait_for(self, f, timeout=10): | ||
"""wait for a condition""" | ||||
tic = time.time() | ||||
while time.time() <= tic + timeout: | ||||
if f(): | ||||
return | ||||
time.sleep(0.1) | ||||
self.client.spin() | ||||
if not f(): | ||||
Matthias BUSSONNIER
|
r7817 | print("Warning: Awaited condition never arrived") | ||
MinRK
|
r7054 | |||
MinRK
|
r3595 | def setUp(self): | ||
BaseZMQTestCase.setUp(self) | ||||
self.client = self.connect_client() | ||||
MinRK
|
r3775 | # start every test with clean engine namespaces: | ||
self.client.clear(block=True) | ||||
MinRK
|
r3595 | self.base_engine_count=len(self.client.ids) | ||
self.engines=[] | ||||
MinRK
|
r3654 | def tearDown(self): | ||
MinRK
|
r3664 | # self.client.clear(block=True) | ||
MinRK
|
r3658 | # close fds: | ||
MinRK
|
r3778 | for e in filter(lambda e: e.poll() is not None, launchers): | ||
launchers.remove(e) | ||||
MinRK
|
r3658 | |||
MinRK
|
r3664 | # allow flushing of incoming messages to prevent crash on socket close | ||
self.client.wait(timeout=2) | ||||
# time.sleep(2) | ||||
self.client.spin() | ||||
MinRK
|
r3654 | self.client.close() | ||
BaseZMQTestCase.tearDown(self) | ||||
MinRK
|
r3664 | # this will be redundant when pyzmq merges PR #88 | ||
# self.context.term() | ||||
MinRK
|
r3661 | # print tempfile.TemporaryFile().fileno(), | ||
# sys.stdout.flush() | ||||
Matthias BUSSONNIER
|
r7817 | |||