"""toplevel setup/teardown for parallel tests.""" #------------------------------------------------------------------------------- # Copyright (C) 2011 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Imports #------------------------------------------------------------------------------- import os import tempfile import time from subprocess import Popen from IPython.utils.path import get_ipython_dir from IPython.parallel import Client from IPython.parallel.apps.launcher import (LocalProcessLauncher, ipengine_cmd_argv, ipcontroller_cmd_argv, SIGKILL) # globals launchers = [] blackhole = open(os.devnull, 'w') # Launcher class class TestProcessLauncher(LocalProcessLauncher): """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows""" def start(self): if self.state == 'before': self.process = Popen(self.args, stdout=blackhole, stderr=blackhole, env=os.environ, cwd=self.work_dir ) self.notify_start(self.process.pid) self.poll = self.process.poll else: s = 'The process was already started and has state: %r' % self.state raise ProcessStateError(s) # nose setup/teardown def setup(): cp = TestProcessLauncher() cp.cmd_and_args = ipcontroller_cmd_argv + \ ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads'] cp.start() launchers.append(cp) cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest') engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json') client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json') tic = time.time() while not os.path.exists(engine_json) or not os.path.exists(client_json): if cp.poll() is not None: print cp.poll() raise RuntimeError("The test controller failed to start.") elif time.time()-tic > 10: raise RuntimeError("Timeout waiting for the test controller to start.") time.sleep(0.1) add_engines(1) def add_engines(n=1, profile='iptest'): rc = Client(profile=profile) base = len(rc) eps = [] for i in range(n): ep = TestProcessLauncher() ep.cmd_and_args = ipengine_cmd_argv + ['--profile', profile, '--log-level', '99'] ep.start() launchers.append(ep) eps.append(ep) tic = time.time() while len(rc) < base+n: if any([ ep.poll() is not None for ep in eps ]): raise RuntimeError("A test engine failed to start.") elif time.time()-tic > 10: raise RuntimeError("Timeout waiting for engines to connect.") time.sleep(.1) rc.spin() rc.close() return eps def teardown(): time.sleep(1) while launchers: p = launchers.pop() if p.poll() is None: try: p.stop() except Exception, e: print e pass if p.poll() is None: time.sleep(.25) if p.poll() is None: try: print 'cleaning up test process...' p.signal(SIGKILL) except: print "couldn't shutdown process: ", p blackhole.close()