##// END OF EJS Templates
improve process cleanup on Windows...
MinRK -
Show More
@@ -21,13 +21,21 b' import os'
21 21 import re
22 22 import stat
23 23
24 # signal imports, handling various platforms, versions
25
24 26 from signal import SIGINT, SIGTERM
25 27 try:
26 28 from signal import SIGKILL
27 29 except ImportError:
28 # windows
30 # Windows
29 31 SIGKILL=SIGTERM
30 32
33 try:
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
37 pass
38
31 39 from subprocess import Popen, PIPE, STDOUT
32 40 try:
33 41 from subprocess import check_output
@@ -51,24 +59,10 b' from IPython.parallel.factory import LoggingFactory'
51 59
52 60 from .win32support import forward_read_events
53 61
54 # load winhpcjob only on Windows
55 try:
56 from .winhpcjob import (
57 IPControllerTask, IPEngineTask,
58 IPControllerJob, IPEngineSetJob
59 )
60 except ImportError:
61 pass
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62 63
63 64 WINDOWS = os.name == 'nt'
64 65
65 if WINDOWS:
66 try:
67 # >= 2.7, 3.2
68 from signal import CTRL_C_EVENT as SIGINT
69 except ImportError:
70 pass
71
72 66 #-----------------------------------------------------------------------------
73 67 # Paths to the kernel apps
74 68 #-----------------------------------------------------------------------------
@@ -282,11 +276,19 b' class LocalProcessLauncher(BaseLauncher):'
282 276
283 277 def signal(self, sig):
284 278 if self.state == 'running':
279 if WINDOWS and sig != SIGINT:
280 # use Windows tree-kill for better child cleanup
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
282 else:
285 283 self.process.send_signal(sig)
286 284
287 285 def interrupt_then_kill(self, delay=2.0):
288 286 """Send INT, wait a delay and then send KILL."""
287 try:
289 288 self.signal(SIGINT)
289 except Exception:
290 self.log.debug("interrupt failed")
291 pass
290 292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
291 293 self.killer.start()
292 294
@@ -16,8 +16,6 b' Job and task components for writing .xml files that the Windows HPC Server'
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 from __future__ import with_statement
20
21 19 import os
22 20 import re
23 21 import uuid
@@ -14,21 +14,46 b''
14 14 import os
15 15 import tempfile
16 16 import time
17 from subprocess import Popen, PIPE, STDOUT
17 from subprocess import Popen
18 18
19 19 from IPython.utils.path import get_ipython_dir
20 20 from IPython.parallel import Client
21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 ipengine_cmd_argv,
23 ipcontroller_cmd_argv,
24 SIGKILL)
21 25
22 processes = []
23 blackhole = tempfile.TemporaryFile()
26 # globals
27 launchers = []
28 blackhole = open(os.devnull, 'w')
29
30 # Launcher class
31 class TestProcessLauncher(LocalProcessLauncher):
32 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
33 def start(self):
34 if self.state == 'before':
35 self.process = Popen(self.args,
36 stdout=blackhole, stderr=blackhole,
37 env=os.environ,
38 cwd=self.work_dir
39 )
40 self.notify_start(self.process.pid)
41 self.poll = self.process.poll
42 else:
43 s = 'The process was already started and has state: %r' % self.state
44 raise ProcessStateError(s)
24 45
25 46 # nose setup/teardown
26 47
27 48 def setup():
28 cp = Popen('ipcontroller --profile iptest -r --log-level 10 --log-to-file --usethreads'.split(), stdout=blackhole, stderr=STDOUT)
29 processes.append(cp)
30 engine_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-engine.json')
31 client_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-client.json')
49 cp = TestProcessLauncher()
50 cp.cmd_and_args = ipcontroller_cmd_argv + \
51 ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads']
52 cp.start()
53 launchers.append(cp)
54 cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
55 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
56 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
32 57 tic = time.time()
33 58 while not os.path.exists(engine_json) or not os.path.exists(client_json):
34 59 if cp.poll() is not None:
@@ -44,9 +69,10 b" def add_engines(n=1, profile='iptest'):"
44 69 base = len(rc)
45 70 eps = []
46 71 for i in range(n):
47 ep = Popen(['ipengine']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
48 # ep.start()
49 processes.append(ep)
72 ep = TestProcessLauncher()
73 ep.cmd_and_args = ipengine_cmd_argv + ['--profile', profile, '--log-level', '99']
74 ep.start()
75 launchers.append(ep)
50 76 eps.append(ep)
51 77 tic = time.time()
52 78 while len(rc) < base+n:
@@ -61,11 +87,11 b" def add_engines(n=1, profile='iptest'):"
61 87
62 88 def teardown():
63 89 time.sleep(1)
64 while processes:
65 p = processes.pop()
90 while launchers:
91 p = launchers.pop()
66 92 if p.poll() is None:
67 93 try:
68 p.terminate()
94 p.stop()
69 95 except Exception, e:
70 96 print e
71 97 pass
@@ -73,8 +99,9 b' def teardown():'
73 99 time.sleep(.25)
74 100 if p.poll() is None:
75 101 try:
76 print 'killing'
77 p.kill()
102 print 'cleaning up test process...'
103 p.signal(SIGKILL)
78 104 except:
79 105 print "couldn't shutdown process: ", p
106 blackhole.close()
80 107
@@ -20,7 +20,8 b' from IPython.external.decorator import decorator'
20 20
21 21 from IPython.parallel import error
22 22 from IPython.parallel import Client
23 from IPython.parallel.tests import processes,add_engines
23
24 from IPython.parallel.tests import launchers, add_engines
24 25
25 26 # simple tasks for use in apply tests
26 27
@@ -112,8 +113,8 b' class ClusterTestCase(BaseZMQTestCase):'
112 113 def tearDown(self):
113 114 # self.client.clear(block=True)
114 115 # close fds:
115 for e in filter(lambda e: e.poll() is not None, processes):
116 processes.remove(e)
116 for e in filter(lambda e: e.poll() is not None, launchers):
117 launchers.remove(e)
117 118
118 119 # allow flushing of incoming messages to prevent crash on socket close
119 120 self.client.wait(timeout=2)
@@ -38,7 +38,6 b' class AsyncResultTest(ClusterTestCase):'
38 38
39 39 def test_get_after_done(self):
40 40 ar = self.client[-1].apply_async(lambda : 42)
41 self.assertFalse(ar.ready())
42 41 ar.wait()
43 42 self.assertTrue(ar.ready())
44 43 self.assertEquals(ar.get(), 42)
General Comments 0
You need to be logged in to leave comments. Login now