From 1a93dba9e49c6641ec80d6c7733added7f8b214c 2011-04-20 21:02:01 From: MinRK Date: 2011-04-20 21:02:01 Subject: [PATCH] improve process cleanup on Windows * use system taskkill call to shutdown process trees on Windows * use apps.launchers in tests, to make use of the fix Also remove unnecessary and sometimes wrong assert in test_asyncresult As of this commit, all parallel tests pass on Windows --- diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index f5b7d7f..171a248 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -21,13 +21,21 @@ import os import re import stat +# signal imports, handling various platforms, versions + from signal import SIGINT, SIGTERM try: from signal import SIGKILL except ImportError: - # windows + # Windows SIGKILL=SIGTERM +try: + # Windows >= 2.7, 3.2 + from signal import CTRL_C_EVENT as SIGINT +except ImportError: + pass + from subprocess import Popen, PIPE, STDOUT try: from subprocess import check_output @@ -51,24 +59,10 @@ from IPython.parallel.factory import LoggingFactory from .win32support import forward_read_events -# load winhpcjob only on Windows -try: - from .winhpcjob import ( - IPControllerTask, IPEngineTask, - IPControllerJob, IPEngineSetJob - ) -except ImportError: - pass +from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob WINDOWS = os.name == 'nt' -if WINDOWS: - try: - # >= 2.7, 3.2 - from signal import CTRL_C_EVENT as SIGINT - except ImportError: - pass - #----------------------------------------------------------------------------- # Paths to the kernel apps #----------------------------------------------------------------------------- @@ -282,11 +276,19 @@ class LocalProcessLauncher(BaseLauncher): def signal(self, sig): if self.state == 'running': - self.process.send_signal(sig) + if WINDOWS and sig != SIGINT: + # use Windows tree-kill for better child cleanup + check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f']) + else: + self.process.send_signal(sig) def interrupt_then_kill(self, delay=2.0): """Send INT, wait a delay and then send KILL.""" - self.signal(SIGINT) + try: + self.signal(SIGINT) + except Exception: + self.log.debug("interrupt failed") + pass self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop) self.killer.start() diff --git a/IPython/parallel/apps/winhpcjob.py b/IPython/parallel/apps/winhpcjob.py index c9219e2..492df58 100644 --- a/IPython/parallel/apps/winhpcjob.py +++ b/IPython/parallel/apps/winhpcjob.py @@ -16,8 +16,6 @@ Job and task components for writing .xml files that the Windows HPC Server # Imports #----------------------------------------------------------------------------- -from __future__ import with_statement - import os import re import uuid diff --git a/IPython/parallel/tests/__init__.py b/IPython/parallel/tests/__init__.py index 23d9ef9..0b1e013 100644 --- a/IPython/parallel/tests/__init__.py +++ b/IPython/parallel/tests/__init__.py @@ -14,21 +14,46 @@ import os import tempfile import time -from subprocess import Popen, PIPE, STDOUT +from subprocess import Popen from IPython.utils.path import get_ipython_dir from IPython.parallel import Client +from IPython.parallel.apps.launcher import (LocalProcessLauncher, + ipengine_cmd_argv, + ipcontroller_cmd_argv, + SIGKILL) -processes = [] -blackhole = tempfile.TemporaryFile() +# globals +launchers = [] +blackhole = open(os.devnull, 'w') + +# Launcher class +class TestProcessLauncher(LocalProcessLauncher): + """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows""" + def start(self): + if self.state == 'before': + self.process = Popen(self.args, + stdout=blackhole, stderr=blackhole, + env=os.environ, + cwd=self.work_dir + ) + self.notify_start(self.process.pid) + self.poll = self.process.poll + else: + s = 'The process was already started and has state: %r' % self.state + raise ProcessStateError(s) # nose setup/teardown def setup(): - cp = Popen('ipcontroller --profile iptest -r --log-level 10 --log-to-file --usethreads'.split(), stdout=blackhole, stderr=STDOUT) - processes.append(cp) - engine_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-engine.json') - client_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-client.json') + cp = TestProcessLauncher() + cp.cmd_and_args = ipcontroller_cmd_argv + \ + ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads'] + cp.start() + launchers.append(cp) + cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest') + engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json') + client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json') tic = time.time() while not os.path.exists(engine_json) or not os.path.exists(client_json): if cp.poll() is not None: @@ -44,9 +69,10 @@ def add_engines(n=1, profile='iptest'): base = len(rc) eps = [] for i in range(n): - ep = Popen(['ipengine']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT) - # ep.start() - processes.append(ep) + ep = TestProcessLauncher() + ep.cmd_and_args = ipengine_cmd_argv + ['--profile', profile, '--log-level', '99'] + ep.start() + launchers.append(ep) eps.append(ep) tic = time.time() while len(rc) < base+n: @@ -61,11 +87,11 @@ def add_engines(n=1, profile='iptest'): def teardown(): time.sleep(1) - while processes: - p = processes.pop() + while launchers: + p = launchers.pop() if p.poll() is None: try: - p.terminate() + p.stop() except Exception, e: print e pass @@ -73,8 +99,9 @@ def teardown(): time.sleep(.25) if p.poll() is None: try: - print 'killing' - p.kill() + print 'cleaning up test process...' + p.signal(SIGKILL) except: print "couldn't shutdown process: ", p + blackhole.close() diff --git a/IPython/parallel/tests/clienttest.py b/IPython/parallel/tests/clienttest.py index 8cee15f..b6c2ed0 100644 --- a/IPython/parallel/tests/clienttest.py +++ b/IPython/parallel/tests/clienttest.py @@ -20,7 +20,8 @@ from IPython.external.decorator import decorator from IPython.parallel import error from IPython.parallel import Client -from IPython.parallel.tests import processes,add_engines + +from IPython.parallel.tests import launchers, add_engines # simple tasks for use in apply tests @@ -112,8 +113,8 @@ class ClusterTestCase(BaseZMQTestCase): def tearDown(self): # self.client.clear(block=True) # close fds: - for e in filter(lambda e: e.poll() is not None, processes): - processes.remove(e) + for e in filter(lambda e: e.poll() is not None, launchers): + launchers.remove(e) # allow flushing of incoming messages to prevent crash on socket close self.client.wait(timeout=2) diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py index e3ef072..9457033 100644 --- a/IPython/parallel/tests/test_asyncresult.py +++ b/IPython/parallel/tests/test_asyncresult.py @@ -38,7 +38,6 @@ class AsyncResultTest(ClusterTestCase): def test_get_after_done(self): ar = self.client[-1].apply_async(lambda : 42) - self.assertFalse(ar.ready()) ar.wait() self.assertTrue(ar.ready()) self.assertEquals(ar.get(), 42)