##// END OF EJS Templates
improve process cleanup on Windows...
MinRK -
Show More
@@ -21,13 +21,21 b' import os'
21 import re
21 import re
22 import stat
22 import stat
23
23
24 # signal imports, handling various platforms, versions
25
24 from signal import SIGINT, SIGTERM
26 from signal import SIGINT, SIGTERM
25 try:
27 try:
26 from signal import SIGKILL
28 from signal import SIGKILL
27 except ImportError:
29 except ImportError:
28 # windows
30 # Windows
29 SIGKILL=SIGTERM
31 SIGKILL=SIGTERM
30
32
33 try:
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
37 pass
38
31 from subprocess import Popen, PIPE, STDOUT
39 from subprocess import Popen, PIPE, STDOUT
32 try:
40 try:
33 from subprocess import check_output
41 from subprocess import check_output
@@ -51,24 +59,10 b' from IPython.parallel.factory import LoggingFactory'
51
59
52 from .win32support import forward_read_events
60 from .win32support import forward_read_events
53
61
54 # load winhpcjob only on Windows
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
55 try:
56 from .winhpcjob import (
57 IPControllerTask, IPEngineTask,
58 IPControllerJob, IPEngineSetJob
59 )
60 except ImportError:
61 pass
62
63
63 WINDOWS = os.name == 'nt'
64 WINDOWS = os.name == 'nt'
64
65
65 if WINDOWS:
66 try:
67 # >= 2.7, 3.2
68 from signal import CTRL_C_EVENT as SIGINT
69 except ImportError:
70 pass
71
72 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
73 # Paths to the kernel apps
67 # Paths to the kernel apps
74 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
@@ -282,11 +276,19 b' class LocalProcessLauncher(BaseLauncher):'
282
276
283 def signal(self, sig):
277 def signal(self, sig):
284 if self.state == 'running':
278 if self.state == 'running':
285 self.process.send_signal(sig)
279 if WINDOWS and sig != SIGINT:
280 # use Windows tree-kill for better child cleanup
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
282 else:
283 self.process.send_signal(sig)
286
284
287 def interrupt_then_kill(self, delay=2.0):
285 def interrupt_then_kill(self, delay=2.0):
288 """Send INT, wait a delay and then send KILL."""
286 """Send INT, wait a delay and then send KILL."""
289 self.signal(SIGINT)
287 try:
288 self.signal(SIGINT)
289 except Exception:
290 self.log.debug("interrupt failed")
291 pass
290 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
291 self.killer.start()
293 self.killer.start()
292
294
@@ -16,8 +16,6 b' Job and task components for writing .xml files that the Windows HPC Server'
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import with_statement
20
21 import os
19 import os
22 import re
20 import re
23 import uuid
21 import uuid
@@ -14,21 +14,46 b''
14 import os
14 import os
15 import tempfile
15 import tempfile
16 import time
16 import time
17 from subprocess import Popen, PIPE, STDOUT
17 from subprocess import Popen
18
18
19 from IPython.utils.path import get_ipython_dir
19 from IPython.utils.path import get_ipython_dir
20 from IPython.parallel import Client
20 from IPython.parallel import Client
21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 ipengine_cmd_argv,
23 ipcontroller_cmd_argv,
24 SIGKILL)
21
25
22 processes = []
26 # globals
23 blackhole = tempfile.TemporaryFile()
27 launchers = []
28 blackhole = open(os.devnull, 'w')
29
30 # Launcher class
31 class TestProcessLauncher(LocalProcessLauncher):
32 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
33 def start(self):
34 if self.state == 'before':
35 self.process = Popen(self.args,
36 stdout=blackhole, stderr=blackhole,
37 env=os.environ,
38 cwd=self.work_dir
39 )
40 self.notify_start(self.process.pid)
41 self.poll = self.process.poll
42 else:
43 s = 'The process was already started and has state: %r' % self.state
44 raise ProcessStateError(s)
24
45
25 # nose setup/teardown
46 # nose setup/teardown
26
47
27 def setup():
48 def setup():
28 cp = Popen('ipcontroller --profile iptest -r --log-level 10 --log-to-file --usethreads'.split(), stdout=blackhole, stderr=STDOUT)
49 cp = TestProcessLauncher()
29 processes.append(cp)
50 cp.cmd_and_args = ipcontroller_cmd_argv + \
30 engine_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-engine.json')
51 ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads']
31 client_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-client.json')
52 cp.start()
53 launchers.append(cp)
54 cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
55 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
56 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
32 tic = time.time()
57 tic = time.time()
33 while not os.path.exists(engine_json) or not os.path.exists(client_json):
58 while not os.path.exists(engine_json) or not os.path.exists(client_json):
34 if cp.poll() is not None:
59 if cp.poll() is not None:
@@ -44,9 +69,10 b" def add_engines(n=1, profile='iptest'):"
44 base = len(rc)
69 base = len(rc)
45 eps = []
70 eps = []
46 for i in range(n):
71 for i in range(n):
47 ep = Popen(['ipengine']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
72 ep = TestProcessLauncher()
48 # ep.start()
73 ep.cmd_and_args = ipengine_cmd_argv + ['--profile', profile, '--log-level', '99']
49 processes.append(ep)
74 ep.start()
75 launchers.append(ep)
50 eps.append(ep)
76 eps.append(ep)
51 tic = time.time()
77 tic = time.time()
52 while len(rc) < base+n:
78 while len(rc) < base+n:
@@ -61,11 +87,11 b" def add_engines(n=1, profile='iptest'):"
61
87
62 def teardown():
88 def teardown():
63 time.sleep(1)
89 time.sleep(1)
64 while processes:
90 while launchers:
65 p = processes.pop()
91 p = launchers.pop()
66 if p.poll() is None:
92 if p.poll() is None:
67 try:
93 try:
68 p.terminate()
94 p.stop()
69 except Exception, e:
95 except Exception, e:
70 print e
96 print e
71 pass
97 pass
@@ -73,8 +99,9 b' def teardown():'
73 time.sleep(.25)
99 time.sleep(.25)
74 if p.poll() is None:
100 if p.poll() is None:
75 try:
101 try:
76 print 'killing'
102 print 'cleaning up test process...'
77 p.kill()
103 p.signal(SIGKILL)
78 except:
104 except:
79 print "couldn't shutdown process: ", p
105 print "couldn't shutdown process: ", p
106 blackhole.close()
80
107
@@ -20,7 +20,8 b' from IPython.external.decorator import decorator'
20
20
21 from IPython.parallel import error
21 from IPython.parallel import error
22 from IPython.parallel import Client
22 from IPython.parallel import Client
23 from IPython.parallel.tests import processes,add_engines
23
24 from IPython.parallel.tests import launchers, add_engines
24
25
25 # simple tasks for use in apply tests
26 # simple tasks for use in apply tests
26
27
@@ -112,8 +113,8 b' class ClusterTestCase(BaseZMQTestCase):'
112 def tearDown(self):
113 def tearDown(self):
113 # self.client.clear(block=True)
114 # self.client.clear(block=True)
114 # close fds:
115 # close fds:
115 for e in filter(lambda e: e.poll() is not None, processes):
116 for e in filter(lambda e: e.poll() is not None, launchers):
116 processes.remove(e)
117 launchers.remove(e)
117
118
118 # allow flushing of incoming messages to prevent crash on socket close
119 # allow flushing of incoming messages to prevent crash on socket close
119 self.client.wait(timeout=2)
120 self.client.wait(timeout=2)
@@ -38,7 +38,6 b' class AsyncResultTest(ClusterTestCase):'
38
38
39 def test_get_after_done(self):
39 def test_get_after_done(self):
40 ar = self.client[-1].apply_async(lambda : 42)
40 ar = self.client[-1].apply_async(lambda : 42)
41 self.assertFalse(ar.ready())
42 ar.wait()
41 ar.wait()
43 self.assertTrue(ar.ready())
42 self.assertTrue(ar.ready())
44 self.assertEquals(ar.get(), 42)
43 self.assertEquals(ar.get(), 42)
General Comments 0
You need to be logged in to leave comments. Login now