##// END OF EJS Templates
Fix cleanup of test controller
Thomas Kluyver -
Show More
@@ -1,275 +1,278 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """IPython Test Process Controller
2 """IPython Test Process Controller
3
3
4 This module runs one or more subprocesses which will actually run the IPython
4 This module runs one or more subprocesses which will actually run the IPython
5 test suite.
5 test suite.
6
6
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2009-2011 The IPython Development Team
10 # Copyright (C) 2009-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import argparse
21 import argparse
22 import multiprocessing.pool
22 import multiprocessing.pool
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26 import subprocess
26 import subprocess
27 import time
27 import time
28
28
29 from .iptest import have, test_group_names, test_sections
29 from .iptest import have, test_group_names, test_sections
30 from IPython.utils import py3compat
30 from IPython.utils import py3compat
31 from IPython.utils.sysinfo import sys_info
31 from IPython.utils.sysinfo import sys_info
32 from IPython.utils.tempdir import TemporaryDirectory
32 from IPython.utils.tempdir import TemporaryDirectory
33
33
34
34
35 class IPTestController(object):
35 class IPTestController(object):
36 """Run iptest in a subprocess
36 """Run iptest in a subprocess
37 """
37 """
38 #: str, IPython test suite to be executed.
38 #: str, IPython test suite to be executed.
39 section = None
39 section = None
40 #: list, command line arguments to be executed
40 #: list, command line arguments to be executed
41 cmd = None
41 cmd = None
42 #: dict, extra environment variables to set for the subprocess
42 #: dict, extra environment variables to set for the subprocess
43 env = None
43 env = None
44 #: list, TemporaryDirectory instances to clear up when the process finishes
44 #: list, TemporaryDirectory instances to clear up when the process finishes
45 dirs = None
45 dirs = None
46 #: subprocess.Popen instance
46 #: subprocess.Popen instance
47 process = None
47 process = None
48 buffer_output = False
48 buffer_output = False
49
49
50 def __init__(self, section):
50 def __init__(self, section):
51 """Create new test runner."""
51 """Create new test runner."""
52 self.section = section
52 self.section = section
53 self.cmd = [sys.executable, '-m', 'IPython.testing.iptest', section]
53 self.cmd = [sys.executable, '-m', 'IPython.testing.iptest', section]
54 self.env = {}
54 self.env = {}
55 self.dirs = []
55 self.dirs = []
56 ipydir = TemporaryDirectory()
56 ipydir = TemporaryDirectory()
57 self.dirs.append(ipydir)
57 self.dirs.append(ipydir)
58 self.env['IPYTHONDIR'] = ipydir.name
58 self.env['IPYTHONDIR'] = ipydir.name
59 workingdir = TemporaryDirectory()
59 workingdir = TemporaryDirectory()
60 self.dirs.append(workingdir)
60 self.dirs.append(workingdir)
61 self.env['IPTEST_WORKING_DIR'] = workingdir.name
61 self.env['IPTEST_WORKING_DIR'] = workingdir.name
62
62
63 def add_xunit(self):
63 def add_xunit(self):
64 xunit_file = os.path.abspath(self.section + '.xunit.xml')
64 xunit_file = os.path.abspath(self.section + '.xunit.xml')
65 self.cmd.extend(['--with-xunit', '--xunit-file', xunit_file])
65 self.cmd.extend(['--with-xunit', '--xunit-file', xunit_file])
66
66
67 def add_coverage(self, xml=True):
67 def add_coverage(self, xml=True):
68 self.cmd.extend(['--with-coverage', '--cover-package', self.section])
68 self.cmd.extend(['--with-coverage', '--cover-package', self.section])
69 if xml:
69 if xml:
70 coverage_xml = os.path.abspath(self.section + ".coverage.xml")
70 coverage_xml = os.path.abspath(self.section + ".coverage.xml")
71 self.cmd.extend(['--cover-xml', '--cover-xml-file', coverage_xml])
71 self.cmd.extend(['--cover-xml', '--cover-xml-file', coverage_xml])
72
72
73
73
74 def launch(self):
74 def launch(self):
75 # print('*** ENV:', self.env) # dbg
75 # print('*** ENV:', self.env) # dbg
76 # print('*** CMD:', self.cmd) # dbg
76 # print('*** CMD:', self.cmd) # dbg
77 env = os.environ.copy()
77 env = os.environ.copy()
78 env.update(self.env)
78 env.update(self.env)
79 output = subprocess.PIPE if self.buffer_output else None
79 output = subprocess.PIPE if self.buffer_output else None
80 self.process = subprocess.Popen(self.cmd, stdout=output,
80 self.process = subprocess.Popen(self.cmd, stdout=output,
81 stderr=output, env=env)
81 stderr=output, env=env)
82
82
83 def run(self):
83 def run(self):
84 """Run the stored commands"""
84 """Run the stored commands"""
85 try:
85 try:
86 retcode = self._run_cmd()
86 retcode = self._run_cmd()
87 except KeyboardInterrupt:
87 except KeyboardInterrupt:
88 return -signal.SIGINT
88 return -signal.SIGINT
89 except:
89 except:
90 import traceback
90 import traceback
91 traceback.print_exc()
91 traceback.print_exc()
92 return 1 # signal failure
92 return 1 # signal failure
93
93
94 if self.coverage_xml:
94 if self.coverage_xml:
95 subprocess.call(["coverage", "xml", "-o", self.coverage_xml])
95 subprocess.call(["coverage", "xml", "-o", self.coverage_xml])
96 return retcode
96 return retcode
97
97
98 def cleanup(self):
98 def cleanup_process(self):
99 """Cleanup on exit by killing any leftover processes."""
99 """Cleanup on exit by killing any leftover processes."""
100 subp = self.process
100 subp = self.process
101 if subp is None or (subp.poll() is not None):
101 if subp is None or (subp.poll() is not None):
102 return # Process doesn't exist, or is already dead.
102 return # Process doesn't exist, or is already dead.
103
103
104 try:
104 try:
105 print('Cleaning up stale PID: %d' % subp.pid)
105 print('Cleaning up stale PID: %d' % subp.pid)
106 subp.kill()
106 subp.kill()
107 except: # (OSError, WindowsError) ?
107 except: # (OSError, WindowsError) ?
108 # This is just a best effort, if we fail or the process was
108 # This is just a best effort, if we fail or the process was
109 # really gone, ignore it.
109 # really gone, ignore it.
110 pass
110 pass
111 else:
111 else:
112 for i in range(10):
112 for i in range(10):
113 if subp.poll() is None:
113 if subp.poll() is None:
114 time.sleep(0.1)
114 time.sleep(0.1)
115 else:
115 else:
116 break
116 break
117
117
118 if subp.poll() is None:
118 if subp.poll() is None:
119 # The process did not die...
119 # The process did not die...
120 print('... failed. Manual cleanup may be required.')
120 print('... failed. Manual cleanup may be required.')
121
121
122 def cleanup(self):
123 "Kill process if it's still alive, and clean up temporary directories"
124 self.cleanup_process()
122 for td in self.dirs:
125 for td in self.dirs:
123 td.cleanup()
126 td.cleanup()
124
127
125 __del__ = cleanup
128 __del__ = cleanup
126
129
127 def test_controllers_to_run(inc_slow=False):
130 def test_controllers_to_run(inc_slow=False):
128 """Returns an ordered list of IPTestController instances to be run."""
131 """Returns an ordered list of IPTestController instances to be run."""
129 res = []
132 res = []
130 if not inc_slow:
133 if not inc_slow:
131 test_sections['parallel'].enabled = False
134 test_sections['parallel'].enabled = False
132 for name in test_group_names:
135 for name in test_group_names:
133 if test_sections[name].will_run:
136 if test_sections[name].will_run:
134 res.append(IPTestController(name))
137 res.append(IPTestController(name))
135 return res
138 return res
136
139
137 def do_run(controller):
140 def do_run(controller):
138 try:
141 try:
139 try:
142 try:
140 controller.launch()
143 controller.launch()
141 except Exception:
144 except Exception:
142 import traceback
145 import traceback
143 traceback.print_exc()
146 traceback.print_exc()
144 return controller, 1 # signal failure
147 return controller, 1 # signal failure
145
148
146 exitcode = controller.process.wait()
149 exitcode = controller.process.wait()
147 controller.cleanup()
150 controller.cleanup()
148 return controller, exitcode
151 return controller, exitcode
149
152
150 except KeyboardInterrupt:
153 except KeyboardInterrupt:
151 controller.cleanup()
154 controller.cleanup()
152 return controller, -signal.SIGINT
155 return controller, -signal.SIGINT
153
156
154 def report():
157 def report():
155 """Return a string with a summary report of test-related variables."""
158 """Return a string with a summary report of test-related variables."""
156
159
157 out = [ sys_info(), '\n']
160 out = [ sys_info(), '\n']
158
161
159 avail = []
162 avail = []
160 not_avail = []
163 not_avail = []
161
164
162 for k, is_avail in have.items():
165 for k, is_avail in have.items():
163 if is_avail:
166 if is_avail:
164 avail.append(k)
167 avail.append(k)
165 else:
168 else:
166 not_avail.append(k)
169 not_avail.append(k)
167
170
168 if avail:
171 if avail:
169 out.append('\nTools and libraries available at test time:\n')
172 out.append('\nTools and libraries available at test time:\n')
170 avail.sort()
173 avail.sort()
171 out.append(' ' + ' '.join(avail)+'\n')
174 out.append(' ' + ' '.join(avail)+'\n')
172
175
173 if not_avail:
176 if not_avail:
174 out.append('\nTools and libraries NOT available at test time:\n')
177 out.append('\nTools and libraries NOT available at test time:\n')
175 not_avail.sort()
178 not_avail.sort()
176 out.append(' ' + ' '.join(not_avail)+'\n')
179 out.append(' ' + ' '.join(not_avail)+'\n')
177
180
178 return ''.join(out)
181 return ''.join(out)
179
182
180 def run_iptestall(inc_slow=False, jobs=1, xunit=False, coverage=False):
183 def run_iptestall(inc_slow=False, jobs=1, xunit=False, coverage=False):
181 """Run the entire IPython test suite by calling nose and trial.
184 """Run the entire IPython test suite by calling nose and trial.
182
185
183 This function constructs :class:`IPTester` instances for all IPython
186 This function constructs :class:`IPTester` instances for all IPython
184 modules and package and then runs each of them. This causes the modules
187 modules and package and then runs each of them. This causes the modules
185 and packages of IPython to be tested each in their own subprocess using
188 and packages of IPython to be tested each in their own subprocess using
186 nose.
189 nose.
187
190
188 Parameters
191 Parameters
189 ----------
192 ----------
190
193
191 inc_slow : bool, optional
194 inc_slow : bool, optional
192 Include slow tests, like IPython.parallel. By default, these tests aren't
195 Include slow tests, like IPython.parallel. By default, these tests aren't
193 run.
196 run.
194
197
195 fast : bool, option
198 fast : bool, option
196 Run the test suite in parallel, if True, using as many threads as there
199 Run the test suite in parallel, if True, using as many threads as there
197 are processors
200 are processors
198 """
201 """
199 pool = multiprocessing.pool.ThreadPool(jobs)
202 pool = multiprocessing.pool.ThreadPool(jobs)
200 if jobs != 1:
203 if jobs != 1:
201 IPTestController.buffer_output = True
204 IPTestController.buffer_output = True
202
205
203 controllers = test_controllers_to_run(inc_slow=inc_slow)
206 controllers = test_controllers_to_run(inc_slow=inc_slow)
204
207
205 # Run all test runners, tracking execution time
208 # Run all test runners, tracking execution time
206 failed = []
209 failed = []
207 t_start = time.time()
210 t_start = time.time()
208
211
209 print('*'*70)
212 print('*'*70)
210 for (controller, res) in pool.imap_unordered(do_run, controllers):
213 for (controller, res) in pool.imap_unordered(do_run, controllers):
211 tgroup = 'IPython test group: ' + controller.section
214 tgroup = 'IPython test group: ' + controller.section
212 res_string = 'OK' if res == 0 else 'FAILED'
215 res_string = 'OK' if res == 0 else 'FAILED'
213 res_string = res_string.rjust(70 - len(tgroup), '.')
216 res_string = res_string.rjust(70 - len(tgroup), '.')
214 print(tgroup + res_string)
217 print(tgroup + res_string)
215 if res:
218 if res:
216 failed.append(controller)
219 failed.append(controller)
217 if res == -signal.SIGINT:
220 if res == -signal.SIGINT:
218 print("Interrupted")
221 print("Interrupted")
219 break
222 break
220
223
221 t_end = time.time()
224 t_end = time.time()
222 t_tests = t_end - t_start
225 t_tests = t_end - t_start
223 nrunners = len(controllers)
226 nrunners = len(controllers)
224 nfail = len(failed)
227 nfail = len(failed)
225 # summarize results
228 # summarize results
226 print()
229 print()
227 print('*'*70)
230 print('*'*70)
228 print('Test suite completed for system with the following information:')
231 print('Test suite completed for system with the following information:')
229 print(report())
232 print(report())
230 print('Ran %s test groups in %.3fs' % (nrunners, t_tests))
233 print('Ran %s test groups in %.3fs' % (nrunners, t_tests))
231 print()
234 print()
232 print('Status:')
235 print('Status:')
233 if not failed:
236 if not failed:
234 print('OK')
237 print('OK')
235 else:
238 else:
236 # If anything went wrong, point out what command to rerun manually to
239 # If anything went wrong, point out what command to rerun manually to
237 # see the actual errors and individual summary
240 # see the actual errors and individual summary
238 print('ERROR - %s out of %s test groups failed.' % (nfail, nrunners))
241 print('ERROR - %s out of %s test groups failed.' % (nfail, nrunners))
239 for controller in failed:
242 for controller in failed:
240 print('-'*40)
243 print('-'*40)
241 print('Runner failed:', controller.section)
244 print('Runner failed:', controller.section)
242 print('You may wish to rerun this one individually, with:')
245 print('You may wish to rerun this one individually, with:')
243 failed_call_args = [py3compat.cast_unicode(x) for x in controller.cmd]
246 failed_call_args = [py3compat.cast_unicode(x) for x in controller.cmd]
244 print(u' '.join(failed_call_args))
247 print(u' '.join(failed_call_args))
245 print()
248 print()
246 # Ensure that our exit code indicates failure
249 # Ensure that our exit code indicates failure
247 sys.exit(1)
250 sys.exit(1)
248
251
249
252
250 def main():
253 def main():
251 if len(sys.argv) > 1 and (sys.argv[1] in test_sections):
254 if len(sys.argv) > 1 and (sys.argv[1] in test_sections):
252 from .iptest import run_iptest
255 from .iptest import run_iptest
253 # This is in-process
256 # This is in-process
254 run_iptest()
257 run_iptest()
255 return
258 return
256
259
257 parser = argparse.ArgumentParser(description='Run IPython test suite')
260 parser = argparse.ArgumentParser(description='Run IPython test suite')
258 parser.add_argument('--all', action='store_true',
261 parser.add_argument('--all', action='store_true',
259 help='Include slow tests not run by default.')
262 help='Include slow tests not run by default.')
260 parser.add_argument('-j', '--fast', nargs='?', const=None, default=1,
263 parser.add_argument('-j', '--fast', nargs='?', const=None, default=1,
261 help='Run test sections in parallel.')
264 help='Run test sections in parallel.')
262 parser.add_argument('--xunit', action='store_true',
265 parser.add_argument('--xunit', action='store_true',
263 help='Produce Xunit XML results')
266 help='Produce Xunit XML results')
264 parser.add_argument('--coverage', action='store_true',
267 parser.add_argument('--coverage', action='store_true',
265 help='Measure test coverage.')
268 help='Measure test coverage.')
266
269
267 options = parser.parse_args()
270 options = parser.parse_args()
268
271
269 # This starts subprocesses
272 # This starts subprocesses
270 run_iptestall(inc_slow=options.all, jobs=options.fast,
273 run_iptestall(inc_slow=options.all, jobs=options.fast,
271 xunit=options.xunit, coverage=options.coverage)
274 xunit=options.xunit, coverage=options.coverage)
272
275
273
276
274 if __name__ == '__main__':
277 if __name__ == '__main__':
275 main()
278 main()
@@ -1,148 +1,151 b''
1 """TemporaryDirectory class, copied from Python 3.2.
1 """TemporaryDirectory class, copied from Python 3.2.
2
2
3 This is copied from the stdlib and will be standard in Python 3.2 and onwards.
3 This is copied from the stdlib and will be standard in Python 3.2 and onwards.
4 """
4 """
5 from __future__ import print_function
5 from __future__ import print_function
6
6
7 import os as _os
7 import os as _os
8 import warnings as _warnings
9 import sys as _sys
8
10
9 # This code should only be used in Python versions < 3.2, since after that we
11 # This code should only be used in Python versions < 3.2, since after that we
10 # can rely on the stdlib itself.
12 # can rely on the stdlib itself.
11 try:
13 try:
12 from tempfile import TemporaryDirectory
14 from tempfile import TemporaryDirectory
13
15
14 except ImportError:
16 except ImportError:
15 from tempfile import mkdtemp, template
17 from tempfile import mkdtemp, template
16
18
17 class TemporaryDirectory(object):
19 class TemporaryDirectory(object):
18 """Create and return a temporary directory. This has the same
20 """Create and return a temporary directory. This has the same
19 behavior as mkdtemp but can be used as a context manager. For
21 behavior as mkdtemp but can be used as a context manager. For
20 example:
22 example:
21
23
22 with TemporaryDirectory() as tmpdir:
24 with TemporaryDirectory() as tmpdir:
23 ...
25 ...
24
26
25 Upon exiting the context, the directory and everthing contained
27 Upon exiting the context, the directory and everthing contained
26 in it are removed.
28 in it are removed.
27 """
29 """
28
30
29 def __init__(self, suffix="", prefix=template, dir=None):
31 def __init__(self, suffix="", prefix=template, dir=None):
30 self.name = mkdtemp(suffix, prefix, dir)
32 self.name = mkdtemp(suffix, prefix, dir)
31 self._closed = False
33 self._closed = False
32
34
33 def __enter__(self):
35 def __enter__(self):
34 return self.name
36 return self.name
35
37
36 def cleanup(self, _warn=False):
38 def cleanup(self, _warn=False):
37 if self.name and not self._closed:
39 if self.name and not self._closed:
38 try:
40 try:
39 self._rmtree(self.name)
41 self._rmtree(self.name)
40 except (TypeError, AttributeError) as ex:
42 except (TypeError, AttributeError) as ex:
41 # Issue #10188: Emit a warning on stderr
43 # Issue #10188: Emit a warning on stderr
42 # if the directory could not be cleaned
44 # if the directory could not be cleaned
43 # up due to missing globals
45 # up due to missing globals
44 if "None" not in str(ex):
46 if "None" not in str(ex):
45 raise
47 raise
46 print("ERROR: {!r} while cleaning up {!r}".format(ex, self,),
48 print("ERROR: {!r} while cleaning up {!r}".format(ex, self,),
47 file=_sys.stderr)
49 file=_sys.stderr)
48 return
50 return
49 self._closed = True
51 self._closed = True
50 if _warn:
52 if _warn:
51 self._warn("Implicitly cleaning up {!r}".format(self),
53 self._warn("Implicitly cleaning up {!r}".format(self),
52 ResourceWarning)
54 Warning)
53
55
54 def __exit__(self, exc, value, tb):
56 def __exit__(self, exc, value, tb):
55 self.cleanup()
57 self.cleanup()
56
58
57 def __del__(self):
59 def __del__(self):
58 # Issue a ResourceWarning if implicit cleanup needed
60 # Issue a ResourceWarning if implicit cleanup needed
59 self.cleanup(_warn=True)
61 self.cleanup(_warn=True)
60
62
61
63
62 # XXX (ncoghlan): The following code attempts to make
64 # XXX (ncoghlan): The following code attempts to make
63 # this class tolerant of the module nulling out process
65 # this class tolerant of the module nulling out process
64 # that happens during CPython interpreter shutdown
66 # that happens during CPython interpreter shutdown
65 # Alas, it doesn't actually manage it. See issue #10188
67 # Alas, it doesn't actually manage it. See issue #10188
66 _listdir = staticmethod(_os.listdir)
68 _listdir = staticmethod(_os.listdir)
67 _path_join = staticmethod(_os.path.join)
69 _path_join = staticmethod(_os.path.join)
68 _isdir = staticmethod(_os.path.isdir)
70 _isdir = staticmethod(_os.path.isdir)
69 _remove = staticmethod(_os.remove)
71 _remove = staticmethod(_os.remove)
70 _rmdir = staticmethod(_os.rmdir)
72 _rmdir = staticmethod(_os.rmdir)
71 _os_error = _os.error
73 _os_error = _os.error
74 _warn = _warnings.warn
72
75
73 def _rmtree(self, path):
76 def _rmtree(self, path):
74 # Essentially a stripped down version of shutil.rmtree. We can't
77 # Essentially a stripped down version of shutil.rmtree. We can't
75 # use globals because they may be None'ed out at shutdown.
78 # use globals because they may be None'ed out at shutdown.
76 for name in self._listdir(path):
79 for name in self._listdir(path):
77 fullname = self._path_join(path, name)
80 fullname = self._path_join(path, name)
78 try:
81 try:
79 isdir = self._isdir(fullname)
82 isdir = self._isdir(fullname)
80 except self._os_error:
83 except self._os_error:
81 isdir = False
84 isdir = False
82 if isdir:
85 if isdir:
83 self._rmtree(fullname)
86 self._rmtree(fullname)
84 else:
87 else:
85 try:
88 try:
86 self._remove(fullname)
89 self._remove(fullname)
87 except self._os_error:
90 except self._os_error:
88 pass
91 pass
89 try:
92 try:
90 self._rmdir(path)
93 self._rmdir(path)
91 except self._os_error:
94 except self._os_error:
92 pass
95 pass
93
96
94
97
95 class NamedFileInTemporaryDirectory(object):
98 class NamedFileInTemporaryDirectory(object):
96
99
97 def __init__(self, filename, mode='w+b', bufsize=-1, **kwds):
100 def __init__(self, filename, mode='w+b', bufsize=-1, **kwds):
98 """
101 """
99 Open a file named `filename` in a temporary directory.
102 Open a file named `filename` in a temporary directory.
100
103
101 This context manager is preferred over `NamedTemporaryFile` in
104 This context manager is preferred over `NamedTemporaryFile` in
102 stdlib `tempfile` when one needs to reopen the file.
105 stdlib `tempfile` when one needs to reopen the file.
103
106
104 Arguments `mode` and `bufsize` are passed to `open`.
107 Arguments `mode` and `bufsize` are passed to `open`.
105 Rest of the arguments are passed to `TemporaryDirectory`.
108 Rest of the arguments are passed to `TemporaryDirectory`.
106
109
107 """
110 """
108 self._tmpdir = TemporaryDirectory(**kwds)
111 self._tmpdir = TemporaryDirectory(**kwds)
109 path = _os.path.join(self._tmpdir.name, filename)
112 path = _os.path.join(self._tmpdir.name, filename)
110 self.file = open(path, mode, bufsize)
113 self.file = open(path, mode, bufsize)
111
114
112 def cleanup(self):
115 def cleanup(self):
113 self.file.close()
116 self.file.close()
114 self._tmpdir.cleanup()
117 self._tmpdir.cleanup()
115
118
116 __del__ = cleanup
119 __del__ = cleanup
117
120
118 def __enter__(self):
121 def __enter__(self):
119 return self.file
122 return self.file
120
123
121 def __exit__(self, type, value, traceback):
124 def __exit__(self, type, value, traceback):
122 self.cleanup()
125 self.cleanup()
123
126
124
127
125 class TemporaryWorkingDirectory(TemporaryDirectory):
128 class TemporaryWorkingDirectory(TemporaryDirectory):
126 """
129 """
127 Creates a temporary directory and sets the cwd to that directory.
130 Creates a temporary directory and sets the cwd to that directory.
128 Automatically reverts to previous cwd upon cleanup.
131 Automatically reverts to previous cwd upon cleanup.
129 Usage example:
132 Usage example:
130
133
131 with TemporaryWorakingDirectory() as tmpdir:
134 with TemporaryWorakingDirectory() as tmpdir:
132 ...
135 ...
133 """
136 """
134
137
135 def __init__(self, **kw):
138 def __init__(self, **kw):
136 super(TemporaryWorkingDirectory, self).__init__(**kw)
139 super(TemporaryWorkingDirectory, self).__init__(**kw)
137
140
138 #Change cwd to new temp dir. Remember old cwd.
141 #Change cwd to new temp dir. Remember old cwd.
139 self.old_wd = _os.getcwd()
142 self.old_wd = _os.getcwd()
140 _os.chdir(self.name)
143 _os.chdir(self.name)
141
144
142
145
143 def cleanup(self, _warn=False):
146 def cleanup(self, _warn=False):
144 #Revert to old cwd.
147 #Revert to old cwd.
145 _os.chdir(self.old_wd)
148 _os.chdir(self.old_wd)
146
149
147 #Cleanup
150 #Cleanup
148 super(TemporaryWorkingDirectory, self).cleanup(_warn=_warn)
151 super(TemporaryWorkingDirectory, self).cleanup(_warn=_warn)
General Comments 0
You need to be logged in to leave comments. Login now