##// END OF EJS Templates
use quarter-second heartbeat period in IPython.parallel tests...
MinRK -
Show More
@@ -1,111 +1,111 b''
1 1 """toplevel setup/teardown for parallel tests."""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import tempfile
16 16 import time
17 17 from subprocess import Popen
18 18
19 19 from IPython.utils.path import get_ipython_dir
20 20 from IPython.parallel import Client
21 21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 22 ipengine_cmd_argv,
23 23 ipcontroller_cmd_argv,
24 24 SIGKILL)
25 25
26 26 # globals
27 27 launchers = []
28 28 blackhole = open(os.devnull, 'w')
29 29
30 30 # Launcher class
31 31 class TestProcessLauncher(LocalProcessLauncher):
32 32 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
33 33 def start(self):
34 34 if self.state == 'before':
35 35 self.process = Popen(self.args,
36 36 stdout=blackhole, stderr=blackhole,
37 37 env=os.environ,
38 38 cwd=self.work_dir
39 39 )
40 40 self.notify_start(self.process.pid)
41 41 self.poll = self.process.poll
42 42 else:
43 43 s = 'The process was already started and has state: %r' % self.state
44 44 raise ProcessStateError(s)
45 45
46 46 # nose setup/teardown
47 47
48 48 def setup():
49 49 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
50 50 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
51 51 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
52 52 for json in (engine_json, client_json):
53 53 if os.path.exists(json):
54 54 os.remove(json)
55 55
56 56 cp = TestProcessLauncher()
57 57 cp.cmd_and_args = ipcontroller_cmd_argv + \
58 ['--profile=iptest', '--log-level=50']
58 ['--profile=iptest', '--log-level=50', '--ping=250']
59 59 cp.start()
60 60 launchers.append(cp)
61 61 tic = time.time()
62 62 while not os.path.exists(engine_json) or not os.path.exists(client_json):
63 63 if cp.poll() is not None:
64 64 print cp.poll()
65 65 raise RuntimeError("The test controller failed to start.")
66 66 elif time.time()-tic > 10:
67 67 raise RuntimeError("Timeout waiting for the test controller to start.")
68 68 time.sleep(0.1)
69 69 add_engines(1)
70 70
71 71 def add_engines(n=1, profile='iptest'):
72 72 rc = Client(profile=profile)
73 73 base = len(rc)
74 74 eps = []
75 75 for i in range(n):
76 76 ep = TestProcessLauncher()
77 77 ep.cmd_and_args = ipengine_cmd_argv + ['--profile=%s'%profile, '--log-level=50']
78 78 ep.start()
79 79 launchers.append(ep)
80 80 eps.append(ep)
81 81 tic = time.time()
82 82 while len(rc) < base+n:
83 83 if any([ ep.poll() is not None for ep in eps ]):
84 84 raise RuntimeError("A test engine failed to start.")
85 85 elif time.time()-tic > 10:
86 86 raise RuntimeError("Timeout waiting for engines to connect.")
87 87 time.sleep(.1)
88 88 rc.spin()
89 89 rc.close()
90 90 return eps
91 91
92 92 def teardown():
93 93 time.sleep(1)
94 94 while launchers:
95 95 p = launchers.pop()
96 96 if p.poll() is None:
97 97 try:
98 98 p.stop()
99 99 except Exception, e:
100 100 print e
101 101 pass
102 102 if p.poll() is None:
103 103 time.sleep(.25)
104 104 if p.poll() is None:
105 105 try:
106 106 print 'cleaning up test process...'
107 107 p.signal(SIGKILL)
108 108 except:
109 109 print "couldn't shutdown process: ", p
110 110 blackhole.close()
111 111
General Comments 0
You need to be logged in to leave comments. Login now