##// END OF EJS Templates
shim out IPython.parallel into ipython_parallel
shim out IPython.parallel into ipython_parallel

File last commit:

r20858:330bfc56
r20858:330bfc56
Show More
clienttest.py
192 lines | 5.7 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """base class for parallel client tests
Authors:
* Min RK
"""
MinRK
update API after sagedays29...
r3664
#-------------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
Matthias BUSSONNIER
use print function in module with `print >>`
r7817 from __future__ import print_function
MinRK
update API after sagedays29...
r3664
MinRK
update connections and diagrams for reduced sockets
r3658 import sys
import tempfile
MinRK
added preliminary tests for zmq.parallel
r3595 import time
MinRK
some initial tests for newparallel
r3637 from nose import SkipTest
MinRK
update API after sagedays29...
r3664 import zmq
MinRK
added preliminary tests for zmq.parallel
r3595 from zmq.tests import BaseZMQTestCase
MinRK
remove decorator from external
r20813 from decorator import decorator
MinRK
some initial tests for newparallel
r3637
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel import error
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel import Client
MinRK
improve process cleanup on Windows...
r3778
from IPython.parallel.tests import launchers, add_engines
MinRK
some initial tests for newparallel
r3637
# simple tasks for use in apply tests
def segfault():
    """Deliberately crash the calling process.

    Asks ``ctypes.memset`` to write a single zero byte at address ``-1``,
    which is not a valid pointer.
    """
    # Imported locally so the function carries its own dependency.
    from ctypes import memset

    memset(-1, 0, 1)
MinRK
use crash that will also kill a Windows engine...
def crash():
    """Crash the interpreter by executing a hand-built code object.

    Adapted from the stdlib "crashers" in the CPython test suite.

    NOTE(review): the positional signature of ``types.CodeType`` has changed
    across Python versions; confirm this still crashes (rather than raising
    ``TypeError``) on the interpreters the test suite targets.
    """
    import types

    if sys.platform.startswith('win'):
        import ctypes
        # presumably SEM_NOGPFAULTERRORBOX, so Windows does not pop up a
        # crash dialog -- TODO confirm against the SetErrorMode docs
        ctypes.windll.kernel32.SetErrorMode(0x0002)

    code_args = [0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
    if sys.version_info[0] >= 3:
        # Python3 adds 'kwonlyargcount' as the second argument to Code
        code_args.insert(1, 0)

    code_obj = types.CodeType(*code_args)
    exec(code_obj)
MinRK
some initial tests for newparallel
def wait(n):
    """Sleep for ``n`` seconds, then return ``n`` unchanged."""
    # Imported locally so the function carries its own dependency.
    from time import sleep

    sleep(n)
    return n
def raiser(eclass):
    """Instantiate ``eclass`` with no arguments and raise the instance."""
    exc = eclass()
    raise exc
MinRK
move some general parallel test utilities to clienttest
def generate_output():
    """Produce a fixed sequence of outputs for testing output capture.

    In order: a line on stdout, a line on stderr, an HTML display, a second
    line on stdout, a second line on stderr, and a Math display.

    Returns a ``Math("42")`` object as the result.
    """
    import sys
    from IPython.core.display import display, HTML, Math

    # first round: plain streams, then a rich display
    print("stdout")
    print("stderr", file=sys.stderr)
    html_payload = HTML("<b>HTML</b>")
    display(html_payload)

    # second round: plain streams again, then another rich display
    print("stdout2")
    print("stderr2", file=sys.stderr)
    math_payload = Math(r"\alpha=\beta")
    display(math_payload)

    return Math("42")
MinRK
some initial tests for newparallel
r3637 # test decorator for skipping tests when libraries are unavailable
def skip_without(*names):
    """Build a test decorator that skips when any of ``names`` cannot be imported.

    ``names`` are module names passed to ``__import__`` each time the
    decorated test is called.
    """
    @decorator
    def skip_without_names(f, *args, **kwargs):
        """Raise SkipTest if a required module is missing, else run ``f``."""
        for module_name in names:
            try:
                __import__(module_name)
            except ImportError:
                raise SkipTest
        return f(*args, **kwargs)

    return skip_without_names
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
move some general parallel test utilities to clienttest
r7054 #-------------------------------------------------------------------------------
# Classes
#-------------------------------------------------------------------------------
MinRK
added preliminary tests for zmq.parallel
class ClusterTestCase(BaseZMQTestCase):
    """Base class for tests that talk to a running cluster through a Client.

    ``setUp`` connects a fresh ``Client`` (profile ``'iptest'``), clears the
    engine namespaces and records how many engines are already connected.
    ``tearDown`` flushes and closes that client.
    """

    # Timeout substituted by client_wait() when the caller passes a negative
    # timeout, so waits cannot block forever (presumably seconds).
    timeout = 10

    def add_engines(self, n=1, block=True):
        """Add multiple engines to our cluster.

        Calls the module-level ``add_engines(n)`` helper and records what it
        returns in ``self.engines``.  If ``block`` is true, waits until the
        engines have connected.
        """
        self.engines.extend(add_engines(n))
        if block:
            self.wait_on_engines()

    def minimum_engines(self, n=1, block=True):
        """Add engines until there are at least ``n`` connected.

        Same as :meth:`add_engines`, but calls the helper with ``total=True``.
        """
        self.engines.extend(add_engines(n, total=True))
        if block:
            self.wait_on_engines()

    def wait_on_engines(self, timeout=5):
        """Wait for our engines to connect.

        Polls ``self.client.ids`` every 0.1s until it holds at least
        ``len(self.engines) + self.base_engine_count`` entries, or until
        ``timeout`` seconds have elapsed.  Fails the test on timeout.
        """
        expected = len(self.engines) + self.base_engine_count
        start = time.time()
        while time.time() - start < timeout and len(self.client.ids) < expected:
            time.sleep(0.1)

        # Explicit failure instead of a bare ``assert`` so the check is not
        # stripped when Python runs with -O.
        if len(self.client.ids) < expected:
            self.fail("waiting for engines timed out")

    def client_wait(self, client, jobs=None, timeout=-1):
        """Wait wrapper that sets a default finite timeout to avoid hangs.

        A negative ``timeout`` is replaced by ``self.timeout`` before
        delegating to ``Client.wait``; the result of that call is returned.
        """
        if timeout < 0:
            timeout = self.timeout
        return Client.wait(client, jobs, timeout)

    def connect_client(self):
        """Connect a client with my Context, and track its sockets for cleanup.

        Returns the new ``Client``.  Its ``wait`` attribute is replaced by a
        wrapper around :meth:`client_wait`.
        """
        c = Client(profile='iptest', context=self.context)
        c.wait = lambda *a, **kw: self.client_wait(c, *a, **kw)

        # Every attribute whose name ends in 'socket' gets LINGER=0 and is
        # recorded in self.sockets.
        for name in dir(c):
            if not name.endswith('socket'):
                continue
            s = getattr(c, name)
            s.setsockopt(zmq.LINGER, 0)
            self.sockets.append(s)
        return c

    def assertRaisesRemote(self, etype, f, *args, **kwargs):
        """Assert that ``f(*args, **kwargs)`` raises a RemoteError named after ``etype``.

        A ``CompositeError`` is unwrapped via its ``raise_exception()`` first.
        The test fails if no ``RemoteError`` is raised, or if the remote
        error's ``ename`` differs from ``etype.__name__``.
        """
        try:
            try:
                f(*args, **kwargs)
            except error.CompositeError as e:
                e.raise_exception()
        except error.RemoteError as e:
            self.assertEqual(
                etype.__name__,
                e.ename,
                "Should have raised %r, but raised %r"%(etype.__name__, e.ename),
            )
        else:
            self.fail("should have raised a RemoteError")

    def _wait_for(self, f, timeout=10):
        """Wait for a condition.

        Calls ``f()`` repeatedly (sleeping 0.1s and spinning the client in
        between) until it returns a true value or ``timeout`` seconds pass.
        On timeout only a warning is printed; no exception is raised.
        """
        tic = time.time()
        while time.time() <= tic + timeout:
            if f():
                return
            time.sleep(0.1)
            self.client.spin()
        if not f():
            print("Warning: Awaited condition never arrived")

    def setUp(self):
        """Connect a client and reset per-test engine bookkeeping."""
        BaseZMQTestCase.setUp(self)
        self.client = self.connect_client()
        # start every test with clean engine namespaces:
        self.client.clear(block=True)
        self.base_engine_count = len(self.client.ids)
        self.engines = []

    def tearDown(self):
        """Drop finished launchers, flush the client, then close everything."""
        # self.client.clear(block=True)
        # close fds:
        # Snapshot the finished launchers first: on Python 3 ``filter`` is
        # lazy, and removing from ``launchers`` while iterating over it would
        # skip the element following each removal.
        finished = [e for e in launchers if e.poll() is not None]
        for e in finished:
            launchers.remove(e)

        try:
            # allow flushing of incoming messages to prevent crash on socket close
            self.client.wait(timeout=2)
            # time.sleep(2)
            self.client.spin()
        finally:
            # Always release the client and the base-class resources, even if
            # flushing raised.
            try:
                self.client.close()
            finally:
                BaseZMQTestCase.tearDown(self)
        # this will be redundant when pyzmq merges PR #88
        # self.context.term()
        # print tempfile.TemporaryFile().fileno(),
        # sys.stdout.flush()
Matthias BUSSONNIER
use print function in module with `print >>`
r7817