##// END OF EJS Templates
gh-7044: set TMPDIR to workingdir in tests
Jeroen Demeyer -
Show More
@@ -1,734 +1,737 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """IPython Test Process Controller
2 """IPython Test Process Controller
3
3
4 This module runs one or more subprocesses which will actually run the IPython
4 This module runs one or more subprocesses which will actually run the IPython
5 test suite.
5 test suite.
6
6
7 """
7 """
8
8
9 # Copyright (c) IPython Development Team.
9 # Copyright (c) IPython Development Team.
10 # Distributed under the terms of the Modified BSD License.
10 # Distributed under the terms of the Modified BSD License.
11
11
12 from __future__ import print_function
12 from __future__ import print_function
13
13
14 import argparse
14 import argparse
15 import json
15 import json
16 import multiprocessing.pool
16 import multiprocessing.pool
17 import os
17 import os
18 import stat
18 import stat
19 import re
19 import re
20 import requests
20 import requests
21 import shutil
21 import shutil
22 import signal
22 import signal
23 import sys
23 import sys
24 import subprocess
24 import subprocess
25 import time
25 import time
26
26
27 from .iptest import (
27 from .iptest import (
28 have, test_group_names as py_test_group_names, test_sections, StreamCapturer,
28 have, test_group_names as py_test_group_names, test_sections, StreamCapturer,
29 test_for,
29 test_for,
30 )
30 )
31 from IPython.utils.path import compress_user
31 from IPython.utils.path import compress_user
32 from IPython.utils.py3compat import bytes_to_str
32 from IPython.utils.py3compat import bytes_to_str
33 from IPython.utils.sysinfo import get_sys_info
33 from IPython.utils.sysinfo import get_sys_info
34 from IPython.utils.tempdir import TemporaryDirectory
34 from IPython.utils.tempdir import TemporaryDirectory
35 from IPython.utils.text import strip_ansi
35 from IPython.utils.text import strip_ansi
36
36
37 try:
37 try:
38 # Python >= 3.3
38 # Python >= 3.3
39 from subprocess import TimeoutExpired
39 from subprocess import TimeoutExpired
40 def popen_wait(p, timeout):
40 def popen_wait(p, timeout):
41 return p.wait(timeout)
41 return p.wait(timeout)
42 except ImportError:
42 except ImportError:
43 class TimeoutExpired(Exception):
43 class TimeoutExpired(Exception):
44 pass
44 pass
45 def popen_wait(p, timeout):
45 def popen_wait(p, timeout):
46 """backport of Popen.wait from Python 3"""
46 """backport of Popen.wait from Python 3"""
47 for i in range(int(10 * timeout)):
47 for i in range(int(10 * timeout)):
48 if p.poll() is not None:
48 if p.poll() is not None:
49 return
49 return
50 time.sleep(0.1)
50 time.sleep(0.1)
51 if p.poll() is None:
51 if p.poll() is None:
52 raise TimeoutExpired
52 raise TimeoutExpired
53
53
54 NOTEBOOK_SHUTDOWN_TIMEOUT = 10
54 NOTEBOOK_SHUTDOWN_TIMEOUT = 10
55
55
56 class TestController(object):
56 class TestController(object):
57 """Run tests in a subprocess
57 """Run tests in a subprocess
58 """
58 """
59 #: str, IPython test suite to be executed.
59 #: str, IPython test suite to be executed.
60 section = None
60 section = None
61 #: list, command line arguments to be executed
61 #: list, command line arguments to be executed
62 cmd = None
62 cmd = None
63 #: dict, extra environment variables to set for the subprocess
63 #: dict, extra environment variables to set for the subprocess
64 env = None
64 env = None
65 #: list, TemporaryDirectory instances to clear up when the process finishes
65 #: list, TemporaryDirectory instances to clear up when the process finishes
66 dirs = None
66 dirs = None
67 #: subprocess.Popen instance
67 #: subprocess.Popen instance
68 process = None
68 process = None
69 #: str, process stdout+stderr
69 #: str, process stdout+stderr
70 stdout = None
70 stdout = None
71
71
72 def __init__(self):
72 def __init__(self):
73 self.cmd = []
73 self.cmd = []
74 self.env = {}
74 self.env = {}
75 self.dirs = []
75 self.dirs = []
76
76
77 def setup(self):
77 def setup(self):
78 """Create temporary directories etc.
78 """Create temporary directories etc.
79
79
80 This is only called when we know the test group will be run. Things
80 This is only called when we know the test group will be run. Things
81 created here may be cleaned up by self.cleanup().
81 created here may be cleaned up by self.cleanup().
82 """
82 """
83 pass
83 pass
84
84
85 def launch(self, buffer_output=False, capture_output=False):
85 def launch(self, buffer_output=False, capture_output=False):
86 # print('*** ENV:', self.env) # dbg
86 # print('*** ENV:', self.env) # dbg
87 # print('*** CMD:', self.cmd) # dbg
87 # print('*** CMD:', self.cmd) # dbg
88 env = os.environ.copy()
88 env = os.environ.copy()
89 env.update(self.env)
89 env.update(self.env)
90 if buffer_output:
90 if buffer_output:
91 capture_output = True
91 capture_output = True
92 self.stdout_capturer = c = StreamCapturer(echo=not buffer_output)
92 self.stdout_capturer = c = StreamCapturer(echo=not buffer_output)
93 c.start()
93 c.start()
94 stdout = c.writefd if capture_output else None
94 stdout = c.writefd if capture_output else None
95 stderr = subprocess.STDOUT if capture_output else None
95 stderr = subprocess.STDOUT if capture_output else None
96 self.process = subprocess.Popen(self.cmd, stdout=stdout,
96 self.process = subprocess.Popen(self.cmd, stdout=stdout,
97 stderr=stderr, env=env)
97 stderr=stderr, env=env)
98
98
99 def wait(self):
99 def wait(self):
100 self.process.wait()
100 self.process.wait()
101 self.stdout_capturer.halt()
101 self.stdout_capturer.halt()
102 self.stdout = self.stdout_capturer.get_buffer()
102 self.stdout = self.stdout_capturer.get_buffer()
103 return self.process.returncode
103 return self.process.returncode
104
104
105 def print_extra_info(self):
105 def print_extra_info(self):
106 """Print extra information about this test run.
106 """Print extra information about this test run.
107
107
108 If we're running in parallel and showing the concise view, this is only
108 If we're running in parallel and showing the concise view, this is only
109 called if the test group fails. Otherwise, it's called before the test
109 called if the test group fails. Otherwise, it's called before the test
110 group is started.
110 group is started.
111
111
112 The base implementation does nothing, but it can be overridden by
112 The base implementation does nothing, but it can be overridden by
113 subclasses.
113 subclasses.
114 """
114 """
115 return
115 return
116
116
117 def cleanup_process(self):
117 def cleanup_process(self):
118 """Cleanup on exit by killing any leftover processes."""
118 """Cleanup on exit by killing any leftover processes."""
119 subp = self.process
119 subp = self.process
120 if subp is None or (subp.poll() is not None):
120 if subp is None or (subp.poll() is not None):
121 return # Process doesn't exist, or is already dead.
121 return # Process doesn't exist, or is already dead.
122
122
123 try:
123 try:
124 print('Cleaning up stale PID: %d' % subp.pid)
124 print('Cleaning up stale PID: %d' % subp.pid)
125 subp.kill()
125 subp.kill()
126 except: # (OSError, WindowsError) ?
126 except: # (OSError, WindowsError) ?
127 # This is just a best effort, if we fail or the process was
127 # This is just a best effort, if we fail or the process was
128 # really gone, ignore it.
128 # really gone, ignore it.
129 pass
129 pass
130 else:
130 else:
131 for i in range(10):
131 for i in range(10):
132 if subp.poll() is None:
132 if subp.poll() is None:
133 time.sleep(0.1)
133 time.sleep(0.1)
134 else:
134 else:
135 break
135 break
136
136
137 if subp.poll() is None:
137 if subp.poll() is None:
138 # The process did not die...
138 # The process did not die...
139 print('... failed. Manual cleanup may be required.')
139 print('... failed. Manual cleanup may be required.')
140
140
141 def cleanup(self):
141 def cleanup(self):
142 "Kill process if it's still alive, and clean up temporary directories"
142 "Kill process if it's still alive, and clean up temporary directories"
143 self.cleanup_process()
143 self.cleanup_process()
144 for td in self.dirs:
144 for td in self.dirs:
145 td.cleanup()
145 td.cleanup()
146
146
147 __del__ = cleanup
147 __del__ = cleanup
148
148
149
149
150 class PyTestController(TestController):
150 class PyTestController(TestController):
151 """Run Python tests using IPython.testing.iptest"""
151 """Run Python tests using IPython.testing.iptest"""
152 #: str, Python command to execute in subprocess
152 #: str, Python command to execute in subprocess
153 pycmd = None
153 pycmd = None
154
154
155 def __init__(self, section, options):
155 def __init__(self, section, options):
156 """Create new test runner."""
156 """Create new test runner."""
157 TestController.__init__(self)
157 TestController.__init__(self)
158 self.section = section
158 self.section = section
159 # pycmd is put into cmd[2] in PyTestController.launch()
159 # pycmd is put into cmd[2] in PyTestController.launch()
160 self.cmd = [sys.executable, '-c', None, section]
160 self.cmd = [sys.executable, '-c', None, section]
161 self.pycmd = "from IPython.testing.iptest import run_iptest; run_iptest()"
161 self.pycmd = "from IPython.testing.iptest import run_iptest; run_iptest()"
162 self.options = options
162 self.options = options
163
163
164 def setup(self):
164 def setup(self):
165 ipydir = TemporaryDirectory()
165 ipydir = TemporaryDirectory()
166 self.dirs.append(ipydir)
166 self.dirs.append(ipydir)
167 self.env['IPYTHONDIR'] = ipydir.name
167 self.env['IPYTHONDIR'] = ipydir.name
168 self.workingdir = workingdir = TemporaryDirectory()
168 self.workingdir = workingdir = TemporaryDirectory()
169 self.dirs.append(workingdir)
169 self.dirs.append(workingdir)
170 self.env['IPTEST_WORKING_DIR'] = workingdir.name
170 self.env['IPTEST_WORKING_DIR'] = workingdir.name
171 # This means we won't get odd effects from our own matplotlib config
171 # This means we won't get odd effects from our own matplotlib config
172 self.env['MPLCONFIGDIR'] = workingdir.name
172 self.env['MPLCONFIGDIR'] = workingdir.name
173 # For security reasons (http://bugs.python.org/issue16202), use
174 # a temporary directory to which other users have no access.
175 self.env['TMPDIR'] = workingdir.name
173
176
174 # Add a non-accessible directory to PATH (see gh-7053)
177 # Add a non-accessible directory to PATH (see gh-7053)
175 noaccess = os.path.join(self.workingdir.name, "_no_access_")
178 noaccess = os.path.join(self.workingdir.name, "_no_access_")
176 self.noaccess = noaccess
179 self.noaccess = noaccess
177 os.mkdir(noaccess, 0)
180 os.mkdir(noaccess, 0)
178
181
179 PATH = os.environ.get('PATH', '')
182 PATH = os.environ.get('PATH', '')
180 if PATH:
183 if PATH:
181 PATH = noaccess + os.pathsep + PATH
184 PATH = noaccess + os.pathsep + PATH
182 else:
185 else:
183 PATH = noaccess
186 PATH = noaccess
184 self.env['PATH'] = PATH
187 self.env['PATH'] = PATH
185
188
186 # From options:
189 # From options:
187 if self.options.xunit:
190 if self.options.xunit:
188 self.add_xunit()
191 self.add_xunit()
189 if self.options.coverage:
192 if self.options.coverage:
190 self.add_coverage()
193 self.add_coverage()
191 self.env['IPTEST_SUBPROC_STREAMS'] = self.options.subproc_streams
194 self.env['IPTEST_SUBPROC_STREAMS'] = self.options.subproc_streams
192 self.cmd.extend(self.options.extra_args)
195 self.cmd.extend(self.options.extra_args)
193
196
194 def cleanup(self):
197 def cleanup(self):
195 """
198 """
196 Make the non-accessible directory created in setup() accessible
199 Make the non-accessible directory created in setup() accessible
197 again, otherwise deleting the workingdir will fail.
200 again, otherwise deleting the workingdir will fail.
198 """
201 """
199 os.chmod(self.noaccess, stat.S_IRWXU)
202 os.chmod(self.noaccess, stat.S_IRWXU)
200 TestController.cleanup(self)
203 TestController.cleanup(self)
201
204
202 @property
205 @property
203 def will_run(self):
206 def will_run(self):
204 try:
207 try:
205 return test_sections[self.section].will_run
208 return test_sections[self.section].will_run
206 except KeyError:
209 except KeyError:
207 return True
210 return True
208
211
209 def add_xunit(self):
212 def add_xunit(self):
210 xunit_file = os.path.abspath(self.section + '.xunit.xml')
213 xunit_file = os.path.abspath(self.section + '.xunit.xml')
211 self.cmd.extend(['--with-xunit', '--xunit-file', xunit_file])
214 self.cmd.extend(['--with-xunit', '--xunit-file', xunit_file])
212
215
213 def add_coverage(self):
216 def add_coverage(self):
214 try:
217 try:
215 sources = test_sections[self.section].includes
218 sources = test_sections[self.section].includes
216 except KeyError:
219 except KeyError:
217 sources = ['IPython']
220 sources = ['IPython']
218
221
219 coverage_rc = ("[run]\n"
222 coverage_rc = ("[run]\n"
220 "data_file = {data_file}\n"
223 "data_file = {data_file}\n"
221 "source =\n"
224 "source =\n"
222 " {source}\n"
225 " {source}\n"
223 ).format(data_file=os.path.abspath('.coverage.'+self.section),
226 ).format(data_file=os.path.abspath('.coverage.'+self.section),
224 source="\n ".join(sources))
227 source="\n ".join(sources))
225 config_file = os.path.join(self.workingdir.name, '.coveragerc')
228 config_file = os.path.join(self.workingdir.name, '.coveragerc')
226 with open(config_file, 'w') as f:
229 with open(config_file, 'w') as f:
227 f.write(coverage_rc)
230 f.write(coverage_rc)
228
231
229 self.env['COVERAGE_PROCESS_START'] = config_file
232 self.env['COVERAGE_PROCESS_START'] = config_file
230 self.pycmd = "import coverage; coverage.process_startup(); " + self.pycmd
233 self.pycmd = "import coverage; coverage.process_startup(); " + self.pycmd
231
234
232 def launch(self, buffer_output=False):
235 def launch(self, buffer_output=False):
233 self.cmd[2] = self.pycmd
236 self.cmd[2] = self.pycmd
234 super(PyTestController, self).launch(buffer_output=buffer_output)
237 super(PyTestController, self).launch(buffer_output=buffer_output)
235
238
236
239
237 js_prefix = 'js/'
240 js_prefix = 'js/'
238
241
239 def get_js_test_dir():
242 def get_js_test_dir():
240 import IPython.html.tests as t
243 import IPython.html.tests as t
241 return os.path.join(os.path.dirname(t.__file__), '')
244 return os.path.join(os.path.dirname(t.__file__), '')
242
245
243 def all_js_groups():
246 def all_js_groups():
244 import glob
247 import glob
245 test_dir = get_js_test_dir()
248 test_dir = get_js_test_dir()
246 all_subdirs = glob.glob(test_dir + '[!_]*/')
249 all_subdirs = glob.glob(test_dir + '[!_]*/')
247 return [js_prefix+os.path.relpath(x, test_dir) for x in all_subdirs]
250 return [js_prefix+os.path.relpath(x, test_dir) for x in all_subdirs]
248
251
249 class JSController(TestController):
252 class JSController(TestController):
250 """Run CasperJS tests """
253 """Run CasperJS tests """
251
254
252 requirements = ['zmq', 'tornado', 'jinja2', 'casperjs', 'sqlite3',
255 requirements = ['zmq', 'tornado', 'jinja2', 'casperjs', 'sqlite3',
253 'jsonschema']
256 'jsonschema']
254
257
255 def __init__(self, section, xunit=True, engine='phantomjs', url=None):
258 def __init__(self, section, xunit=True, engine='phantomjs', url=None):
256 """Create new test runner."""
259 """Create new test runner."""
257 TestController.__init__(self)
260 TestController.__init__(self)
258 self.engine = engine
261 self.engine = engine
259 self.section = section
262 self.section = section
260 self.xunit = xunit
263 self.xunit = xunit
261 self.url = url
264 self.url = url
262 self.slimer_failure = re.compile('^FAIL.*', flags=re.MULTILINE)
265 self.slimer_failure = re.compile('^FAIL.*', flags=re.MULTILINE)
263 js_test_dir = get_js_test_dir()
266 js_test_dir = get_js_test_dir()
264 includes = '--includes=' + os.path.join(js_test_dir,'util.js')
267 includes = '--includes=' + os.path.join(js_test_dir,'util.js')
265 test_cases = os.path.join(js_test_dir, self.section[len(js_prefix):])
268 test_cases = os.path.join(js_test_dir, self.section[len(js_prefix):])
266 self.cmd = ['casperjs', 'test', includes, test_cases, '--engine=%s' % self.engine]
269 self.cmd = ['casperjs', 'test', includes, test_cases, '--engine=%s' % self.engine]
267
270
268 def setup(self):
271 def setup(self):
269 self.ipydir = TemporaryDirectory()
272 self.ipydir = TemporaryDirectory()
270 self.nbdir = TemporaryDirectory()
273 self.nbdir = TemporaryDirectory()
271 self.dirs.append(self.ipydir)
274 self.dirs.append(self.ipydir)
272 self.dirs.append(self.nbdir)
275 self.dirs.append(self.nbdir)
273 os.makedirs(os.path.join(self.nbdir.name, os.path.join(u'sub βˆ‚ir1', u'sub βˆ‚ir 1a')))
276 os.makedirs(os.path.join(self.nbdir.name, os.path.join(u'sub βˆ‚ir1', u'sub βˆ‚ir 1a')))
274 os.makedirs(os.path.join(self.nbdir.name, os.path.join(u'sub βˆ‚ir2', u'sub βˆ‚ir 1b')))
277 os.makedirs(os.path.join(self.nbdir.name, os.path.join(u'sub βˆ‚ir2', u'sub βˆ‚ir 1b')))
275
278
276 if self.xunit:
279 if self.xunit:
277 self.add_xunit()
280 self.add_xunit()
278
281
279 # If a url was specified, use that for the testing.
282 # If a url was specified, use that for the testing.
280 if self.url:
283 if self.url:
281 try:
284 try:
282 alive = requests.get(self.url).status_code == 200
285 alive = requests.get(self.url).status_code == 200
283 except:
286 except:
284 alive = False
287 alive = False
285
288
286 if alive:
289 if alive:
287 self.cmd.append("--url=%s" % self.url)
290 self.cmd.append("--url=%s" % self.url)
288 else:
291 else:
289 raise Exception('Could not reach "%s".' % self.url)
292 raise Exception('Could not reach "%s".' % self.url)
290 else:
293 else:
291 # start the ipython notebook, so we get the port number
294 # start the ipython notebook, so we get the port number
292 self.server_port = 0
295 self.server_port = 0
293 self._init_server()
296 self._init_server()
294 if self.server_port:
297 if self.server_port:
295 self.cmd.append("--port=%i" % self.server_port)
298 self.cmd.append("--port=%i" % self.server_port)
296 else:
299 else:
297 # don't launch tests if the server didn't start
300 # don't launch tests if the server didn't start
298 self.cmd = [sys.executable, '-c', 'raise SystemExit(1)']
301 self.cmd = [sys.executable, '-c', 'raise SystemExit(1)']
299
302
300 def add_xunit(self):
303 def add_xunit(self):
301 xunit_file = os.path.abspath(self.section.replace('/','.') + '.xunit.xml')
304 xunit_file = os.path.abspath(self.section.replace('/','.') + '.xunit.xml')
302 self.cmd.append('--xunit=%s' % xunit_file)
305 self.cmd.append('--xunit=%s' % xunit_file)
303
306
304 def launch(self, buffer_output):
307 def launch(self, buffer_output):
305 # If the engine is SlimerJS, we need to buffer the output because
308 # If the engine is SlimerJS, we need to buffer the output because
306 # SlimerJS does not support exit codes, so CasperJS always returns 0.
309 # SlimerJS does not support exit codes, so CasperJS always returns 0.
307 if self.engine == 'slimerjs' and not buffer_output:
310 if self.engine == 'slimerjs' and not buffer_output:
308 return super(JSController, self).launch(capture_output=True)
311 return super(JSController, self).launch(capture_output=True)
309
312
310 else:
313 else:
311 return super(JSController, self).launch(buffer_output=buffer_output)
314 return super(JSController, self).launch(buffer_output=buffer_output)
312
315
313 def wait(self, *pargs, **kwargs):
316 def wait(self, *pargs, **kwargs):
314 """Wait for the JSController to finish"""
317 """Wait for the JSController to finish"""
315 ret = super(JSController, self).wait(*pargs, **kwargs)
318 ret = super(JSController, self).wait(*pargs, **kwargs)
316 # If this is a SlimerJS controller, check the captured stdout for
319 # If this is a SlimerJS controller, check the captured stdout for
317 # errors. Otherwise, just return the return code.
320 # errors. Otherwise, just return the return code.
318 if self.engine == 'slimerjs':
321 if self.engine == 'slimerjs':
319 stdout = bytes_to_str(self.stdout)
322 stdout = bytes_to_str(self.stdout)
320 if ret != 0:
323 if ret != 0:
321 # This could still happen e.g. if it's stopped by SIGINT
324 # This could still happen e.g. if it's stopped by SIGINT
322 return ret
325 return ret
323 return bool(self.slimer_failure.search(strip_ansi(stdout)))
326 return bool(self.slimer_failure.search(strip_ansi(stdout)))
324 else:
327 else:
325 return ret
328 return ret
326
329
327 def print_extra_info(self):
330 def print_extra_info(self):
328 print("Running tests with notebook directory %r" % self.nbdir.name)
331 print("Running tests with notebook directory %r" % self.nbdir.name)
329
332
330 @property
333 @property
331 def will_run(self):
334 def will_run(self):
332 should_run = all(have[a] for a in self.requirements + [self.engine])
335 should_run = all(have[a] for a in self.requirements + [self.engine])
333 return should_run
336 return should_run
334
337
335 def _init_server(self):
338 def _init_server(self):
336 "Start the notebook server in a separate process"
339 "Start the notebook server in a separate process"
337 self.server_command = command = [sys.executable,
340 self.server_command = command = [sys.executable,
338 '-m', 'IPython.html',
341 '-m', 'IPython.html',
339 '--no-browser',
342 '--no-browser',
340 '--ipython-dir', self.ipydir.name,
343 '--ipython-dir', self.ipydir.name,
341 '--notebook-dir', self.nbdir.name,
344 '--notebook-dir', self.nbdir.name,
342 ]
345 ]
343 # ipc doesn't work on Windows, and darwin has crazy-long temp paths,
346 # ipc doesn't work on Windows, and darwin has crazy-long temp paths,
344 # which run afoul of ipc's maximum path length.
347 # which run afoul of ipc's maximum path length.
345 if sys.platform.startswith('linux'):
348 if sys.platform.startswith('linux'):
346 command.append('--KernelManager.transport=ipc')
349 command.append('--KernelManager.transport=ipc')
347 self.stream_capturer = c = StreamCapturer()
350 self.stream_capturer = c = StreamCapturer()
348 c.start()
351 c.start()
349 env = os.environ.copy()
352 env = os.environ.copy()
350 if self.engine == 'phantomjs':
353 if self.engine == 'phantomjs':
351 env['IPYTHON_ALLOW_DRAFT_WEBSOCKETS_FOR_PHANTOMJS'] = '1'
354 env['IPYTHON_ALLOW_DRAFT_WEBSOCKETS_FOR_PHANTOMJS'] = '1'
352 self.server = subprocess.Popen(command,
355 self.server = subprocess.Popen(command,
353 stdout=c.writefd,
356 stdout=c.writefd,
354 stderr=subprocess.STDOUT,
357 stderr=subprocess.STDOUT,
355 cwd=self.nbdir.name,
358 cwd=self.nbdir.name,
356 env=env,
359 env=env,
357 )
360 )
358 self.server_info_file = os.path.join(self.ipydir.name,
361 self.server_info_file = os.path.join(self.ipydir.name,
359 'profile_default', 'security', 'nbserver-%i.json' % self.server.pid
362 'profile_default', 'security', 'nbserver-%i.json' % self.server.pid
360 )
363 )
361 self._wait_for_server()
364 self._wait_for_server()
362
365
363 def _wait_for_server(self):
366 def _wait_for_server(self):
364 """Wait 30 seconds for the notebook server to start"""
367 """Wait 30 seconds for the notebook server to start"""
365 for i in range(300):
368 for i in range(300):
366 if self.server.poll() is not None:
369 if self.server.poll() is not None:
367 return self._failed_to_start()
370 return self._failed_to_start()
368 if os.path.exists(self.server_info_file):
371 if os.path.exists(self.server_info_file):
369 try:
372 try:
370 self._load_server_info()
373 self._load_server_info()
371 except ValueError:
374 except ValueError:
372 # If the server is halfway through writing the file, we may
375 # If the server is halfway through writing the file, we may
373 # get invalid JSON; it should be ready next iteration.
376 # get invalid JSON; it should be ready next iteration.
374 pass
377 pass
375 else:
378 else:
376 return
379 return
377 time.sleep(0.1)
380 time.sleep(0.1)
378 print("Notebook server-info file never arrived: %s" % self.server_info_file,
381 print("Notebook server-info file never arrived: %s" % self.server_info_file,
379 file=sys.stderr
382 file=sys.stderr
380 )
383 )
381
384
382 def _failed_to_start(self):
385 def _failed_to_start(self):
383 """Notebook server exited prematurely"""
386 """Notebook server exited prematurely"""
384 captured = self.stream_capturer.get_buffer().decode('utf-8', 'replace')
387 captured = self.stream_capturer.get_buffer().decode('utf-8', 'replace')
385 print("Notebook failed to start: ", file=sys.stderr)
388 print("Notebook failed to start: ", file=sys.stderr)
386 print(self.server_command)
389 print(self.server_command)
387 print(captured, file=sys.stderr)
390 print(captured, file=sys.stderr)
388
391
389 def _load_server_info(self):
392 def _load_server_info(self):
390 """Notebook server started, load connection info from JSON"""
393 """Notebook server started, load connection info from JSON"""
391 with open(self.server_info_file) as f:
394 with open(self.server_info_file) as f:
392 info = json.load(f)
395 info = json.load(f)
393 self.server_port = info['port']
396 self.server_port = info['port']
394
397
395 def cleanup(self):
398 def cleanup(self):
396 try:
399 try:
397 self.server.terminate()
400 self.server.terminate()
398 except OSError:
401 except OSError:
399 # already dead
402 # already dead
400 pass
403 pass
401 # wait 10s for the server to shutdown
404 # wait 10s for the server to shutdown
402 try:
405 try:
403 popen_wait(self.server, NOTEBOOK_SHUTDOWN_TIMEOUT)
406 popen_wait(self.server, NOTEBOOK_SHUTDOWN_TIMEOUT)
404 except TimeoutExpired:
407 except TimeoutExpired:
405 # server didn't terminate, kill it
408 # server didn't terminate, kill it
406 try:
409 try:
407 print("Failed to terminate notebook server, killing it.",
410 print("Failed to terminate notebook server, killing it.",
408 file=sys.stderr
411 file=sys.stderr
409 )
412 )
410 self.server.kill()
413 self.server.kill()
411 except OSError:
414 except OSError:
412 # already dead
415 # already dead
413 pass
416 pass
414 # wait another 10s
417 # wait another 10s
415 try:
418 try:
416 popen_wait(self.server, NOTEBOOK_SHUTDOWN_TIMEOUT)
419 popen_wait(self.server, NOTEBOOK_SHUTDOWN_TIMEOUT)
417 except TimeoutExpired:
420 except TimeoutExpired:
418 print("Notebook server still running (%s)" % self.server_info_file,
421 print("Notebook server still running (%s)" % self.server_info_file,
419 file=sys.stderr
422 file=sys.stderr
420 )
423 )
421
424
422 self.stream_capturer.halt()
425 self.stream_capturer.halt()
423 TestController.cleanup(self)
426 TestController.cleanup(self)
424
427
425
428
426 def prepare_controllers(options):
429 def prepare_controllers(options):
427 """Returns two lists of TestController instances, those to run, and those
430 """Returns two lists of TestController instances, those to run, and those
428 not to run."""
431 not to run."""
429 testgroups = options.testgroups
432 testgroups = options.testgroups
430 if testgroups:
433 if testgroups:
431 if 'js' in testgroups:
434 if 'js' in testgroups:
432 js_testgroups = all_js_groups()
435 js_testgroups = all_js_groups()
433 else:
436 else:
434 js_testgroups = [g for g in testgroups if g.startswith(js_prefix)]
437 js_testgroups = [g for g in testgroups if g.startswith(js_prefix)]
435
438
436 py_testgroups = [g for g in testgroups if g not in ['js'] + js_testgroups]
439 py_testgroups = [g for g in testgroups if g not in ['js'] + js_testgroups]
437 else:
440 else:
438 py_testgroups = py_test_group_names
441 py_testgroups = py_test_group_names
439 if not options.all:
442 if not options.all:
440 js_testgroups = []
443 js_testgroups = []
441 test_sections['parallel'].enabled = False
444 test_sections['parallel'].enabled = False
442 else:
445 else:
443 js_testgroups = all_js_groups()
446 js_testgroups = all_js_groups()
444
447
445 engine = 'slimerjs' if options.slimerjs else 'phantomjs'
448 engine = 'slimerjs' if options.slimerjs else 'phantomjs'
446 c_js = [JSController(name, xunit=options.xunit, engine=engine, url=options.url) for name in js_testgroups]
449 c_js = [JSController(name, xunit=options.xunit, engine=engine, url=options.url) for name in js_testgroups]
447 c_py = [PyTestController(name, options) for name in py_testgroups]
450 c_py = [PyTestController(name, options) for name in py_testgroups]
448
451
449 controllers = c_py + c_js
452 controllers = c_py + c_js
450 to_run = [c for c in controllers if c.will_run]
453 to_run = [c for c in controllers if c.will_run]
451 not_run = [c for c in controllers if not c.will_run]
454 not_run = [c for c in controllers if not c.will_run]
452 return to_run, not_run
455 return to_run, not_run
453
456
454 def do_run(controller, buffer_output=True):
457 def do_run(controller, buffer_output=True):
455 """Setup and run a test controller.
458 """Setup and run a test controller.
456
459
457 If buffer_output is True, no output is displayed, to avoid it appearing
460 If buffer_output is True, no output is displayed, to avoid it appearing
458 interleaved. In this case, the caller is responsible for displaying test
461 interleaved. In this case, the caller is responsible for displaying test
459 output on failure.
462 output on failure.
460
463
461 Returns
464 Returns
462 -------
465 -------
463 controller : TestController
466 controller : TestController
464 The same controller as passed in, as a convenience for using map() type
467 The same controller as passed in, as a convenience for using map() type
465 APIs.
468 APIs.
466 exitcode : int
469 exitcode : int
467 The exit code of the test subprocess. Non-zero indicates failure.
470 The exit code of the test subprocess. Non-zero indicates failure.
468 """
471 """
469 try:
472 try:
470 try:
473 try:
471 controller.setup()
474 controller.setup()
472 if not buffer_output:
475 if not buffer_output:
473 controller.print_extra_info()
476 controller.print_extra_info()
474 controller.launch(buffer_output=buffer_output)
477 controller.launch(buffer_output=buffer_output)
475 except Exception:
478 except Exception:
476 import traceback
479 import traceback
477 traceback.print_exc()
480 traceback.print_exc()
478 return controller, 1 # signal failure
481 return controller, 1 # signal failure
479
482
480 exitcode = controller.wait()
483 exitcode = controller.wait()
481 return controller, exitcode
484 return controller, exitcode
482
485
483 except KeyboardInterrupt:
486 except KeyboardInterrupt:
484 return controller, -signal.SIGINT
487 return controller, -signal.SIGINT
485 finally:
488 finally:
486 controller.cleanup()
489 controller.cleanup()
487
490
488 def report():
491 def report():
489 """Return a string with a summary report of test-related variables."""
492 """Return a string with a summary report of test-related variables."""
490 inf = get_sys_info()
493 inf = get_sys_info()
491 out = []
494 out = []
492 def _add(name, value):
495 def _add(name, value):
493 out.append((name, value))
496 out.append((name, value))
494
497
495 _add('IPython version', inf['ipython_version'])
498 _add('IPython version', inf['ipython_version'])
496 _add('IPython commit', "{} ({})".format(inf['commit_hash'], inf['commit_source']))
499 _add('IPython commit', "{} ({})".format(inf['commit_hash'], inf['commit_source']))
497 _add('IPython package', compress_user(inf['ipython_path']))
500 _add('IPython package', compress_user(inf['ipython_path']))
498 _add('Python version', inf['sys_version'].replace('\n',''))
501 _add('Python version', inf['sys_version'].replace('\n',''))
499 _add('sys.executable', compress_user(inf['sys_executable']))
502 _add('sys.executable', compress_user(inf['sys_executable']))
500 _add('Platform', inf['platform'])
503 _add('Platform', inf['platform'])
501
504
502 width = max(len(n) for (n,v) in out)
505 width = max(len(n) for (n,v) in out)
503 out = ["{:<{width}}: {}\n".format(n, v, width=width) for (n,v) in out]
506 out = ["{:<{width}}: {}\n".format(n, v, width=width) for (n,v) in out]
504
507
505 avail = []
508 avail = []
506 not_avail = []
509 not_avail = []
507
510
508 for k, is_avail in have.items():
511 for k, is_avail in have.items():
509 if is_avail:
512 if is_avail:
510 avail.append(k)
513 avail.append(k)
511 else:
514 else:
512 not_avail.append(k)
515 not_avail.append(k)
513
516
514 if avail:
517 if avail:
515 out.append('\nTools and libraries available at test time:\n')
518 out.append('\nTools and libraries available at test time:\n')
516 avail.sort()
519 avail.sort()
517 out.append(' ' + ' '.join(avail)+'\n')
520 out.append(' ' + ' '.join(avail)+'\n')
518
521
519 if not_avail:
522 if not_avail:
520 out.append('\nTools and libraries NOT available at test time:\n')
523 out.append('\nTools and libraries NOT available at test time:\n')
521 not_avail.sort()
524 not_avail.sort()
522 out.append(' ' + ' '.join(not_avail)+'\n')
525 out.append(' ' + ' '.join(not_avail)+'\n')
523
526
524 return ''.join(out)
527 return ''.join(out)
525
528
526 def run_iptestall(options):
529 def run_iptestall(options):
527 """Run the entire IPython test suite by calling nose and trial.
530 """Run the entire IPython test suite by calling nose and trial.
528
531
529 This function constructs :class:`IPTester` instances for all IPython
532 This function constructs :class:`IPTester` instances for all IPython
530 modules and package and then runs each of them. This causes the modules
533 modules and package and then runs each of them. This causes the modules
531 and packages of IPython to be tested each in their own subprocess using
534 and packages of IPython to be tested each in their own subprocess using
532 nose.
535 nose.
533
536
534 Parameters
537 Parameters
535 ----------
538 ----------
536
539
537 All parameters are passed as attributes of the options object.
540 All parameters are passed as attributes of the options object.
538
541
539 testgroups : list of str
542 testgroups : list of str
540 Run only these sections of the test suite. If empty, run all the available
543 Run only these sections of the test suite. If empty, run all the available
541 sections.
544 sections.
542
545
543 fast : int or None
546 fast : int or None
544 Run the test suite in parallel, using n simultaneous processes. If None
547 Run the test suite in parallel, using n simultaneous processes. If None
545 is passed, one process is used per CPU core. Default 1 (i.e. sequential)
548 is passed, one process is used per CPU core. Default 1 (i.e. sequential)
546
549
547 inc_slow : bool
550 inc_slow : bool
548 Include slow tests, like IPython.parallel. By default, these tests aren't
551 Include slow tests, like IPython.parallel. By default, these tests aren't
549 run.
552 run.
550
553
551 slimerjs : bool
554 slimerjs : bool
552 Use slimerjs if it's installed instead of phantomjs for casperjs tests.
555 Use slimerjs if it's installed instead of phantomjs for casperjs tests.
553
556
554 url : unicode
557 url : unicode
555 Address:port to use when running the JS tests.
558 Address:port to use when running the JS tests.
556
559
557 xunit : bool
560 xunit : bool
558 Produce Xunit XML output. This is written to multiple foo.xunit.xml files.
561 Produce Xunit XML output. This is written to multiple foo.xunit.xml files.
559
562
560 coverage : bool or str
563 coverage : bool or str
561 Measure code coverage from tests. True will store the raw coverage data,
564 Measure code coverage from tests. True will store the raw coverage data,
562 or pass 'html' or 'xml' to get reports.
565 or pass 'html' or 'xml' to get reports.
563
566
564 extra_args : list
567 extra_args : list
565 Extra arguments to pass to the test subprocesses, e.g. '-v'
568 Extra arguments to pass to the test subprocesses, e.g. '-v'
566 """
569 """
567 to_run, not_run = prepare_controllers(options)
570 to_run, not_run = prepare_controllers(options)
568
571
569 def justify(ltext, rtext, width=70, fill='-'):
572 def justify(ltext, rtext, width=70, fill='-'):
570 ltext += ' '
573 ltext += ' '
571 rtext = (' ' + rtext).rjust(width - len(ltext), fill)
574 rtext = (' ' + rtext).rjust(width - len(ltext), fill)
572 return ltext + rtext
575 return ltext + rtext
573
576
574 # Run all test runners, tracking execution time
577 # Run all test runners, tracking execution time
575 failed = []
578 failed = []
576 t_start = time.time()
579 t_start = time.time()
577
580
578 print()
581 print()
579 if options.fast == 1:
582 if options.fast == 1:
580 # This actually means sequential, i.e. with 1 job
583 # This actually means sequential, i.e. with 1 job
581 for controller in to_run:
584 for controller in to_run:
582 print('Test group:', controller.section)
585 print('Test group:', controller.section)
583 sys.stdout.flush() # Show in correct order when output is piped
586 sys.stdout.flush() # Show in correct order when output is piped
584 controller, res = do_run(controller, buffer_output=False)
587 controller, res = do_run(controller, buffer_output=False)
585 if res:
588 if res:
586 failed.append(controller)
589 failed.append(controller)
587 if res == -signal.SIGINT:
590 if res == -signal.SIGINT:
588 print("Interrupted")
591 print("Interrupted")
589 break
592 break
590 print()
593 print()
591
594
592 else:
595 else:
593 # Run tests concurrently
596 # Run tests concurrently
594 try:
597 try:
595 pool = multiprocessing.pool.ThreadPool(options.fast)
598 pool = multiprocessing.pool.ThreadPool(options.fast)
596 for (controller, res) in pool.imap_unordered(do_run, to_run):
599 for (controller, res) in pool.imap_unordered(do_run, to_run):
597 res_string = 'OK' if res == 0 else 'FAILED'
600 res_string = 'OK' if res == 0 else 'FAILED'
598 print(justify('Test group: ' + controller.section, res_string))
601 print(justify('Test group: ' + controller.section, res_string))
599 if res:
602 if res:
600 controller.print_extra_info()
603 controller.print_extra_info()
601 print(bytes_to_str(controller.stdout))
604 print(bytes_to_str(controller.stdout))
602 failed.append(controller)
605 failed.append(controller)
603 if res == -signal.SIGINT:
606 if res == -signal.SIGINT:
604 print("Interrupted")
607 print("Interrupted")
605 break
608 break
606 except KeyboardInterrupt:
609 except KeyboardInterrupt:
607 return
610 return
608
611
609 for controller in not_run:
612 for controller in not_run:
610 print(justify('Test group: ' + controller.section, 'NOT RUN'))
613 print(justify('Test group: ' + controller.section, 'NOT RUN'))
611
614
612 t_end = time.time()
615 t_end = time.time()
613 t_tests = t_end - t_start
616 t_tests = t_end - t_start
614 nrunners = len(to_run)
617 nrunners = len(to_run)
615 nfail = len(failed)
618 nfail = len(failed)
616 # summarize results
619 # summarize results
617 print('_'*70)
620 print('_'*70)
618 print('Test suite completed for system with the following information:')
621 print('Test suite completed for system with the following information:')
619 print(report())
622 print(report())
620 took = "Took %.3fs." % t_tests
623 took = "Took %.3fs." % t_tests
621 print('Status: ', end='')
624 print('Status: ', end='')
622 if not failed:
625 if not failed:
623 print('OK (%d test groups).' % nrunners, took)
626 print('OK (%d test groups).' % nrunners, took)
624 else:
627 else:
625 # If anything went wrong, point out what command to rerun manually to
628 # If anything went wrong, point out what command to rerun manually to
626 # see the actual errors and individual summary
629 # see the actual errors and individual summary
627 failed_sections = [c.section for c in failed]
630 failed_sections = [c.section for c in failed]
628 print('ERROR - {} out of {} test groups failed ({}).'.format(nfail,
631 print('ERROR - {} out of {} test groups failed ({}).'.format(nfail,
629 nrunners, ', '.join(failed_sections)), took)
632 nrunners, ', '.join(failed_sections)), took)
630 print()
633 print()
631 print('You may wish to rerun these, with:')
634 print('You may wish to rerun these, with:')
632 print(' iptest', *failed_sections)
635 print(' iptest', *failed_sections)
633 print()
636 print()
634
637
635 if options.coverage:
638 if options.coverage:
636 from coverage import coverage
639 from coverage import coverage
637 cov = coverage(data_file='.coverage')
640 cov = coverage(data_file='.coverage')
638 cov.combine()
641 cov.combine()
639 cov.save()
642 cov.save()
640
643
641 # Coverage HTML report
644 # Coverage HTML report
642 if options.coverage == 'html':
645 if options.coverage == 'html':
643 html_dir = 'ipy_htmlcov'
646 html_dir = 'ipy_htmlcov'
644 shutil.rmtree(html_dir, ignore_errors=True)
647 shutil.rmtree(html_dir, ignore_errors=True)
645 print("Writing HTML coverage report to %s/ ... " % html_dir, end="")
648 print("Writing HTML coverage report to %s/ ... " % html_dir, end="")
646 sys.stdout.flush()
649 sys.stdout.flush()
647
650
648 # Custom HTML reporter to clean up module names.
651 # Custom HTML reporter to clean up module names.
649 from coverage.html import HtmlReporter
652 from coverage.html import HtmlReporter
650 class CustomHtmlReporter(HtmlReporter):
653 class CustomHtmlReporter(HtmlReporter):
651 def find_code_units(self, morfs):
654 def find_code_units(self, morfs):
652 super(CustomHtmlReporter, self).find_code_units(morfs)
655 super(CustomHtmlReporter, self).find_code_units(morfs)
653 for cu in self.code_units:
656 for cu in self.code_units:
654 nameparts = cu.name.split(os.sep)
657 nameparts = cu.name.split(os.sep)
655 if 'IPython' not in nameparts:
658 if 'IPython' not in nameparts:
656 continue
659 continue
657 ix = nameparts.index('IPython')
660 ix = nameparts.index('IPython')
658 cu.name = '.'.join(nameparts[ix:])
661 cu.name = '.'.join(nameparts[ix:])
659
662
660 # Reimplement the html_report method with our custom reporter
663 # Reimplement the html_report method with our custom reporter
661 cov._harvest_data()
664 cov._harvest_data()
662 cov.config.from_args(omit='*{0}tests{0}*'.format(os.sep), html_dir=html_dir,
665 cov.config.from_args(omit='*{0}tests{0}*'.format(os.sep), html_dir=html_dir,
663 html_title='IPython test coverage',
666 html_title='IPython test coverage',
664 )
667 )
665 reporter = CustomHtmlReporter(cov, cov.config)
668 reporter = CustomHtmlReporter(cov, cov.config)
666 reporter.report(None)
669 reporter.report(None)
667 print('done.')
670 print('done.')
668
671
669 # Coverage XML report
672 # Coverage XML report
670 elif options.coverage == 'xml':
673 elif options.coverage == 'xml':
671 cov.xml_report(outfile='ipy_coverage.xml')
674 cov.xml_report(outfile='ipy_coverage.xml')
672
675
673 if failed:
676 if failed:
674 # Ensure that our exit code indicates failure
677 # Ensure that our exit code indicates failure
675 sys.exit(1)
678 sys.exit(1)
676
679
677 argparser = argparse.ArgumentParser(description='Run IPython test suite')
680 argparser = argparse.ArgumentParser(description='Run IPython test suite')
678 argparser.add_argument('testgroups', nargs='*',
681 argparser.add_argument('testgroups', nargs='*',
679 help='Run specified groups of tests. If omitted, run '
682 help='Run specified groups of tests. If omitted, run '
680 'all tests.')
683 'all tests.')
681 argparser.add_argument('--all', action='store_true',
684 argparser.add_argument('--all', action='store_true',
682 help='Include slow tests not run by default.')
685 help='Include slow tests not run by default.')
683 argparser.add_argument('--slimerjs', action='store_true',
686 argparser.add_argument('--slimerjs', action='store_true',
684 help="Use slimerjs if it's installed instead of phantomjs for casperjs tests.")
687 help="Use slimerjs if it's installed instead of phantomjs for casperjs tests.")
685 argparser.add_argument('--url', help="URL to use for the JS tests.")
688 argparser.add_argument('--url', help="URL to use for the JS tests.")
686 argparser.add_argument('-j', '--fast', nargs='?', const=None, default=1, type=int,
689 argparser.add_argument('-j', '--fast', nargs='?', const=None, default=1, type=int,
687 help='Run test sections in parallel. This starts as many '
690 help='Run test sections in parallel. This starts as many '
688 'processes as you have cores, or you can specify a number.')
691 'processes as you have cores, or you can specify a number.')
689 argparser.add_argument('--xunit', action='store_true',
692 argparser.add_argument('--xunit', action='store_true',
690 help='Produce Xunit XML results')
693 help='Produce Xunit XML results')
691 argparser.add_argument('--coverage', nargs='?', const=True, default=False,
694 argparser.add_argument('--coverage', nargs='?', const=True, default=False,
692 help="Measure test coverage. Specify 'html' or "
695 help="Measure test coverage. Specify 'html' or "
693 "'xml' to get reports.")
696 "'xml' to get reports.")
694 argparser.add_argument('--subproc-streams', default='capture',
697 argparser.add_argument('--subproc-streams', default='capture',
695 help="What to do with stdout/stderr from subprocesses. "
698 help="What to do with stdout/stderr from subprocesses. "
696 "'capture' (default), 'show' and 'discard' are the options.")
699 "'capture' (default), 'show' and 'discard' are the options.")
697
700
698 def default_options():
701 def default_options():
699 """Get an argparse Namespace object with the default arguments, to pass to
702 """Get an argparse Namespace object with the default arguments, to pass to
700 :func:`run_iptestall`.
703 :func:`run_iptestall`.
701 """
704 """
702 options = argparser.parse_args([])
705 options = argparser.parse_args([])
703 options.extra_args = []
706 options.extra_args = []
704 return options
707 return options
705
708
706 def main():
709 def main():
707 # iptest doesn't work correctly if the working directory is the
710 # iptest doesn't work correctly if the working directory is the
708 # root of the IPython source tree. Tell the user to avoid
711 # root of the IPython source tree. Tell the user to avoid
709 # frustration.
712 # frustration.
710 if os.path.exists(os.path.join(os.getcwd(),
713 if os.path.exists(os.path.join(os.getcwd(),
711 'IPython', 'testing', '__main__.py')):
714 'IPython', 'testing', '__main__.py')):
712 print("Don't run iptest from the IPython source directory",
715 print("Don't run iptest from the IPython source directory",
713 file=sys.stderr)
716 file=sys.stderr)
714 sys.exit(1)
717 sys.exit(1)
715 # Arguments after -- should be passed through to nose. Argparse treats
718 # Arguments after -- should be passed through to nose. Argparse treats
716 # everything after -- as regular positional arguments, so we separate them
719 # everything after -- as regular positional arguments, so we separate them
717 # first.
720 # first.
718 try:
721 try:
719 ix = sys.argv.index('--')
722 ix = sys.argv.index('--')
720 except ValueError:
723 except ValueError:
721 to_parse = sys.argv[1:]
724 to_parse = sys.argv[1:]
722 extra_args = []
725 extra_args = []
723 else:
726 else:
724 to_parse = sys.argv[1:ix]
727 to_parse = sys.argv[1:ix]
725 extra_args = sys.argv[ix+1:]
728 extra_args = sys.argv[ix+1:]
726
729
727 options = argparser.parse_args(to_parse)
730 options = argparser.parse_args(to_parse)
728 options.extra_args = extra_args
731 options.extra_args = extra_args
729
732
730 run_iptestall(options)
733 run_iptestall(options)
731
734
732
735
733 if __name__ == '__main__':
736 if __name__ == '__main__':
734 main()
737 main()
@@ -1,679 +1,678 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """Tests for IPython.utils.path.py"""
2 """Tests for IPython.utils.path.py"""
3
3
4 # Copyright (c) IPython Development Team.
4 # Copyright (c) IPython Development Team.
5 # Distributed under the terms of the Modified BSD License.
5 # Distributed under the terms of the Modified BSD License.
6
6
7 import errno
7 import errno
8 import os
8 import os
9 import shutil
9 import shutil
10 import sys
10 import sys
11 import tempfile
11 import tempfile
12 import warnings
12 import warnings
13 from contextlib import contextmanager
13 from contextlib import contextmanager
14
14
15 try: # Python 3.3+
15 try: # Python 3.3+
16 from unittest.mock import patch
16 from unittest.mock import patch
17 except ImportError:
17 except ImportError:
18 from mock import patch
18 from mock import patch
19
19
20 from os.path import join, abspath, split
20 from os.path import join, abspath, split
21
21
22 from nose import SkipTest
22 from nose import SkipTest
23 import nose.tools as nt
23 import nose.tools as nt
24
24
25 from nose import with_setup
25 from nose import with_setup
26
26
27 import IPython
27 import IPython
28 from IPython.testing import decorators as dec
28 from IPython.testing import decorators as dec
29 from IPython.testing.decorators import (skip_if_not_win32, skip_win32,
29 from IPython.testing.decorators import (skip_if_not_win32, skip_win32,
30 onlyif_unicode_paths,)
30 onlyif_unicode_paths,)
31 from IPython.testing.tools import make_tempfile, AssertPrints
31 from IPython.testing.tools import make_tempfile, AssertPrints
32 from IPython.utils import path
32 from IPython.utils import path
33 from IPython.utils import py3compat
33 from IPython.utils import py3compat
34 from IPython.utils.tempdir import TemporaryDirectory
34 from IPython.utils.tempdir import TemporaryDirectory
35
35
36 # Platform-dependent imports
36 # Platform-dependent imports
37 try:
37 try:
38 import winreg as wreg # Py 3
38 import winreg as wreg # Py 3
39 except ImportError:
39 except ImportError:
40 try:
40 try:
41 import _winreg as wreg # Py 2
41 import _winreg as wreg # Py 2
42 except ImportError:
42 except ImportError:
43 #Fake _winreg module on none windows platforms
43 #Fake _winreg module on none windows platforms
44 import types
44 import types
45 wr_name = "winreg" if py3compat.PY3 else "_winreg"
45 wr_name = "winreg" if py3compat.PY3 else "_winreg"
46 sys.modules[wr_name] = types.ModuleType(wr_name)
46 sys.modules[wr_name] = types.ModuleType(wr_name)
47 try:
47 try:
48 import winreg as wreg
48 import winreg as wreg
49 except ImportError:
49 except ImportError:
50 import _winreg as wreg
50 import _winreg as wreg
51 #Add entries that needs to be stubbed by the testing code
51 #Add entries that needs to be stubbed by the testing code
52 (wreg.OpenKey, wreg.QueryValueEx,) = (None, None)
52 (wreg.OpenKey, wreg.QueryValueEx,) = (None, None)
53
53
54 try:
54 try:
55 reload
55 reload
56 except NameError: # Python 3
56 except NameError: # Python 3
57 from imp import reload
57 from imp import reload
58
58
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60 # Globals
60 # Globals
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62 env = os.environ
62 env = os.environ
63 TEST_FILE_PATH = split(abspath(__file__))[0]
63 TEST_FILE_PATH = split(abspath(__file__))[0]
64 TMP_TEST_DIR = tempfile.mkdtemp()
64 TMP_TEST_DIR = tempfile.mkdtemp()
65 HOME_TEST_DIR = join(TMP_TEST_DIR, "home_test_dir")
65 HOME_TEST_DIR = join(TMP_TEST_DIR, "home_test_dir")
66 XDG_TEST_DIR = join(HOME_TEST_DIR, "xdg_test_dir")
66 XDG_TEST_DIR = join(HOME_TEST_DIR, "xdg_test_dir")
67 XDG_CACHE_DIR = join(HOME_TEST_DIR, "xdg_cache_dir")
67 XDG_CACHE_DIR = join(HOME_TEST_DIR, "xdg_cache_dir")
68 IP_TEST_DIR = join(HOME_TEST_DIR,'.ipython')
68 IP_TEST_DIR = join(HOME_TEST_DIR,'.ipython')
69 #
69 #
70 # Setup/teardown functions/decorators
70 # Setup/teardown functions/decorators
71 #
71 #
72
72
73 def setup():
73 def setup():
74 """Setup testenvironment for the module:
74 """Setup testenvironment for the module:
75
75
76 - Adds dummy home dir tree
76 - Adds dummy home dir tree
77 """
77 """
78 # Do not mask exceptions here. In particular, catching WindowsError is a
78 # Do not mask exceptions here. In particular, catching WindowsError is a
79 # problem because that exception is only defined on Windows...
79 # problem because that exception is only defined on Windows...
80 os.makedirs(IP_TEST_DIR)
80 os.makedirs(IP_TEST_DIR)
81 os.makedirs(os.path.join(XDG_TEST_DIR, 'ipython'))
81 os.makedirs(os.path.join(XDG_TEST_DIR, 'ipython'))
82 os.makedirs(os.path.join(XDG_CACHE_DIR, 'ipython'))
82 os.makedirs(os.path.join(XDG_CACHE_DIR, 'ipython'))
83
83
84
84
85 def teardown():
85 def teardown():
86 """Teardown testenvironment for the module:
86 """Teardown testenvironment for the module:
87
87
88 - Remove dummy home dir tree
88 - Remove dummy home dir tree
89 """
89 """
90 # Note: we remove the parent test dir, which is the root of all test
90 # Note: we remove the parent test dir, which is the root of all test
91 # subdirs we may have created. Use shutil instead of os.removedirs, so
91 # subdirs we may have created. Use shutil instead of os.removedirs, so
92 # that non-empty directories are all recursively removed.
92 # that non-empty directories are all recursively removed.
93 shutil.rmtree(TMP_TEST_DIR)
93 shutil.rmtree(TMP_TEST_DIR)
94
94
95
95
96 def setup_environment():
96 def setup_environment():
97 """Setup testenvironment for some functions that are tested
97 """Setup testenvironment for some functions that are tested
98 in this module. In particular this functions stores attributes
98 in this module. In particular this functions stores attributes
99 and other things that we need to stub in some test functions.
99 and other things that we need to stub in some test functions.
100 This needs to be done on a function level and not module level because
100 This needs to be done on a function level and not module level because
101 each testfunction needs a pristine environment.
101 each testfunction needs a pristine environment.
102 """
102 """
103 global oldstuff, platformstuff
103 global oldstuff, platformstuff
104 oldstuff = (env.copy(), os.name, sys.platform, path.get_home_dir, IPython.__file__, os.getcwd())
104 oldstuff = (env.copy(), os.name, sys.platform, path.get_home_dir, IPython.__file__, os.getcwd())
105
105
106 def teardown_environment():
106 def teardown_environment():
107 """Restore things that were remembered by the setup_environment function
107 """Restore things that were remembered by the setup_environment function
108 """
108 """
109 (oldenv, os.name, sys.platform, path.get_home_dir, IPython.__file__, old_wd) = oldstuff
109 (oldenv, os.name, sys.platform, path.get_home_dir, IPython.__file__, old_wd) = oldstuff
110 os.chdir(old_wd)
110 os.chdir(old_wd)
111 reload(path)
111 reload(path)
112
112
113 for key in list(env):
113 for key in list(env):
114 if key not in oldenv:
114 if key not in oldenv:
115 del env[key]
115 del env[key]
116 env.update(oldenv)
116 env.update(oldenv)
117 if hasattr(sys, 'frozen'):
117 if hasattr(sys, 'frozen'):
118 del sys.frozen
118 del sys.frozen
119
119
120 # Build decorator that uses the setup_environment/setup_environment
120 # Build decorator that uses the setup_environment/setup_environment
121 with_environment = with_setup(setup_environment, teardown_environment)
121 with_environment = with_setup(setup_environment, teardown_environment)
122
122
123 @contextmanager
123 @contextmanager
124 def patch_get_home_dir(dirpath):
124 def patch_get_home_dir(dirpath):
125 orig_get_home_dir = path.get_home_dir
125 orig_get_home_dir = path.get_home_dir
126 path.get_home_dir = lambda : dirpath
126 path.get_home_dir = lambda : dirpath
127 try:
127 try:
128 yield
128 yield
129 finally:
129 finally:
130 path.get_home_dir = orig_get_home_dir
130 path.get_home_dir = orig_get_home_dir
131
131
132 @skip_if_not_win32
132 @skip_if_not_win32
133 @with_environment
133 @with_environment
134 def test_get_home_dir_1():
134 def test_get_home_dir_1():
135 """Testcase for py2exe logic, un-compressed lib
135 """Testcase for py2exe logic, un-compressed lib
136 """
136 """
137 unfrozen = path.get_home_dir()
137 unfrozen = path.get_home_dir()
138 sys.frozen = True
138 sys.frozen = True
139
139
140 #fake filename for IPython.__init__
140 #fake filename for IPython.__init__
141 IPython.__file__ = abspath(join(HOME_TEST_DIR, "Lib/IPython/__init__.py"))
141 IPython.__file__ = abspath(join(HOME_TEST_DIR, "Lib/IPython/__init__.py"))
142
142
143 home_dir = path.get_home_dir()
143 home_dir = path.get_home_dir()
144 nt.assert_equal(home_dir, unfrozen)
144 nt.assert_equal(home_dir, unfrozen)
145
145
146
146
147 @skip_if_not_win32
147 @skip_if_not_win32
148 @with_environment
148 @with_environment
149 def test_get_home_dir_2():
149 def test_get_home_dir_2():
150 """Testcase for py2exe logic, compressed lib
150 """Testcase for py2exe logic, compressed lib
151 """
151 """
152 unfrozen = path.get_home_dir()
152 unfrozen = path.get_home_dir()
153 sys.frozen = True
153 sys.frozen = True
154 #fake filename for IPython.__init__
154 #fake filename for IPython.__init__
155 IPython.__file__ = abspath(join(HOME_TEST_DIR, "Library.zip/IPython/__init__.py")).lower()
155 IPython.__file__ = abspath(join(HOME_TEST_DIR, "Library.zip/IPython/__init__.py")).lower()
156
156
157 home_dir = path.get_home_dir(True)
157 home_dir = path.get_home_dir(True)
158 nt.assert_equal(home_dir, unfrozen)
158 nt.assert_equal(home_dir, unfrozen)
159
159
160
160
161 @with_environment
161 @with_environment
162 def test_get_home_dir_3():
162 def test_get_home_dir_3():
163 """get_home_dir() uses $HOME if set"""
163 """get_home_dir() uses $HOME if set"""
164 env["HOME"] = HOME_TEST_DIR
164 env["HOME"] = HOME_TEST_DIR
165 home_dir = path.get_home_dir(True)
165 home_dir = path.get_home_dir(True)
166 # get_home_dir expands symlinks
166 # get_home_dir expands symlinks
167 nt.assert_equal(home_dir, os.path.realpath(env["HOME"]))
167 nt.assert_equal(home_dir, os.path.realpath(env["HOME"]))
168
168
169
169
170 @with_environment
170 @with_environment
171 def test_get_home_dir_4():
171 def test_get_home_dir_4():
172 """get_home_dir() still works if $HOME is not set"""
172 """get_home_dir() still works if $HOME is not set"""
173
173
174 if 'HOME' in env: del env['HOME']
174 if 'HOME' in env: del env['HOME']
175 # this should still succeed, but we don't care what the answer is
175 # this should still succeed, but we don't care what the answer is
176 home = path.get_home_dir(False)
176 home = path.get_home_dir(False)
177
177
178 @with_environment
178 @with_environment
179 def test_get_home_dir_5():
179 def test_get_home_dir_5():
180 """raise HomeDirError if $HOME is specified, but not a writable dir"""
180 """raise HomeDirError if $HOME is specified, but not a writable dir"""
181 env['HOME'] = abspath(HOME_TEST_DIR+'garbage')
181 env['HOME'] = abspath(HOME_TEST_DIR+'garbage')
182 # set os.name = posix, to prevent My Documents fallback on Windows
182 # set os.name = posix, to prevent My Documents fallback on Windows
183 os.name = 'posix'
183 os.name = 'posix'
184 nt.assert_raises(path.HomeDirError, path.get_home_dir, True)
184 nt.assert_raises(path.HomeDirError, path.get_home_dir, True)
185
185
186 # Should we stub wreg fully so we can run the test on all platforms?
186 # Should we stub wreg fully so we can run the test on all platforms?
187 @skip_if_not_win32
187 @skip_if_not_win32
188 @with_environment
188 @with_environment
189 def test_get_home_dir_8():
189 def test_get_home_dir_8():
190 """Using registry hack for 'My Documents', os=='nt'
190 """Using registry hack for 'My Documents', os=='nt'
191
191
192 HOMESHARE, HOMEDRIVE, HOMEPATH, USERPROFILE and others are missing.
192 HOMESHARE, HOMEDRIVE, HOMEPATH, USERPROFILE and others are missing.
193 """
193 """
194 os.name = 'nt'
194 os.name = 'nt'
195 # Remove from stub environment all keys that may be set
195 # Remove from stub environment all keys that may be set
196 for key in ['HOME', 'HOMESHARE', 'HOMEDRIVE', 'HOMEPATH', 'USERPROFILE']:
196 for key in ['HOME', 'HOMESHARE', 'HOMEDRIVE', 'HOMEPATH', 'USERPROFILE']:
197 env.pop(key, None)
197 env.pop(key, None)
198
198
199 class key:
199 class key:
200 def Close(self):
200 def Close(self):
201 pass
201 pass
202
202
203 with patch.object(wreg, 'OpenKey', return_value=key()), \
203 with patch.object(wreg, 'OpenKey', return_value=key()), \
204 patch.object(wreg, 'QueryValueEx', return_value=[abspath(HOME_TEST_DIR)]):
204 patch.object(wreg, 'QueryValueEx', return_value=[abspath(HOME_TEST_DIR)]):
205 home_dir = path.get_home_dir()
205 home_dir = path.get_home_dir()
206 nt.assert_equal(home_dir, abspath(HOME_TEST_DIR))
206 nt.assert_equal(home_dir, abspath(HOME_TEST_DIR))
207
207
208
208
209 @with_environment
209 @with_environment
210 def test_get_ipython_dir_1():
210 def test_get_ipython_dir_1():
211 """test_get_ipython_dir_1, Testcase to see if we can call get_ipython_dir without Exceptions."""
211 """test_get_ipython_dir_1, Testcase to see if we can call get_ipython_dir without Exceptions."""
212 env_ipdir = os.path.join("someplace", ".ipython")
212 env_ipdir = os.path.join("someplace", ".ipython")
213 path._writable_dir = lambda path: True
213 path._writable_dir = lambda path: True
214 env['IPYTHONDIR'] = env_ipdir
214 env['IPYTHONDIR'] = env_ipdir
215 ipdir = path.get_ipython_dir()
215 ipdir = path.get_ipython_dir()
216 nt.assert_equal(ipdir, env_ipdir)
216 nt.assert_equal(ipdir, env_ipdir)
217
217
218
218
219 @with_environment
219 @with_environment
220 def test_get_ipython_dir_2():
220 def test_get_ipython_dir_2():
221 """test_get_ipython_dir_2, Testcase to see if we can call get_ipython_dir without Exceptions."""
221 """test_get_ipython_dir_2, Testcase to see if we can call get_ipython_dir without Exceptions."""
222 with patch_get_home_dir('someplace'):
222 with patch_get_home_dir('someplace'):
223 path.get_xdg_dir = lambda : None
223 path.get_xdg_dir = lambda : None
224 path._writable_dir = lambda path: True
224 path._writable_dir = lambda path: True
225 os.name = "posix"
225 os.name = "posix"
226 env.pop('IPYTHON_DIR', None)
226 env.pop('IPYTHON_DIR', None)
227 env.pop('IPYTHONDIR', None)
227 env.pop('IPYTHONDIR', None)
228 env.pop('XDG_CONFIG_HOME', None)
228 env.pop('XDG_CONFIG_HOME', None)
229 ipdir = path.get_ipython_dir()
229 ipdir = path.get_ipython_dir()
230 nt.assert_equal(ipdir, os.path.join("someplace", ".ipython"))
230 nt.assert_equal(ipdir, os.path.join("someplace", ".ipython"))
231
231
232 @with_environment
232 @with_environment
233 def test_get_ipython_dir_3():
233 def test_get_ipython_dir_3():
234 """test_get_ipython_dir_3, move XDG if defined, and .ipython doesn't exist."""
234 """test_get_ipython_dir_3, move XDG if defined, and .ipython doesn't exist."""
235 tmphome = TemporaryDirectory()
235 tmphome = TemporaryDirectory()
236 try:
236 try:
237 with patch_get_home_dir(tmphome.name):
237 with patch_get_home_dir(tmphome.name):
238 os.name = "posix"
238 os.name = "posix"
239 env.pop('IPYTHON_DIR', None)
239 env.pop('IPYTHON_DIR', None)
240 env.pop('IPYTHONDIR', None)
240 env.pop('IPYTHONDIR', None)
241 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
241 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
242
242
243 with warnings.catch_warnings(record=True) as w:
243 with warnings.catch_warnings(record=True) as w:
244 ipdir = path.get_ipython_dir()
244 ipdir = path.get_ipython_dir()
245
245
246 nt.assert_equal(ipdir, os.path.join(tmphome.name, ".ipython"))
246 nt.assert_equal(ipdir, os.path.join(tmphome.name, ".ipython"))
247 if sys.platform != 'darwin':
247 if sys.platform != 'darwin':
248 nt.assert_equal(len(w), 1)
248 nt.assert_equal(len(w), 1)
249 nt.assert_in('Moving', str(w[0]))
249 nt.assert_in('Moving', str(w[0]))
250 finally:
250 finally:
251 tmphome.cleanup()
251 tmphome.cleanup()
252
252
253 @with_environment
253 @with_environment
254 def test_get_ipython_dir_4():
254 def test_get_ipython_dir_4():
255 """test_get_ipython_dir_4, warn if XDG and home both exist."""
255 """test_get_ipython_dir_4, warn if XDG and home both exist."""
256 with patch_get_home_dir(HOME_TEST_DIR):
256 with patch_get_home_dir(HOME_TEST_DIR):
257 os.name = "posix"
257 os.name = "posix"
258 env.pop('IPYTHON_DIR', None)
258 env.pop('IPYTHON_DIR', None)
259 env.pop('IPYTHONDIR', None)
259 env.pop('IPYTHONDIR', None)
260 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
260 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
261 try:
261 try:
262 os.mkdir(os.path.join(XDG_TEST_DIR, 'ipython'))
262 os.mkdir(os.path.join(XDG_TEST_DIR, 'ipython'))
263 except OSError as e:
263 except OSError as e:
264 if e.errno != errno.EEXIST:
264 if e.errno != errno.EEXIST:
265 raise
265 raise
266
266
267 with warnings.catch_warnings(record=True) as w:
267 with warnings.catch_warnings(record=True) as w:
268 ipdir = path.get_ipython_dir()
268 ipdir = path.get_ipython_dir()
269
269
270 nt.assert_equal(ipdir, os.path.join(HOME_TEST_DIR, ".ipython"))
270 nt.assert_equal(ipdir, os.path.join(HOME_TEST_DIR, ".ipython"))
271 if sys.platform != 'darwin':
271 if sys.platform != 'darwin':
272 nt.assert_equal(len(w), 1)
272 nt.assert_equal(len(w), 1)
273 nt.assert_in('Ignoring', str(w[0]))
273 nt.assert_in('Ignoring', str(w[0]))
274
274
275 @with_environment
275 @with_environment
276 def test_get_ipython_dir_5():
276 def test_get_ipython_dir_5():
277 """test_get_ipython_dir_5, use .ipython if exists and XDG defined, but doesn't exist."""
277 """test_get_ipython_dir_5, use .ipython if exists and XDG defined, but doesn't exist."""
278 with patch_get_home_dir(HOME_TEST_DIR):
278 with patch_get_home_dir(HOME_TEST_DIR):
279 os.name = "posix"
279 os.name = "posix"
280 env.pop('IPYTHON_DIR', None)
280 env.pop('IPYTHON_DIR', None)
281 env.pop('IPYTHONDIR', None)
281 env.pop('IPYTHONDIR', None)
282 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
282 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
283 try:
283 try:
284 os.rmdir(os.path.join(XDG_TEST_DIR, 'ipython'))
284 os.rmdir(os.path.join(XDG_TEST_DIR, 'ipython'))
285 except OSError as e:
285 except OSError as e:
286 if e.errno != errno.ENOENT:
286 if e.errno != errno.ENOENT:
287 raise
287 raise
288 ipdir = path.get_ipython_dir()
288 ipdir = path.get_ipython_dir()
289 nt.assert_equal(ipdir, IP_TEST_DIR)
289 nt.assert_equal(ipdir, IP_TEST_DIR)
290
290
291 @with_environment
291 @with_environment
292 def test_get_ipython_dir_6():
292 def test_get_ipython_dir_6():
293 """test_get_ipython_dir_6, use home over XDG if defined and neither exist."""
293 """test_get_ipython_dir_6, use home over XDG if defined and neither exist."""
294 xdg = os.path.join(HOME_TEST_DIR, 'somexdg')
294 xdg = os.path.join(HOME_TEST_DIR, 'somexdg')
295 os.mkdir(xdg)
295 os.mkdir(xdg)
296 shutil.rmtree(os.path.join(HOME_TEST_DIR, '.ipython'))
296 shutil.rmtree(os.path.join(HOME_TEST_DIR, '.ipython'))
297 with patch_get_home_dir(HOME_TEST_DIR):
297 with patch_get_home_dir(HOME_TEST_DIR):
298 orig_get_xdg_dir = path.get_xdg_dir
298 orig_get_xdg_dir = path.get_xdg_dir
299 path.get_xdg_dir = lambda : xdg
299 path.get_xdg_dir = lambda : xdg
300 try:
300 try:
301 os.name = "posix"
301 os.name = "posix"
302 env.pop('IPYTHON_DIR', None)
302 env.pop('IPYTHON_DIR', None)
303 env.pop('IPYTHONDIR', None)
303 env.pop('IPYTHONDIR', None)
304 env.pop('XDG_CONFIG_HOME', None)
304 env.pop('XDG_CONFIG_HOME', None)
305 with warnings.catch_warnings(record=True) as w:
305 with warnings.catch_warnings(record=True) as w:
306 ipdir = path.get_ipython_dir()
306 ipdir = path.get_ipython_dir()
307
307
308 nt.assert_equal(ipdir, os.path.join(HOME_TEST_DIR, '.ipython'))
308 nt.assert_equal(ipdir, os.path.join(HOME_TEST_DIR, '.ipython'))
309 nt.assert_equal(len(w), 0)
309 nt.assert_equal(len(w), 0)
310 finally:
310 finally:
311 path.get_xdg_dir = orig_get_xdg_dir
311 path.get_xdg_dir = orig_get_xdg_dir
312
312
313 @with_environment
313 @with_environment
314 def test_get_ipython_dir_7():
314 def test_get_ipython_dir_7():
315 """test_get_ipython_dir_7, test home directory expansion on IPYTHONDIR"""
315 """test_get_ipython_dir_7, test home directory expansion on IPYTHONDIR"""
316 path._writable_dir = lambda path: True
316 path._writable_dir = lambda path: True
317 home_dir = os.path.normpath(os.path.expanduser('~'))
317 home_dir = os.path.normpath(os.path.expanduser('~'))
318 env['IPYTHONDIR'] = os.path.join('~', 'somewhere')
318 env['IPYTHONDIR'] = os.path.join('~', 'somewhere')
319 ipdir = path.get_ipython_dir()
319 ipdir = path.get_ipython_dir()
320 nt.assert_equal(ipdir, os.path.join(home_dir, 'somewhere'))
320 nt.assert_equal(ipdir, os.path.join(home_dir, 'somewhere'))
321
321
322 @skip_win32
322 @skip_win32
323 @with_environment
323 @with_environment
324 def test_get_ipython_dir_8():
324 def test_get_ipython_dir_8():
325 """test_get_ipython_dir_8, test / home directory"""
325 """test_get_ipython_dir_8, test / home directory"""
326 old = path._writable_dir, path.get_xdg_dir
326 old = path._writable_dir, path.get_xdg_dir
327 try:
327 try:
328 path._writable_dir = lambda path: bool(path)
328 path._writable_dir = lambda path: bool(path)
329 path.get_xdg_dir = lambda: None
329 path.get_xdg_dir = lambda: None
330 env.pop('IPYTHON_DIR', None)
330 env.pop('IPYTHON_DIR', None)
331 env.pop('IPYTHONDIR', None)
331 env.pop('IPYTHONDIR', None)
332 env['HOME'] = '/'
332 env['HOME'] = '/'
333 nt.assert_equal(path.get_ipython_dir(), '/.ipython')
333 nt.assert_equal(path.get_ipython_dir(), '/.ipython')
334 finally:
334 finally:
335 path._writable_dir, path.get_xdg_dir = old
335 path._writable_dir, path.get_xdg_dir = old
336
336
337 @with_environment
337 @with_environment
338 def test_get_xdg_dir_0():
338 def test_get_xdg_dir_0():
339 """test_get_xdg_dir_0, check xdg_dir"""
339 """test_get_xdg_dir_0, check xdg_dir"""
340 reload(path)
340 reload(path)
341 path._writable_dir = lambda path: True
341 path._writable_dir = lambda path: True
342 path.get_home_dir = lambda : 'somewhere'
342 path.get_home_dir = lambda : 'somewhere'
343 os.name = "posix"
343 os.name = "posix"
344 sys.platform = "linux2"
344 sys.platform = "linux2"
345 env.pop('IPYTHON_DIR', None)
345 env.pop('IPYTHON_DIR', None)
346 env.pop('IPYTHONDIR', None)
346 env.pop('IPYTHONDIR', None)
347 env.pop('XDG_CONFIG_HOME', None)
347 env.pop('XDG_CONFIG_HOME', None)
348
348
349 nt.assert_equal(path.get_xdg_dir(), os.path.join('somewhere', '.config'))
349 nt.assert_equal(path.get_xdg_dir(), os.path.join('somewhere', '.config'))
350
350
351
351
352 @with_environment
352 @with_environment
353 def test_get_xdg_dir_1():
353 def test_get_xdg_dir_1():
354 """test_get_xdg_dir_1, check nonexistant xdg_dir"""
354 """test_get_xdg_dir_1, check nonexistant xdg_dir"""
355 reload(path)
355 reload(path)
356 path.get_home_dir = lambda : HOME_TEST_DIR
356 path.get_home_dir = lambda : HOME_TEST_DIR
357 os.name = "posix"
357 os.name = "posix"
358 sys.platform = "linux2"
358 sys.platform = "linux2"
359 env.pop('IPYTHON_DIR', None)
359 env.pop('IPYTHON_DIR', None)
360 env.pop('IPYTHONDIR', None)
360 env.pop('IPYTHONDIR', None)
361 env.pop('XDG_CONFIG_HOME', None)
361 env.pop('XDG_CONFIG_HOME', None)
362 nt.assert_equal(path.get_xdg_dir(), None)
362 nt.assert_equal(path.get_xdg_dir(), None)
363
363
364 @with_environment
364 @with_environment
365 def test_get_xdg_dir_2():
365 def test_get_xdg_dir_2():
366 """test_get_xdg_dir_2, check xdg_dir default to ~/.config"""
366 """test_get_xdg_dir_2, check xdg_dir default to ~/.config"""
367 reload(path)
367 reload(path)
368 path.get_home_dir = lambda : HOME_TEST_DIR
368 path.get_home_dir = lambda : HOME_TEST_DIR
369 os.name = "posix"
369 os.name = "posix"
370 sys.platform = "linux2"
370 sys.platform = "linux2"
371 env.pop('IPYTHON_DIR', None)
371 env.pop('IPYTHON_DIR', None)
372 env.pop('IPYTHONDIR', None)
372 env.pop('IPYTHONDIR', None)
373 env.pop('XDG_CONFIG_HOME', None)
373 env.pop('XDG_CONFIG_HOME', None)
374 cfgdir=os.path.join(path.get_home_dir(), '.config')
374 cfgdir=os.path.join(path.get_home_dir(), '.config')
375 if not os.path.exists(cfgdir):
375 if not os.path.exists(cfgdir):
376 os.makedirs(cfgdir)
376 os.makedirs(cfgdir)
377
377
378 nt.assert_equal(path.get_xdg_dir(), cfgdir)
378 nt.assert_equal(path.get_xdg_dir(), cfgdir)
379
379
380 @with_environment
380 @with_environment
381 def test_get_xdg_dir_3():
381 def test_get_xdg_dir_3():
382 """test_get_xdg_dir_3, check xdg_dir not used on OS X"""
382 """test_get_xdg_dir_3, check xdg_dir not used on OS X"""
383 reload(path)
383 reload(path)
384 path.get_home_dir = lambda : HOME_TEST_DIR
384 path.get_home_dir = lambda : HOME_TEST_DIR
385 os.name = "posix"
385 os.name = "posix"
386 sys.platform = "darwin"
386 sys.platform = "darwin"
387 env.pop('IPYTHON_DIR', None)
387 env.pop('IPYTHON_DIR', None)
388 env.pop('IPYTHONDIR', None)
388 env.pop('IPYTHONDIR', None)
389 env.pop('XDG_CONFIG_HOME', None)
389 env.pop('XDG_CONFIG_HOME', None)
390 cfgdir=os.path.join(path.get_home_dir(), '.config')
390 cfgdir=os.path.join(path.get_home_dir(), '.config')
391 if not os.path.exists(cfgdir):
391 if not os.path.exists(cfgdir):
392 os.makedirs(cfgdir)
392 os.makedirs(cfgdir)
393
393
394 nt.assert_equal(path.get_xdg_dir(), None)
394 nt.assert_equal(path.get_xdg_dir(), None)
395
395
396 def test_filefind():
396 def test_filefind():
397 """Various tests for filefind"""
397 """Various tests for filefind"""
398 f = tempfile.NamedTemporaryFile()
398 f = tempfile.NamedTemporaryFile()
399 # print 'fname:',f.name
399 # print 'fname:',f.name
400 alt_dirs = path.get_ipython_dir()
400 alt_dirs = path.get_ipython_dir()
401 t = path.filefind(f.name, alt_dirs)
401 t = path.filefind(f.name, alt_dirs)
402 # print 'found:',t
402 # print 'found:',t
403
403
404 @with_environment
404 @with_environment
405 def test_get_ipython_cache_dir():
405 def test_get_ipython_cache_dir():
406 os.environ["HOME"] = HOME_TEST_DIR
406 os.environ["HOME"] = HOME_TEST_DIR
407 if os.name == 'posix' and sys.platform != 'darwin':
407 if os.name == 'posix' and sys.platform != 'darwin':
408 # test default
408 # test default
409 os.makedirs(os.path.join(HOME_TEST_DIR, ".cache"))
409 os.makedirs(os.path.join(HOME_TEST_DIR, ".cache"))
410 os.environ.pop("XDG_CACHE_HOME", None)
410 os.environ.pop("XDG_CACHE_HOME", None)
411 ipdir = path.get_ipython_cache_dir()
411 ipdir = path.get_ipython_cache_dir()
412 nt.assert_equal(os.path.join(HOME_TEST_DIR, ".cache", "ipython"),
412 nt.assert_equal(os.path.join(HOME_TEST_DIR, ".cache", "ipython"),
413 ipdir)
413 ipdir)
414 nt.assert_true(os.path.isdir(ipdir))
414 nt.assert_true(os.path.isdir(ipdir))
415
415
416 # test env override
416 # test env override
417 os.environ["XDG_CACHE_HOME"] = XDG_CACHE_DIR
417 os.environ["XDG_CACHE_HOME"] = XDG_CACHE_DIR
418 ipdir = path.get_ipython_cache_dir()
418 ipdir = path.get_ipython_cache_dir()
419 nt.assert_true(os.path.isdir(ipdir))
419 nt.assert_true(os.path.isdir(ipdir))
420 nt.assert_equal(ipdir, os.path.join(XDG_CACHE_DIR, "ipython"))
420 nt.assert_equal(ipdir, os.path.join(XDG_CACHE_DIR, "ipython"))
421 else:
421 else:
422 nt.assert_equal(path.get_ipython_cache_dir(),
422 nt.assert_equal(path.get_ipython_cache_dir(),
423 path.get_ipython_dir())
423 path.get_ipython_dir())
424
424
425 def test_get_ipython_package_dir():
425 def test_get_ipython_package_dir():
426 ipdir = path.get_ipython_package_dir()
426 ipdir = path.get_ipython_package_dir()
427 nt.assert_true(os.path.isdir(ipdir))
427 nt.assert_true(os.path.isdir(ipdir))
428
428
429
429
430 def test_get_ipython_module_path():
430 def test_get_ipython_module_path():
431 ipapp_path = path.get_ipython_module_path('IPython.terminal.ipapp')
431 ipapp_path = path.get_ipython_module_path('IPython.terminal.ipapp')
432 nt.assert_true(os.path.isfile(ipapp_path))
432 nt.assert_true(os.path.isfile(ipapp_path))
433
433
434
434
435 @dec.skip_if_not_win32
435 @dec.skip_if_not_win32
436 def test_get_long_path_name_win32():
436 def test_get_long_path_name_win32():
437 with TemporaryDirectory() as tmpdir:
437 with TemporaryDirectory() as tmpdir:
438
438
439 # Make a long path. Expands the path of tmpdir prematurely as it may already have a long
439 # Make a long path. Expands the path of tmpdir prematurely as it may already have a long
440 # path component, so ensure we include the long form of it
440 # path component, so ensure we include the long form of it
441 long_path = os.path.join(path.get_long_path_name(tmpdir), u'this is my long path name')
441 long_path = os.path.join(path.get_long_path_name(tmpdir), u'this is my long path name')
442 os.makedirs(long_path)
442 os.makedirs(long_path)
443
443
444 # Test to see if the short path evaluates correctly.
444 # Test to see if the short path evaluates correctly.
445 short_path = os.path.join(tmpdir, u'THISIS~1')
445 short_path = os.path.join(tmpdir, u'THISIS~1')
446 evaluated_path = path.get_long_path_name(short_path)
446 evaluated_path = path.get_long_path_name(short_path)
447 nt.assert_equal(evaluated_path.lower(), long_path.lower())
447 nt.assert_equal(evaluated_path.lower(), long_path.lower())
448
448
449
449
450 @dec.skip_win32
450 @dec.skip_win32
451 def test_get_long_path_name():
451 def test_get_long_path_name():
452 p = path.get_long_path_name('/usr/local')
452 p = path.get_long_path_name('/usr/local')
453 nt.assert_equal(p,'/usr/local')
453 nt.assert_equal(p,'/usr/local')
454
454
455 @dec.skip_win32 # can't create not-user-writable dir on win
455 @dec.skip_win32 # can't create not-user-writable dir on win
456 @with_environment
456 @with_environment
457 def test_not_writable_ipdir():
457 def test_not_writable_ipdir():
458 tmpdir = tempfile.mkdtemp()
458 tmpdir = tempfile.mkdtemp()
459 os.name = "posix"
459 os.name = "posix"
460 env.pop('IPYTHON_DIR', None)
460 env.pop('IPYTHON_DIR', None)
461 env.pop('IPYTHONDIR', None)
461 env.pop('IPYTHONDIR', None)
462 env.pop('XDG_CONFIG_HOME', None)
462 env.pop('XDG_CONFIG_HOME', None)
463 env['HOME'] = tmpdir
463 env['HOME'] = tmpdir
464 ipdir = os.path.join(tmpdir, '.ipython')
464 ipdir = os.path.join(tmpdir, '.ipython')
465 os.mkdir(ipdir)
465 os.mkdir(ipdir, 0o555)
466 os.chmod(ipdir, 600)
467 try:
466 try:
468 os.listdir(ipdir)
467 open(os.path.join(ipdir, "_foo_"), 'w').close()
469 except OSError:
468 except IOError:
470 pass
469 pass
471 else:
470 else:
472 # I can still read an unreadable dir,
471 # I can still write to an unwritable dir,
473 # assume I'm root and skip the test
472 # assume I'm root and skip the test
474 raise SkipTest("I can't create directories that I can't list")
473 raise SkipTest("I can't create directories that I can't list")
475 with AssertPrints('is not a writable location', channel='stderr'):
474 with AssertPrints('is not a writable location', channel='stderr'):
476 ipdir = path.get_ipython_dir()
475 ipdir = path.get_ipython_dir()
477 env.pop('IPYTHON_DIR', None)
476 env.pop('IPYTHON_DIR', None)
478
477
479 def test_unquote_filename():
478 def test_unquote_filename():
480 for win32 in (True, False):
479 for win32 in (True, False):
481 nt.assert_equal(path.unquote_filename('foo.py', win32=win32), 'foo.py')
480 nt.assert_equal(path.unquote_filename('foo.py', win32=win32), 'foo.py')
482 nt.assert_equal(path.unquote_filename('foo bar.py', win32=win32), 'foo bar.py')
481 nt.assert_equal(path.unquote_filename('foo bar.py', win32=win32), 'foo bar.py')
483 nt.assert_equal(path.unquote_filename('"foo.py"', win32=True), 'foo.py')
482 nt.assert_equal(path.unquote_filename('"foo.py"', win32=True), 'foo.py')
484 nt.assert_equal(path.unquote_filename('"foo bar.py"', win32=True), 'foo bar.py')
483 nt.assert_equal(path.unquote_filename('"foo bar.py"', win32=True), 'foo bar.py')
485 nt.assert_equal(path.unquote_filename("'foo.py'", win32=True), 'foo.py')
484 nt.assert_equal(path.unquote_filename("'foo.py'", win32=True), 'foo.py')
486 nt.assert_equal(path.unquote_filename("'foo bar.py'", win32=True), 'foo bar.py')
485 nt.assert_equal(path.unquote_filename("'foo bar.py'", win32=True), 'foo bar.py')
487 nt.assert_equal(path.unquote_filename('"foo.py"', win32=False), '"foo.py"')
486 nt.assert_equal(path.unquote_filename('"foo.py"', win32=False), '"foo.py"')
488 nt.assert_equal(path.unquote_filename('"foo bar.py"', win32=False), '"foo bar.py"')
487 nt.assert_equal(path.unquote_filename('"foo bar.py"', win32=False), '"foo bar.py"')
489 nt.assert_equal(path.unquote_filename("'foo.py'", win32=False), "'foo.py'")
488 nt.assert_equal(path.unquote_filename("'foo.py'", win32=False), "'foo.py'")
490 nt.assert_equal(path.unquote_filename("'foo bar.py'", win32=False), "'foo bar.py'")
489 nt.assert_equal(path.unquote_filename("'foo bar.py'", win32=False), "'foo bar.py'")
491
490
492 @with_environment
491 @with_environment
493 def test_get_py_filename():
492 def test_get_py_filename():
494 os.chdir(TMP_TEST_DIR)
493 os.chdir(TMP_TEST_DIR)
495 for win32 in (True, False):
494 for win32 in (True, False):
496 with make_tempfile('foo.py'):
495 with make_tempfile('foo.py'):
497 nt.assert_equal(path.get_py_filename('foo.py', force_win32=win32), 'foo.py')
496 nt.assert_equal(path.get_py_filename('foo.py', force_win32=win32), 'foo.py')
498 nt.assert_equal(path.get_py_filename('foo', force_win32=win32), 'foo.py')
497 nt.assert_equal(path.get_py_filename('foo', force_win32=win32), 'foo.py')
499 with make_tempfile('foo'):
498 with make_tempfile('foo'):
500 nt.assert_equal(path.get_py_filename('foo', force_win32=win32), 'foo')
499 nt.assert_equal(path.get_py_filename('foo', force_win32=win32), 'foo')
501 nt.assert_raises(IOError, path.get_py_filename, 'foo.py', force_win32=win32)
500 nt.assert_raises(IOError, path.get_py_filename, 'foo.py', force_win32=win32)
502 nt.assert_raises(IOError, path.get_py_filename, 'foo', force_win32=win32)
501 nt.assert_raises(IOError, path.get_py_filename, 'foo', force_win32=win32)
503 nt.assert_raises(IOError, path.get_py_filename, 'foo.py', force_win32=win32)
502 nt.assert_raises(IOError, path.get_py_filename, 'foo.py', force_win32=win32)
504 true_fn = 'foo with spaces.py'
503 true_fn = 'foo with spaces.py'
505 with make_tempfile(true_fn):
504 with make_tempfile(true_fn):
506 nt.assert_equal(path.get_py_filename('foo with spaces', force_win32=win32), true_fn)
505 nt.assert_equal(path.get_py_filename('foo with spaces', force_win32=win32), true_fn)
507 nt.assert_equal(path.get_py_filename('foo with spaces.py', force_win32=win32), true_fn)
506 nt.assert_equal(path.get_py_filename('foo with spaces.py', force_win32=win32), true_fn)
508 if win32:
507 if win32:
509 nt.assert_equal(path.get_py_filename('"foo with spaces.py"', force_win32=True), true_fn)
508 nt.assert_equal(path.get_py_filename('"foo with spaces.py"', force_win32=True), true_fn)
510 nt.assert_equal(path.get_py_filename("'foo with spaces.py'", force_win32=True), true_fn)
509 nt.assert_equal(path.get_py_filename("'foo with spaces.py'", force_win32=True), true_fn)
511 else:
510 else:
512 nt.assert_raises(IOError, path.get_py_filename, '"foo with spaces.py"', force_win32=False)
511 nt.assert_raises(IOError, path.get_py_filename, '"foo with spaces.py"', force_win32=False)
513 nt.assert_raises(IOError, path.get_py_filename, "'foo with spaces.py'", force_win32=False)
512 nt.assert_raises(IOError, path.get_py_filename, "'foo with spaces.py'", force_win32=False)
514
513
515 @onlyif_unicode_paths
514 @onlyif_unicode_paths
516 def test_unicode_in_filename():
515 def test_unicode_in_filename():
517 """When a file doesn't exist, the exception raised should be safe to call
516 """When a file doesn't exist, the exception raised should be safe to call
518 str() on - i.e. in Python 2 it must only have ASCII characters.
517 str() on - i.e. in Python 2 it must only have ASCII characters.
519
518
520 https://github.com/ipython/ipython/issues/875
519 https://github.com/ipython/ipython/issues/875
521 """
520 """
522 try:
521 try:
523 # these calls should not throw unicode encode exceptions
522 # these calls should not throw unicode encode exceptions
524 path.get_py_filename(u'fooéè.py', force_win32=False)
523 path.get_py_filename(u'fooéè.py', force_win32=False)
525 except IOError as ex:
524 except IOError as ex:
526 str(ex)
525 str(ex)
527
526
528
527
529 class TestShellGlob(object):
528 class TestShellGlob(object):
530
529
531 @classmethod
530 @classmethod
532 def setUpClass(cls):
531 def setUpClass(cls):
533 cls.filenames_start_with_a = ['a0', 'a1', 'a2']
532 cls.filenames_start_with_a = ['a0', 'a1', 'a2']
534 cls.filenames_end_with_b = ['0b', '1b', '2b']
533 cls.filenames_end_with_b = ['0b', '1b', '2b']
535 cls.filenames = cls.filenames_start_with_a + cls.filenames_end_with_b
534 cls.filenames = cls.filenames_start_with_a + cls.filenames_end_with_b
536 cls.tempdir = TemporaryDirectory()
535 cls.tempdir = TemporaryDirectory()
537 td = cls.tempdir.name
536 td = cls.tempdir.name
538
537
539 with cls.in_tempdir():
538 with cls.in_tempdir():
540 # Create empty files
539 # Create empty files
541 for fname in cls.filenames:
540 for fname in cls.filenames:
542 open(os.path.join(td, fname), 'w').close()
541 open(os.path.join(td, fname), 'w').close()
543
542
544 @classmethod
543 @classmethod
545 def tearDownClass(cls):
544 def tearDownClass(cls):
546 cls.tempdir.cleanup()
545 cls.tempdir.cleanup()
547
546
548 @classmethod
547 @classmethod
549 @contextmanager
548 @contextmanager
550 def in_tempdir(cls):
549 def in_tempdir(cls):
551 save = py3compat.getcwd()
550 save = py3compat.getcwd()
552 try:
551 try:
553 os.chdir(cls.tempdir.name)
552 os.chdir(cls.tempdir.name)
554 yield
553 yield
555 finally:
554 finally:
556 os.chdir(save)
555 os.chdir(save)
557
556
558 def check_match(self, patterns, matches):
557 def check_match(self, patterns, matches):
559 with self.in_tempdir():
558 with self.in_tempdir():
560 # glob returns unordered list. that's why sorted is required.
559 # glob returns unordered list. that's why sorted is required.
561 nt.assert_equals(sorted(path.shellglob(patterns)),
560 nt.assert_equals(sorted(path.shellglob(patterns)),
562 sorted(matches))
561 sorted(matches))
563
562
564 def common_cases(self):
563 def common_cases(self):
565 return [
564 return [
566 (['*'], self.filenames),
565 (['*'], self.filenames),
567 (['a*'], self.filenames_start_with_a),
566 (['a*'], self.filenames_start_with_a),
568 (['*c'], ['*c']),
567 (['*c'], ['*c']),
569 (['*', 'a*', '*b', '*c'], self.filenames
568 (['*', 'a*', '*b', '*c'], self.filenames
570 + self.filenames_start_with_a
569 + self.filenames_start_with_a
571 + self.filenames_end_with_b
570 + self.filenames_end_with_b
572 + ['*c']),
571 + ['*c']),
573 (['a[012]'], self.filenames_start_with_a),
572 (['a[012]'], self.filenames_start_with_a),
574 ]
573 ]
575
574
576 @skip_win32
575 @skip_win32
577 def test_match_posix(self):
576 def test_match_posix(self):
578 for (patterns, matches) in self.common_cases() + [
577 for (patterns, matches) in self.common_cases() + [
579 ([r'\*'], ['*']),
578 ([r'\*'], ['*']),
580 ([r'a\*', 'a*'], ['a*'] + self.filenames_start_with_a),
579 ([r'a\*', 'a*'], ['a*'] + self.filenames_start_with_a),
581 ([r'a\[012]'], ['a[012]']),
580 ([r'a\[012]'], ['a[012]']),
582 ]:
581 ]:
583 yield (self.check_match, patterns, matches)
582 yield (self.check_match, patterns, matches)
584
583
585 @skip_if_not_win32
584 @skip_if_not_win32
586 def test_match_windows(self):
585 def test_match_windows(self):
587 for (patterns, matches) in self.common_cases() + [
586 for (patterns, matches) in self.common_cases() + [
588 # In windows, backslash is interpreted as path
587 # In windows, backslash is interpreted as path
589 # separator. Therefore, you can't escape glob
588 # separator. Therefore, you can't escape glob
590 # using it.
589 # using it.
591 ([r'a\*', 'a*'], [r'a\*'] + self.filenames_start_with_a),
590 ([r'a\*', 'a*'], [r'a\*'] + self.filenames_start_with_a),
592 ([r'a\[012]'], [r'a\[012]']),
591 ([r'a\[012]'], [r'a\[012]']),
593 ]:
592 ]:
594 yield (self.check_match, patterns, matches)
593 yield (self.check_match, patterns, matches)
595
594
596
595
597 def test_unescape_glob():
596 def test_unescape_glob():
598 nt.assert_equals(path.unescape_glob(r'\*\[\!\]\?'), '*[!]?')
597 nt.assert_equals(path.unescape_glob(r'\*\[\!\]\?'), '*[!]?')
599 nt.assert_equals(path.unescape_glob(r'\\*'), r'\*')
598 nt.assert_equals(path.unescape_glob(r'\\*'), r'\*')
600 nt.assert_equals(path.unescape_glob(r'\\\*'), r'\*')
599 nt.assert_equals(path.unescape_glob(r'\\\*'), r'\*')
601 nt.assert_equals(path.unescape_glob(r'\\a'), r'\a')
600 nt.assert_equals(path.unescape_glob(r'\\a'), r'\a')
602 nt.assert_equals(path.unescape_glob(r'\a'), r'\a')
601 nt.assert_equals(path.unescape_glob(r'\a'), r'\a')
603
602
604
603
605 def test_ensure_dir_exists():
604 def test_ensure_dir_exists():
606 with TemporaryDirectory() as td:
605 with TemporaryDirectory() as td:
607 d = os.path.join(td, u'βˆ‚ir')
606 d = os.path.join(td, u'βˆ‚ir')
608 path.ensure_dir_exists(d) # create it
607 path.ensure_dir_exists(d) # create it
609 assert os.path.isdir(d)
608 assert os.path.isdir(d)
610 path.ensure_dir_exists(d) # no-op
609 path.ensure_dir_exists(d) # no-op
611 f = os.path.join(td, u'Ζ’ile')
610 f = os.path.join(td, u'Ζ’ile')
612 open(f, 'w').close() # touch
611 open(f, 'w').close() # touch
613 with nt.assert_raises(IOError):
612 with nt.assert_raises(IOError):
614 path.ensure_dir_exists(f)
613 path.ensure_dir_exists(f)
615
614
616 class TestLinkOrCopy(object):
615 class TestLinkOrCopy(object):
617 def setUp(self):
616 def setUp(self):
618 self.tempdir = TemporaryDirectory()
617 self.tempdir = TemporaryDirectory()
619 self.src = self.dst("src")
618 self.src = self.dst("src")
620 with open(self.src, "w") as f:
619 with open(self.src, "w") as f:
621 f.write("Hello, world!")
620 f.write("Hello, world!")
622
621
623 def tearDown(self):
622 def tearDown(self):
624 self.tempdir.cleanup()
623 self.tempdir.cleanup()
625
624
626 def dst(self, *args):
625 def dst(self, *args):
627 return os.path.join(self.tempdir.name, *args)
626 return os.path.join(self.tempdir.name, *args)
628
627
629 def assert_inode_not_equal(self, a, b):
628 def assert_inode_not_equal(self, a, b):
630 nt.assert_not_equals(os.stat(a).st_ino, os.stat(b).st_ino,
629 nt.assert_not_equals(os.stat(a).st_ino, os.stat(b).st_ino,
631 "%r and %r do reference the same indoes" %(a, b))
630 "%r and %r do reference the same indoes" %(a, b))
632
631
633 def assert_inode_equal(self, a, b):
632 def assert_inode_equal(self, a, b):
634 nt.assert_equals(os.stat(a).st_ino, os.stat(b).st_ino,
633 nt.assert_equals(os.stat(a).st_ino, os.stat(b).st_ino,
635 "%r and %r do not reference the same indoes" %(a, b))
634 "%r and %r do not reference the same indoes" %(a, b))
636
635
637 def assert_content_equal(self, a, b):
636 def assert_content_equal(self, a, b):
638 with open(a) as a_f:
637 with open(a) as a_f:
639 with open(b) as b_f:
638 with open(b) as b_f:
640 nt.assert_equals(a_f.read(), b_f.read())
639 nt.assert_equals(a_f.read(), b_f.read())
641
640
642 @skip_win32
641 @skip_win32
643 def test_link_successful(self):
642 def test_link_successful(self):
644 dst = self.dst("target")
643 dst = self.dst("target")
645 path.link_or_copy(self.src, dst)
644 path.link_or_copy(self.src, dst)
646 self.assert_inode_equal(self.src, dst)
645 self.assert_inode_equal(self.src, dst)
647
646
648 @skip_win32
647 @skip_win32
649 def test_link_into_dir(self):
648 def test_link_into_dir(self):
650 dst = self.dst("some_dir")
649 dst = self.dst("some_dir")
651 os.mkdir(dst)
650 os.mkdir(dst)
652 path.link_or_copy(self.src, dst)
651 path.link_or_copy(self.src, dst)
653 expected_dst = self.dst("some_dir", os.path.basename(self.src))
652 expected_dst = self.dst("some_dir", os.path.basename(self.src))
654 self.assert_inode_equal(self.src, expected_dst)
653 self.assert_inode_equal(self.src, expected_dst)
655
654
656 @skip_win32
655 @skip_win32
657 def test_target_exists(self):
656 def test_target_exists(self):
658 dst = self.dst("target")
657 dst = self.dst("target")
659 open(dst, "w").close()
658 open(dst, "w").close()
660 path.link_or_copy(self.src, dst)
659 path.link_or_copy(self.src, dst)
661 self.assert_inode_equal(self.src, dst)
660 self.assert_inode_equal(self.src, dst)
662
661
663 @skip_win32
662 @skip_win32
664 def test_no_link(self):
663 def test_no_link(self):
665 real_link = os.link
664 real_link = os.link
666 try:
665 try:
667 del os.link
666 del os.link
668 dst = self.dst("target")
667 dst = self.dst("target")
669 path.link_or_copy(self.src, dst)
668 path.link_or_copy(self.src, dst)
670 self.assert_content_equal(self.src, dst)
669 self.assert_content_equal(self.src, dst)
671 self.assert_inode_not_equal(self.src, dst)
670 self.assert_inode_not_equal(self.src, dst)
672 finally:
671 finally:
673 os.link = real_link
672 os.link = real_link
674
673
675 @skip_if_not_win32
674 @skip_if_not_win32
676 def test_windows(self):
675 def test_windows(self):
677 dst = self.dst("target")
676 dst = self.dst("target")
678 path.link_or_copy(self.src, dst)
677 path.link_or_copy(self.src, dst)
679 self.assert_content_equal(self.src, dst)
678 self.assert_content_equal(self.src, dst)
General Comments 0
You need to be logged in to leave comments. Login now