##// END OF EJS Templates
# add timeout for unmet dependencies in task scheduler
#
# File last commit:
#
# r3595:39eab52f
# r3611:23d19065
# Show More
# clienttest.py
# 54 lines | 2.0 KiB | text/x-python | PythonLexer
import time
from signal import SIGINT
from multiprocessing import Process
from zmq.tests import BaseZMQTestCase
from IPython.zmq.parallel.ipcluster import launch_process
from IPython.zmq.parallel.entry_point import select_random_ports
from IPython.zmq.parallel.client import Client
from IPython.zmq.parallel.tests import cluster_logs,add_engine
class ClusterTestCase(BaseZMQTestCase):
    """Base test case for tests that talk to a running cluster through a Client.

    ``setUp`` connects ``self.client`` to the controller whose registration
    port is published in ``cluster_logs['regport']`` and records how many
    engines were already registered (``self.base_engine_count``).  Engines
    started through ``add_engines`` are tracked in ``self.engines`` and are
    terminated again in ``tearDown``.
    """

    def add_engines(self, n=1):
        """Start ``n`` additional engines and track them in ``self.engines``."""
        for _ in range(n):
            self.engines.append(add_engine())

    def wait_on_engines(self, timeout=None):
        """Block until every engine started by this test is visible to the client.

        The wait is over once ``len(self.client.ids)`` reaches the number of
        engines present at ``setUp`` plus the number added by this test.

        ``timeout`` is the maximum number of seconds to wait.  The default,
        ``None``, waits indefinitely (the historical behaviour).  When the
        timeout expires an ``AssertionError`` is raised so that the test is
        reported as failed instead of hanging the whole run.
        """
        expected = len(self.engines) + self.base_engine_count
        deadline = None if timeout is None else time.time() + timeout
        while len(self.client.ids) < expected:
            if deadline is not None and time.time() >= deadline:
                raise AssertionError(
                    "timed out after %s seconds waiting for engines: "
                    "%i of %i registered"
                    % (timeout, len(self.client.ids), expected)
                )
            time.sleep(0.1)

    def start_cluster(self, n=1):
        """Deprecated: always raises ``NotImplementedError``.

        This used to pick a random registration port and launch a controller
        plus ``n`` engines via ``launch_process``.  That code sat after the
        unconditional ``raise`` and could never run, so it has been removed.
        """
        raise NotImplementedError("Don't use this anymore")

    def connect_client(self, port=None):
        """Connect a Client using this test's Context and track its sockets.

        ``port`` is the controller's registration port on 127.0.0.1; when it
        is ``None`` the port is read from ``cluster_logs['regport']``.

        Every attribute of the client whose name ends in ``'socket'`` is
        appended to ``self.sockets`` (presumably closed by
        ``BaseZMQTestCase.tearDown`` -- confirm against the base class).

        Returns the connected Client.
        """
        if port is None:
            port = cluster_logs['regport']
        c = Client('tcp://127.0.0.1:%i' % port, context=self.context)
        for name in dir(c):
            if name.endswith('socket'):
                self.sockets.append(getattr(c, name))
        return c

    def setUp(self):
        """Connect the client and record the pre-existing engine count."""
        BaseZMQTestCase.setUp(self)
        self.client = self.connect_client()
        self.base_engine_count = len(self.client.ids)
        self.engines = []

    def tearDown(self):
        """Terminate the engines this test started, then run the base cleanup.

        Only engines whose ``poll()`` returns ``None`` (still running) are
        terminated.  The base-class ``tearDown`` runs even if terminating an
        engine raises, so its cleanup is never skipped.
        """
        try:
            for engine in self.engines:
                if engine.poll() is None:
                    engine.terminate()
            # NOTE: we deliberately do not wait for ``self.client.ids`` to
            # drop back to ``self.base_engine_count`` before continuing.
            del self.engines
        finally:
            BaseZMQTestCase.tearDown(self)