##// END OF EJS Templates
Remove unused imports in IPython.utils
Thomas Kluyver -
Show More
@@ -1,188 +1,187 b''
1 1 """Windows-specific implementation of process utilities.
2 2
3 3 This file is only meant to be imported by process.py, not by end-users.
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010-2011 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # stdlib
19 19 import os
20 20 import sys
21 21 import ctypes
22 import msvcrt
23 22
24 23 from ctypes import c_int, POINTER
25 24 from ctypes.wintypes import LPCWSTR, HLOCAL
26 25 from subprocess import STDOUT
27 26
28 27 # our own imports
29 28 from ._process_common import read_no_interrupt, process_handler, arg_split as py_arg_split
30 29 from . import py3compat
31 30 from .encoding import DEFAULT_ENCODING
32 31
33 32 #-----------------------------------------------------------------------------
34 33 # Function definitions
35 34 #-----------------------------------------------------------------------------
36 35
37 36 class AvoidUNCPath(object):
38 37 """A context manager to protect command execution from UNC paths.
39 38
40 39 In the Win32 API, commands can't be invoked with the cwd being a UNC path.
41 40 This context manager temporarily changes directory to the 'C:' drive on
42 41 entering, and restores the original working directory on exit.
43 42
44 43 The context manager returns the starting working directory *if* it made a
45 44 change and None otherwise, so that users can apply the necessary adjustment
46 45 to their system calls in the event of a change.
47 46
48 47 Example
49 48 -------
50 49 ::
51 50 cmd = 'dir'
52 51 with AvoidUNCPath() as path:
53 52 if path is not None:
54 53 cmd = '"pushd %s &&"%s' % (path, cmd)
55 54 os.system(cmd)
56 55 """
57 56 def __enter__(self):
58 57 self.path = os.getcwdu()
59 58 self.is_unc_path = self.path.startswith(r"\\")
60 59 if self.is_unc_path:
61 60 # change to c drive (as cmd.exe cannot handle UNC addresses)
62 61 os.chdir("C:")
63 62 return self.path
64 63 else:
65 64 # We return None to signal that there was no change in the working
66 65 # directory
67 66 return None
68 67
69 68 def __exit__(self, exc_type, exc_value, traceback):
70 69 if self.is_unc_path:
71 70 os.chdir(self.path)
72 71
73 72
74 73 def _find_cmd(cmd):
75 74 """Find the full path to a .bat or .exe using the win32api module."""
76 75 try:
77 76 from win32api import SearchPath
78 77 except ImportError:
79 78 raise ImportError('you need to have pywin32 installed for this to work')
80 79 else:
81 80 PATH = os.environ['PATH']
82 81 extensions = ['.exe', '.com', '.bat', '.py']
83 82 path = None
84 83 for ext in extensions:
85 84 try:
86 85 path = SearchPath(PATH, cmd + ext)[0]
87 86 except:
88 87 pass
89 88 if path is None:
90 89 raise OSError("command %r not found" % cmd)
91 90 else:
92 91 return path
93 92
94 93
95 94 def _system_body(p):
96 95 """Callback for _system."""
97 96 enc = DEFAULT_ENCODING
98 97 for line in read_no_interrupt(p.stdout).splitlines():
99 98 line = line.decode(enc, 'replace')
100 99 print(line, file=sys.stdout)
101 100 for line in read_no_interrupt(p.stderr).splitlines():
102 101 line = line.decode(enc, 'replace')
103 102 print(line, file=sys.stderr)
104 103
105 104 # Wait to finish for returncode
106 105 return p.wait()
107 106
108 107
109 108 def system(cmd):
110 109 """Win32 version of os.system() that works with network shares.
111 110
112 111 Note that this implementation returns None, as meant for use in IPython.
113 112
114 113 Parameters
115 114 ----------
116 115 cmd : str
117 116 A command to be executed in the system shell.
118 117
119 118 Returns
120 119 -------
121 120 None : we explicitly do NOT return the subprocess status code, as this
122 121 utility is meant to be used extensively in IPython, where any return value
123 122 would trigger :func:`sys.displayhook` calls.
124 123 """
125 124 # The controller provides interactivity with both
126 125 # stdin and stdout
127 126 #import _process_win32_controller
128 127 #_process_win32_controller.system(cmd)
129 128
130 129 with AvoidUNCPath() as path:
131 130 if path is not None:
132 131 cmd = '"pushd %s &&"%s' % (path, cmd)
133 132 return process_handler(cmd, _system_body)
134 133
135 134 def getoutput(cmd):
136 135 """Return standard output of executing cmd in a shell.
137 136
138 137 Accepts the same arguments as os.system().
139 138
140 139 Parameters
141 140 ----------
142 141 cmd : str
143 142 A command to be executed in the system shell.
144 143
145 144 Returns
146 145 -------
147 146 stdout : str
148 147 """
149 148
150 149 with AvoidUNCPath() as path:
151 150 if path is not None:
152 151 cmd = '"pushd %s &&"%s' % (path, cmd)
153 152 out = process_handler(cmd, lambda p: p.communicate()[0], STDOUT)
154 153
155 154 if out is None:
156 155 out = b''
157 156 return py3compat.bytes_to_str(out)
158 157
159 158 try:
160 159 CommandLineToArgvW = ctypes.windll.shell32.CommandLineToArgvW
161 160 CommandLineToArgvW.arg_types = [LPCWSTR, POINTER(c_int)]
162 161 CommandLineToArgvW.restype = POINTER(LPCWSTR)
163 162 LocalFree = ctypes.windll.kernel32.LocalFree
164 163 LocalFree.res_type = HLOCAL
165 164 LocalFree.arg_types = [HLOCAL]
166 165
167 166 def arg_split(commandline, posix=False, strict=True):
168 167 """Split a command line's arguments in a shell-like manner.
169 168
170 169 This is a special version for windows that use a ctypes call to CommandLineToArgvW
171 170 to do the argv splitting. The posix paramter is ignored.
172 171
173 172 If strict=False, process_common.arg_split(...strict=False) is used instead.
174 173 """
175 174 #CommandLineToArgvW returns path to executable if called with empty string.
176 175 if commandline.strip() == "":
177 176 return []
178 177 if not strict:
179 178 # not really a cl-arg, fallback on _process_common
180 179 return py_arg_split(commandline, posix=posix, strict=strict)
181 180 argvn = c_int()
182 181 result_pointer = CommandLineToArgvW(py3compat.cast_unicode(commandline.lstrip()), ctypes.byref(argvn))
183 182 result_array_type = LPCWSTR * argvn.value
184 183 result = [arg for arg in result_array_type.from_address(ctypes.addressof(result_pointer.contents))]
185 184 retval = LocalFree(result_pointer)
186 185 return result
187 186 except AttributeError:
188 187 arg_split = py_arg_split
@@ -1,574 +1,574 b''
1 1 """Windows-specific implementation of process utilities with direct WinAPI.
2 2
3 3 This file is meant to be used by process.py
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010-2011 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from __future__ import print_function
14 14
15 15 # stdlib
16 import os, sys, time, threading
16 import os, sys, threading
17 17 import ctypes, msvcrt
18 18
19 19 # Win32 API types needed for the API calls
20 20 from ctypes import POINTER
21 21 from ctypes.wintypes import HANDLE, HLOCAL, LPVOID, WORD, DWORD, BOOL, \
22 22 ULONG, LPCWSTR
23 23 LPDWORD = POINTER(DWORD)
24 24 LPHANDLE = POINTER(HANDLE)
25 25 ULONG_PTR = POINTER(ULONG)
26 26 class SECURITY_ATTRIBUTES(ctypes.Structure):
27 27 _fields_ = [("nLength", DWORD),
28 28 ("lpSecurityDescriptor", LPVOID),
29 29 ("bInheritHandle", BOOL)]
30 30 LPSECURITY_ATTRIBUTES = POINTER(SECURITY_ATTRIBUTES)
31 31 class STARTUPINFO(ctypes.Structure):
32 32 _fields_ = [("cb", DWORD),
33 33 ("lpReserved", LPCWSTR),
34 34 ("lpDesktop", LPCWSTR),
35 35 ("lpTitle", LPCWSTR),
36 36 ("dwX", DWORD),
37 37 ("dwY", DWORD),
38 38 ("dwXSize", DWORD),
39 39 ("dwYSize", DWORD),
40 40 ("dwXCountChars", DWORD),
41 41 ("dwYCountChars", DWORD),
42 42 ("dwFillAttribute", DWORD),
43 43 ("dwFlags", DWORD),
44 44 ("wShowWindow", WORD),
45 45 ("cbReserved2", WORD),
46 46 ("lpReserved2", LPVOID),
47 47 ("hStdInput", HANDLE),
48 48 ("hStdOutput", HANDLE),
49 49 ("hStdError", HANDLE)]
50 50 LPSTARTUPINFO = POINTER(STARTUPINFO)
51 51 class PROCESS_INFORMATION(ctypes.Structure):
52 52 _fields_ = [("hProcess", HANDLE),
53 53 ("hThread", HANDLE),
54 54 ("dwProcessId", DWORD),
55 55 ("dwThreadId", DWORD)]
56 56 LPPROCESS_INFORMATION = POINTER(PROCESS_INFORMATION)
57 57
58 58 # Win32 API constants needed
59 59 ERROR_HANDLE_EOF = 38
60 60 ERROR_BROKEN_PIPE = 109
61 61 ERROR_NO_DATA = 232
62 62 HANDLE_FLAG_INHERIT = 0x0001
63 63 STARTF_USESTDHANDLES = 0x0100
64 64 CREATE_SUSPENDED = 0x0004
65 65 CREATE_NEW_CONSOLE = 0x0010
66 66 CREATE_NO_WINDOW = 0x08000000
67 67 STILL_ACTIVE = 259
68 68 WAIT_TIMEOUT = 0x0102
69 69 WAIT_FAILED = 0xFFFFFFFF
70 70 INFINITE = 0xFFFFFFFF
71 71 DUPLICATE_SAME_ACCESS = 0x00000002
72 72 ENABLE_ECHO_INPUT = 0x0004
73 73 ENABLE_LINE_INPUT = 0x0002
74 74 ENABLE_PROCESSED_INPUT = 0x0001
75 75
76 76 # Win32 API functions needed
77 77 GetLastError = ctypes.windll.kernel32.GetLastError
78 78 GetLastError.argtypes = []
79 79 GetLastError.restype = DWORD
80 80
81 81 CreateFile = ctypes.windll.kernel32.CreateFileW
82 82 CreateFile.argtypes = [LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE]
83 83 CreateFile.restype = HANDLE
84 84
85 85 CreatePipe = ctypes.windll.kernel32.CreatePipe
86 86 CreatePipe.argtypes = [POINTER(HANDLE), POINTER(HANDLE),
87 87 LPSECURITY_ATTRIBUTES, DWORD]
88 88 CreatePipe.restype = BOOL
89 89
90 90 CreateProcess = ctypes.windll.kernel32.CreateProcessW
91 91 CreateProcess.argtypes = [LPCWSTR, LPCWSTR, LPSECURITY_ATTRIBUTES,
92 92 LPSECURITY_ATTRIBUTES, BOOL, DWORD, LPVOID, LPCWSTR, LPSTARTUPINFO,
93 93 LPPROCESS_INFORMATION]
94 94 CreateProcess.restype = BOOL
95 95
96 96 GetExitCodeProcess = ctypes.windll.kernel32.GetExitCodeProcess
97 97 GetExitCodeProcess.argtypes = [HANDLE, LPDWORD]
98 98 GetExitCodeProcess.restype = BOOL
99 99
100 100 GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess
101 101 GetCurrentProcess.argtypes = []
102 102 GetCurrentProcess.restype = HANDLE
103 103
104 104 ResumeThread = ctypes.windll.kernel32.ResumeThread
105 105 ResumeThread.argtypes = [HANDLE]
106 106 ResumeThread.restype = DWORD
107 107
108 108 ReadFile = ctypes.windll.kernel32.ReadFile
109 109 ReadFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, LPVOID]
110 110 ReadFile.restype = BOOL
111 111
112 112 WriteFile = ctypes.windll.kernel32.WriteFile
113 113 WriteFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, LPVOID]
114 114 WriteFile.restype = BOOL
115 115
116 116 GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
117 117 GetConsoleMode.argtypes = [HANDLE, LPDWORD]
118 118 GetConsoleMode.restype = BOOL
119 119
120 120 SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
121 121 SetConsoleMode.argtypes = [HANDLE, DWORD]
122 122 SetConsoleMode.restype = BOOL
123 123
124 124 FlushConsoleInputBuffer = ctypes.windll.kernel32.FlushConsoleInputBuffer
125 125 FlushConsoleInputBuffer.argtypes = [HANDLE]
126 126 FlushConsoleInputBuffer.restype = BOOL
127 127
128 128 WaitForSingleObject = ctypes.windll.kernel32.WaitForSingleObject
129 129 WaitForSingleObject.argtypes = [HANDLE, DWORD]
130 130 WaitForSingleObject.restype = DWORD
131 131
132 132 DuplicateHandle = ctypes.windll.kernel32.DuplicateHandle
133 133 DuplicateHandle.argtypes = [HANDLE, HANDLE, HANDLE, LPHANDLE,
134 134 DWORD, BOOL, DWORD]
135 135 DuplicateHandle.restype = BOOL
136 136
137 137 SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation
138 138 SetHandleInformation.argtypes = [HANDLE, DWORD, DWORD]
139 139 SetHandleInformation.restype = BOOL
140 140
141 141 CloseHandle = ctypes.windll.kernel32.CloseHandle
142 142 CloseHandle.argtypes = [HANDLE]
143 143 CloseHandle.restype = BOOL
144 144
145 145 CommandLineToArgvW = ctypes.windll.shell32.CommandLineToArgvW
146 146 CommandLineToArgvW.argtypes = [LPCWSTR, POINTER(ctypes.c_int)]
147 147 CommandLineToArgvW.restype = POINTER(LPCWSTR)
148 148
149 149 LocalFree = ctypes.windll.kernel32.LocalFree
150 150 LocalFree.argtypes = [HLOCAL]
151 151 LocalFree.restype = HLOCAL
152 152
153 153 class AvoidUNCPath(object):
154 154 """A context manager to protect command execution from UNC paths.
155 155
156 156 In the Win32 API, commands can't be invoked with the cwd being a UNC path.
157 157 This context manager temporarily changes directory to the 'C:' drive on
158 158 entering, and restores the original working directory on exit.
159 159
160 160 The context manager returns the starting working directory *if* it made a
161 161 change and None otherwise, so that users can apply the necessary adjustment
162 162 to their system calls in the event of a change.
163 163
164 164 Example
165 165 -------
166 166 ::
167 167 cmd = 'dir'
168 168 with AvoidUNCPath() as path:
169 169 if path is not None:
170 170 cmd = '"pushd %s &&"%s' % (path, cmd)
171 171 os.system(cmd)
172 172 """
173 173 def __enter__(self):
174 174 self.path = os.getcwdu()
175 175 self.is_unc_path = self.path.startswith(r"\\")
176 176 if self.is_unc_path:
177 177 # change to c drive (as cmd.exe cannot handle UNC addresses)
178 178 os.chdir("C:")
179 179 return self.path
180 180 else:
181 181 # We return None to signal that there was no change in the working
182 182 # directory
183 183 return None
184 184
185 185 def __exit__(self, exc_type, exc_value, traceback):
186 186 if self.is_unc_path:
187 187 os.chdir(self.path)
188 188
189 189
190 190 class Win32ShellCommandController(object):
191 191 """Runs a shell command in a 'with' context.
192 192
193 193 This implementation is Win32-specific.
194 194
195 195 Example:
196 196 # Runs the command interactively with default console stdin/stdout
197 197 with ShellCommandController('python -i') as scc:
198 198 scc.run()
199 199
200 200 # Runs the command using the provided functions for stdin/stdout
201 201 def my_stdout_func(s):
202 202 # print or save the string 's'
203 203 write_to_stdout(s)
204 204 def my_stdin_func():
205 205 # If input is available, return it as a string.
206 206 if input_available():
207 207 return get_input()
208 208 # If no input available, return None after a short delay to
209 209 # keep from blocking.
210 210 else:
211 211 time.sleep(0.01)
212 212 return None
213 213
214 214 with ShellCommandController('python -i') as scc:
215 215 scc.run(my_stdout_func, my_stdin_func)
216 216 """
217 217
218 218 def __init__(self, cmd, mergeout = True):
219 219 """Initializes the shell command controller.
220 220
221 221 The cmd is the program to execute, and mergeout is
222 222 whether to blend stdout and stderr into one output
223 223 in stdout. Merging them together in this fashion more
224 224 reliably keeps stdout and stderr in the correct order
225 225 especially for interactive shell usage.
226 226 """
227 227 self.cmd = cmd
228 228 self.mergeout = mergeout
229 229
230 230 def __enter__(self):
231 231 cmd = self.cmd
232 232 mergeout = self.mergeout
233 233
234 234 self.hstdout, self.hstdin, self.hstderr = None, None, None
235 235 self.piProcInfo = None
236 236 try:
237 237 p_hstdout, c_hstdout, p_hstderr, \
238 238 c_hstderr, p_hstdin, c_hstdin = [None]*6
239 239
240 240 # SECURITY_ATTRIBUTES with inherit handle set to True
241 241 saAttr = SECURITY_ATTRIBUTES()
242 242 saAttr.nLength = ctypes.sizeof(saAttr)
243 243 saAttr.bInheritHandle = True
244 244 saAttr.lpSecurityDescriptor = None
245 245
246 246 def create_pipe(uninherit):
247 247 """Creates a Windows pipe, which consists of two handles.
248 248
249 249 The 'uninherit' parameter controls which handle is not
250 250 inherited by the child process.
251 251 """
252 252 handles = HANDLE(), HANDLE()
253 253 if not CreatePipe(ctypes.byref(handles[0]),
254 254 ctypes.byref(handles[1]), ctypes.byref(saAttr), 0):
255 255 raise ctypes.WinError()
256 256 if not SetHandleInformation(handles[uninherit],
257 257 HANDLE_FLAG_INHERIT, 0):
258 258 raise ctypes.WinError()
259 259 return handles[0].value, handles[1].value
260 260
261 261 p_hstdout, c_hstdout = create_pipe(uninherit=0)
262 262 # 'mergeout' signals that stdout and stderr should be merged.
263 263 # We do that by using one pipe for both of them.
264 264 if mergeout:
265 265 c_hstderr = HANDLE()
266 266 if not DuplicateHandle(GetCurrentProcess(), c_hstdout,
267 267 GetCurrentProcess(), ctypes.byref(c_hstderr),
268 268 0, True, DUPLICATE_SAME_ACCESS):
269 269 raise ctypes.WinError()
270 270 else:
271 271 p_hstderr, c_hstderr = create_pipe(uninherit=0)
272 272 c_hstdin, p_hstdin = create_pipe(uninherit=1)
273 273
274 274 # Create the process object
275 275 piProcInfo = PROCESS_INFORMATION()
276 276 siStartInfo = STARTUPINFO()
277 277 siStartInfo.cb = ctypes.sizeof(siStartInfo)
278 278 siStartInfo.hStdInput = c_hstdin
279 279 siStartInfo.hStdOutput = c_hstdout
280 280 siStartInfo.hStdError = c_hstderr
281 281 siStartInfo.dwFlags = STARTF_USESTDHANDLES
282 282 dwCreationFlags = CREATE_SUSPENDED | CREATE_NO_WINDOW # | CREATE_NEW_CONSOLE
283 283
284 284 if not CreateProcess(None,
285 285 u"cmd.exe /c " + cmd,
286 286 None, None, True, dwCreationFlags,
287 287 None, None, ctypes.byref(siStartInfo),
288 288 ctypes.byref(piProcInfo)):
289 289 raise ctypes.WinError()
290 290
291 291 # Close this process's versions of the child handles
292 292 CloseHandle(c_hstdin)
293 293 c_hstdin = None
294 294 CloseHandle(c_hstdout)
295 295 c_hstdout = None
296 296 if c_hstderr != None:
297 297 CloseHandle(c_hstderr)
298 298 c_hstderr = None
299 299
300 300 # Transfer ownership of the parent handles to the object
301 301 self.hstdin = p_hstdin
302 302 p_hstdin = None
303 303 self.hstdout = p_hstdout
304 304 p_hstdout = None
305 305 if not mergeout:
306 306 self.hstderr = p_hstderr
307 307 p_hstderr = None
308 308 self.piProcInfo = piProcInfo
309 309
310 310 finally:
311 311 if p_hstdin:
312 312 CloseHandle(p_hstdin)
313 313 if c_hstdin:
314 314 CloseHandle(c_hstdin)
315 315 if p_hstdout:
316 316 CloseHandle(p_hstdout)
317 317 if c_hstdout:
318 318 CloseHandle(c_hstdout)
319 319 if p_hstderr:
320 320 CloseHandle(p_hstderr)
321 321 if c_hstderr:
322 322 CloseHandle(c_hstderr)
323 323
324 324 return self
325 325
326 326 def _stdin_thread(self, handle, hprocess, func, stdout_func):
327 327 exitCode = DWORD()
328 328 bytesWritten = DWORD(0)
329 329 while True:
330 330 #print("stdin thread loop start")
331 331 # Get the input string (may be bytes or unicode)
332 332 data = func()
333 333
334 334 # None signals to poll whether the process has exited
335 335 if data is None:
336 336 #print("checking for process completion")
337 337 if not GetExitCodeProcess(hprocess, ctypes.byref(exitCode)):
338 338 raise ctypes.WinError()
339 339 if exitCode.value != STILL_ACTIVE:
340 340 return
341 341 # TESTING: Does zero-sized writefile help?
342 342 if not WriteFile(handle, "", 0,
343 343 ctypes.byref(bytesWritten), None):
344 344 raise ctypes.WinError()
345 345 continue
346 346 #print("\nGot str %s\n" % repr(data), file=sys.stderr)
347 347
348 348 # Encode the string to the console encoding
349 349 if isinstance(data, unicode): #FIXME: Python3
350 350 data = data.encode('utf_8')
351 351
352 352 # What we have now must be a string of bytes
353 353 if not isinstance(data, str): #FIXME: Python3
354 354 raise RuntimeError("internal stdin function string error")
355 355
356 356 # An empty string signals EOF
357 357 if len(data) == 0:
358 358 return
359 359
360 360 # In a windows console, sometimes the input is echoed,
361 361 # but sometimes not. How do we determine when to do this?
362 362 stdout_func(data)
363 363 # WriteFile may not accept all the data at once.
364 364 # Loop until everything is processed
365 365 while len(data) != 0:
366 366 #print("Calling writefile")
367 367 if not WriteFile(handle, data, len(data),
368 368 ctypes.byref(bytesWritten), None):
369 369 # This occurs at exit
370 370 if GetLastError() == ERROR_NO_DATA:
371 371 return
372 372 raise ctypes.WinError()
373 373 #print("Called writefile")
374 374 data = data[bytesWritten.value:]
375 375
376 376 def _stdout_thread(self, handle, func):
377 377 # Allocate the output buffer
378 378 data = ctypes.create_string_buffer(4096)
379 379 while True:
380 380 bytesRead = DWORD(0)
381 381 if not ReadFile(handle, data, 4096,
382 382 ctypes.byref(bytesRead), None):
383 383 le = GetLastError()
384 384 if le == ERROR_BROKEN_PIPE:
385 385 return
386 386 else:
387 387 raise ctypes.WinError()
388 388 # FIXME: Python3
389 389 s = data.value[0:bytesRead.value]
390 390 #print("\nv: %s" % repr(s), file=sys.stderr)
391 391 func(s.decode('utf_8', 'replace'))
392 392
393 393 def run(self, stdout_func = None, stdin_func = None, stderr_func = None):
394 394 """Runs the process, using the provided functions for I/O.
395 395
396 396 The function stdin_func should return strings whenever a
397 397 character or characters become available.
398 398 The functions stdout_func and stderr_func are called whenever
399 399 something is printed to stdout or stderr, respectively.
400 400 These functions are called from different threads (but not
401 401 concurrently, because of the GIL).
402 402 """
403 403 if stdout_func == None and stdin_func == None and stderr_func == None:
404 404 return self._run_stdio()
405 405
406 406 if stderr_func != None and self.mergeout:
407 407 raise RuntimeError("Shell command was initiated with "
408 408 "merged stdin/stdout, but a separate stderr_func "
409 409 "was provided to the run() method")
410 410
411 411 # Create a thread for each input/output handle
412 412 stdin_thread = None
413 413 threads = []
414 414 if stdin_func:
415 415 stdin_thread = threading.Thread(target=self._stdin_thread,
416 416 args=(self.hstdin, self.piProcInfo.hProcess,
417 417 stdin_func, stdout_func))
418 418 threads.append(threading.Thread(target=self._stdout_thread,
419 419 args=(self.hstdout, stdout_func)))
420 420 if not self.mergeout:
421 421 if stderr_func == None:
422 422 stderr_func = stdout_func
423 423 threads.append(threading.Thread(target=self._stdout_thread,
424 424 args=(self.hstderr, stderr_func)))
425 425 # Start the I/O threads and the process
426 426 if ResumeThread(self.piProcInfo.hThread) == 0xFFFFFFFF:
427 427 raise ctypes.WinError()
428 428 if stdin_thread is not None:
429 429 stdin_thread.start()
430 430 for thread in threads:
431 431 thread.start()
432 432 # Wait for the process to complete
433 433 if WaitForSingleObject(self.piProcInfo.hProcess, INFINITE) == \
434 434 WAIT_FAILED:
435 435 raise ctypes.WinError()
436 436 # Wait for the I/O threads to complete
437 437 for thread in threads:
438 438 thread.join()
439 439
440 440 # Wait for the stdin thread to complete
441 441 if stdin_thread is not None:
442 442 stdin_thread.join()
443 443
444 444 def _stdin_raw_nonblock(self):
445 445 """Use the raw Win32 handle of sys.stdin to do non-blocking reads"""
446 446 # WARNING: This is experimental, and produces inconsistent results.
447 447 # It's possible for the handle not to be appropriate for use
448 448 # with WaitForSingleObject, among other things.
449 449 handle = msvcrt.get_osfhandle(sys.stdin.fileno())
450 450 result = WaitForSingleObject(handle, 100)
451 451 if result == WAIT_FAILED:
452 452 raise ctypes.WinError()
453 453 elif result == WAIT_TIMEOUT:
454 454 print(".", end='')
455 455 return None
456 456 else:
457 457 data = ctypes.create_string_buffer(256)
458 458 bytesRead = DWORD(0)
459 459 print('?', end='')
460 460
461 461 if not ReadFile(handle, data, 256,
462 462 ctypes.byref(bytesRead), None):
463 463 raise ctypes.WinError()
464 464 # This ensures the non-blocking works with an actual console
465 465 # Not checking the error, so the processing will still work with
466 466 # other handle types
467 467 FlushConsoleInputBuffer(handle)
468 468
469 469 data = data.value
470 470 data = data.replace('\r\n', '\n')
471 471 data = data.replace('\r', '\n')
472 472 print(repr(data) + " ", end='')
473 473 return data
474 474
475 475 def _stdin_raw_block(self):
476 476 """Use a blocking stdin read"""
477 477 # The big problem with the blocking read is that it doesn't
478 478 # exit when it's supposed to in all contexts. An extra
479 479 # key-press may be required to trigger the exit.
480 480 try:
481 481 data = sys.stdin.read(1)
482 482 data = data.replace('\r', '\n')
483 483 return data
484 484 except WindowsError as we:
485 485 if we.winerror == ERROR_NO_DATA:
486 486 # This error occurs when the pipe is closed
487 487 return None
488 488 else:
489 489 # Otherwise let the error propagate
490 490 raise we
491 491
492 492 def _stdout_raw(self, s):
493 493 """Writes the string to stdout"""
494 494 print(s, end='', file=sys.stdout)
495 495 sys.stdout.flush()
496 496
497 497 def _stderr_raw(self, s):
498 498 """Writes the string to stdout"""
499 499 print(s, end='', file=sys.stderr)
500 500 sys.stderr.flush()
501 501
502 502 def _run_stdio(self):
503 503 """Runs the process using the system standard I/O.
504 504
505 505 IMPORTANT: stdin needs to be asynchronous, so the Python
506 506 sys.stdin object is not used. Instead,
507 507 msvcrt.kbhit/getwch are used asynchronously.
508 508 """
509 509 # Disable Line and Echo mode
510 510 #lpMode = DWORD()
511 511 #handle = msvcrt.get_osfhandle(sys.stdin.fileno())
512 512 #if GetConsoleMode(handle, ctypes.byref(lpMode)):
513 513 # set_console_mode = True
514 514 # if not SetConsoleMode(handle, lpMode.value &
515 515 # ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT)):
516 516 # raise ctypes.WinError()
517 517
518 518 if self.mergeout:
519 519 return self.run(stdout_func = self._stdout_raw,
520 520 stdin_func = self._stdin_raw_block)
521 521 else:
522 522 return self.run(stdout_func = self._stdout_raw,
523 523 stdin_func = self._stdin_raw_block,
524 524 stderr_func = self._stderr_raw)
525 525
526 526 # Restore the previous console mode
527 527 #if set_console_mode:
528 528 # if not SetConsoleMode(handle, lpMode.value):
529 529 # raise ctypes.WinError()
530 530
531 531 def __exit__(self, exc_type, exc_value, traceback):
532 532 if self.hstdin:
533 533 CloseHandle(self.hstdin)
534 534 self.hstdin = None
535 535 if self.hstdout:
536 536 CloseHandle(self.hstdout)
537 537 self.hstdout = None
538 538 if self.hstderr:
539 539 CloseHandle(self.hstderr)
540 540 self.hstderr = None
541 541 if self.piProcInfo != None:
542 542 CloseHandle(self.piProcInfo.hProcess)
543 543 CloseHandle(self.piProcInfo.hThread)
544 544 self.piProcInfo = None
545 545
546 546
547 547 def system(cmd):
548 548 """Win32 version of os.system() that works with network shares.
549 549
550 550 Note that this implementation returns None, as meant for use in IPython.
551 551
552 552 Parameters
553 553 ----------
554 554 cmd : str
555 555 A command to be executed in the system shell.
556 556
557 557 Returns
558 558 -------
559 559 None : we explicitly do NOT return the subprocess status code, as this
560 560 utility is meant to be used extensively in IPython, where any return value
561 561 would trigger :func:`sys.displayhook` calls.
562 562 """
563 563 with AvoidUNCPath() as path:
564 564 if path is not None:
565 565 cmd = '"pushd %s &&"%s' % (path, cmd)
566 566 with Win32ShellCommandController(cmd) as scc:
567 567 scc.run()
568 568
569 569
570 570 if __name__ == "__main__":
571 571 print("Test starting!")
572 572 #system("cmd")
573 573 system("python -i")
574 574 print("Test finished!")
@@ -1,354 +1,353 b''
1 1 # encoding: utf-8
2 2
3 3 """Pickle related utilities. Perhaps this should be called 'can'."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import sys
21 21 from types import FunctionType
22 22
23 23 try:
24 24 import cPickle as pickle
25 25 except ImportError:
26 26 import pickle
27 27
28 28 try:
29 29 import numpy
30 30 except:
31 31 numpy = None
32 32
33 import codeutil
34 33 import py3compat
35 34 from importstring import import_item
36 35
37 36 from IPython.config import Application
38 37
39 38 if py3compat.PY3:
40 39 buffer = memoryview
41 40 class_type = type
42 41 else:
43 42 from types import ClassType
44 43 class_type = (type, ClassType)
45 44
46 45 #-------------------------------------------------------------------------------
47 46 # Classes
48 47 #-------------------------------------------------------------------------------
49 48
50 49
51 50 class CannedObject(object):
52 51 def __init__(self, obj, keys=[], hook=None):
53 52 """can an object for safe pickling
54 53
55 54 Parameters
56 55 ==========
57 56
58 57 obj:
59 58 The object to be canned
60 59 keys: list (optional)
61 60 list of attribute names that will be explicitly canned / uncanned
62 61 hook: callable (optional)
63 62 An optional extra callable,
64 63 which can do additional processing of the uncanned object.
65 64
66 65 large data may be offloaded into the buffers list,
67 66 used for zero-copy transfers.
68 67 """
69 68 self.keys = keys
70 69 self.obj = copy.copy(obj)
71 70 self.hook = can(hook)
72 71 for key in keys:
73 72 setattr(self.obj, key, can(getattr(obj, key)))
74 73
75 74 self.buffers = []
76 75
77 76 def get_object(self, g=None):
78 77 if g is None:
79 78 g = {}
80 79 obj = self.obj
81 80 for key in self.keys:
82 81 setattr(obj, key, uncan(getattr(obj, key), g))
83 82
84 83 if self.hook:
85 84 self.hook = uncan(self.hook, g)
86 85 self.hook(obj, g)
87 86 return self.obj
88 87
89 88
90 89 class Reference(CannedObject):
91 90 """object for wrapping a remote reference by name."""
92 91 def __init__(self, name):
93 92 if not isinstance(name, basestring):
94 93 raise TypeError("illegal name: %r"%name)
95 94 self.name = name
96 95 self.buffers = []
97 96
98 97 def __repr__(self):
99 98 return "<Reference: %r>"%self.name
100 99
101 100 def get_object(self, g=None):
102 101 if g is None:
103 102 g = {}
104 103
105 104 return eval(self.name, g)
106 105
107 106
108 107 class CannedFunction(CannedObject):
109 108
110 109 def __init__(self, f):
111 110 self._check_type(f)
112 111 self.code = f.func_code
113 112 if f.func_defaults:
114 113 self.defaults = [ can(fd) for fd in f.func_defaults ]
115 114 else:
116 115 self.defaults = None
117 116 self.module = f.__module__ or '__main__'
118 117 self.__name__ = f.__name__
119 118 self.buffers = []
120 119
121 120 def _check_type(self, obj):
122 121 assert isinstance(obj, FunctionType), "Not a function type"
123 122
124 123 def get_object(self, g=None):
125 124 # try to load function back into its module:
126 125 if not self.module.startswith('__'):
127 126 __import__(self.module)
128 127 g = sys.modules[self.module].__dict__
129 128
130 129 if g is None:
131 130 g = {}
132 131 if self.defaults:
133 132 defaults = tuple(uncan(cfd, g) for cfd in self.defaults)
134 133 else:
135 134 defaults = None
136 135 newFunc = FunctionType(self.code, g, self.__name__, defaults)
137 136 return newFunc
138 137
139 138 class CannedClass(CannedObject):
140 139
141 140 def __init__(self, cls):
142 141 self._check_type(cls)
143 142 self.name = cls.__name__
144 143 self.old_style = not isinstance(cls, type)
145 144 self._canned_dict = {}
146 145 for k,v in cls.__dict__.items():
147 146 if k not in ('__weakref__', '__dict__'):
148 147 self._canned_dict[k] = can(v)
149 148 if self.old_style:
150 149 mro = []
151 150 else:
152 151 mro = cls.mro()
153 152
154 153 self.parents = [ can(c) for c in mro[1:] ]
155 154 self.buffers = []
156 155
157 156 def _check_type(self, obj):
158 157 assert isinstance(obj, class_type), "Not a class type"
159 158
160 159 def get_object(self, g=None):
161 160 parents = tuple(uncan(p, g) for p in self.parents)
162 161 return type(self.name, parents, uncan_dict(self._canned_dict, g=g))
163 162
164 163 class CannedArray(CannedObject):
165 164 def __init__(self, obj):
166 165 self.shape = obj.shape
167 166 self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
168 167 if sum(obj.shape) == 0:
169 168 # just pickle it
170 169 self.buffers = [pickle.dumps(obj, -1)]
171 170 else:
172 171 # ensure contiguous
173 172 obj = numpy.ascontiguousarray(obj, dtype=None)
174 173 self.buffers = [buffer(obj)]
175 174
176 175 def get_object(self, g=None):
177 176 data = self.buffers[0]
178 177 if sum(self.shape) == 0:
179 178 # no shape, we just pickled it
180 179 return pickle.loads(data)
181 180 else:
182 181 return numpy.frombuffer(data, dtype=self.dtype).reshape(self.shape)
183 182
184 183
185 184 class CannedBytes(CannedObject):
186 185 wrap = bytes
187 186 def __init__(self, obj):
188 187 self.buffers = [obj]
189 188
190 189 def get_object(self, g=None):
191 190 data = self.buffers[0]
192 191 return self.wrap(data)
193 192
194 193 def CannedBuffer(CannedBytes):
195 194 wrap = buffer
196 195
197 196 #-------------------------------------------------------------------------------
198 197 # Functions
199 198 #-------------------------------------------------------------------------------
200 199
201 200 def _logger():
202 201 """get the logger for the current Application
203 202
204 203 the root logger will be used if no Application is running
205 204 """
206 205 if Application.initialized():
207 206 logger = Application.instance().log
208 207 else:
209 208 logger = logging.getLogger()
210 209 if not logger.handlers:
211 210 logging.basicConfig()
212 211
213 212 return logger
214 213
215 214 def _import_mapping(mapping, original=None):
216 215 """import any string-keys in a type mapping
217 216
218 217 """
219 218 log = _logger()
220 219 log.debug("Importing canning map")
221 220 for key,value in mapping.items():
222 221 if isinstance(key, basestring):
223 222 try:
224 223 cls = import_item(key)
225 224 except Exception:
226 225 if original and key not in original:
227 226 # only message on user-added classes
228 227 log.error("cannning class not importable: %r", key, exc_info=True)
229 228 mapping.pop(key)
230 229 else:
231 230 mapping[cls] = mapping.pop(key)
232 231
233 232 def istype(obj, check):
234 233 """like isinstance(obj, check), but strict
235 234
236 235 This won't catch subclasses.
237 236 """
238 237 if isinstance(check, tuple):
239 238 for cls in check:
240 239 if type(obj) is cls:
241 240 return True
242 241 return False
243 242 else:
244 243 return type(obj) is check
245 244
246 245 def can(obj):
247 246 """prepare an object for pickling"""
248 247
249 248 import_needed = False
250 249
251 250 for cls,canner in can_map.iteritems():
252 251 if isinstance(cls, basestring):
253 252 import_needed = True
254 253 break
255 254 elif istype(obj, cls):
256 255 return canner(obj)
257 256
258 257 if import_needed:
259 258 # perform can_map imports, then try again
260 259 # this will usually only happen once
261 260 _import_mapping(can_map, _original_can_map)
262 261 return can(obj)
263 262
264 263 return obj
265 264
266 265 def can_class(obj):
267 266 if isinstance(obj, class_type) and obj.__module__ == '__main__':
268 267 return CannedClass(obj)
269 268 else:
270 269 return obj
271 270
272 271 def can_dict(obj):
273 272 """can the *values* of a dict"""
274 273 if istype(obj, dict):
275 274 newobj = {}
276 275 for k, v in obj.iteritems():
277 276 newobj[k] = can(v)
278 277 return newobj
279 278 else:
280 279 return obj
281 280
282 281 sequence_types = (list, tuple, set)
283 282
284 283 def can_sequence(obj):
285 284 """can the elements of a sequence"""
286 285 if istype(obj, sequence_types):
287 286 t = type(obj)
288 287 return t([can(i) for i in obj])
289 288 else:
290 289 return obj
291 290
292 291 def uncan(obj, g=None):
293 292 """invert canning"""
294 293
295 294 import_needed = False
296 295 for cls,uncanner in uncan_map.iteritems():
297 296 if isinstance(cls, basestring):
298 297 import_needed = True
299 298 break
300 299 elif isinstance(obj, cls):
301 300 return uncanner(obj, g)
302 301
303 302 if import_needed:
304 303 # perform uncan_map imports, then try again
305 304 # this will usually only happen once
306 305 _import_mapping(uncan_map, _original_uncan_map)
307 306 return uncan(obj, g)
308 307
309 308 return obj
310 309
311 310 def uncan_dict(obj, g=None):
312 311 if istype(obj, dict):
313 312 newobj = {}
314 313 for k, v in obj.iteritems():
315 314 newobj[k] = uncan(v,g)
316 315 return newobj
317 316 else:
318 317 return obj
319 318
320 319 def uncan_sequence(obj, g=None):
321 320 if istype(obj, sequence_types):
322 321 t = type(obj)
323 322 return t([uncan(i,g) for i in obj])
324 323 else:
325 324 return obj
326 325
327 326 def _uncan_dependent_hook(dep, g=None):
328 327 dep.check_dependency()
329 328
330 329 def can_dependent(obj):
331 330 return CannedObject(obj, keys=('f', 'df'), hook=_uncan_dependent_hook)
332 331
333 332 #-------------------------------------------------------------------------------
334 333 # API dictionaries
335 334 #-------------------------------------------------------------------------------
336 335
337 336 # These dicts can be extended for custom serialization of new objects
338 337
339 338 can_map = {
340 339 'IPython.parallel.dependent' : can_dependent,
341 340 'numpy.ndarray' : CannedArray,
342 341 FunctionType : CannedFunction,
343 342 bytes : CannedBytes,
344 343 buffer : CannedBuffer,
345 344 class_type : can_class,
346 345 }
347 346
348 347 uncan_map = {
349 348 CannedObject : lambda obj, g: obj.get_object(g),
350 349 }
351 350
352 351 # for use in _import_mapping:
353 352 _original_can_map = can_map.copy()
354 353 _original_uncan_map = uncan_map.copy()
@@ -1,135 +1,125 b''
1 1 # encoding: utf-8
2 """Tests for IPython.utils.path.py"""
2 """Tests for IPython.utils.module_paths.py"""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2008-2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 from __future__ import with_statement
16 16
17 17 import os
18 18 import shutil
19 19 import sys
20 20 import tempfile
21 import StringIO
22 21
23 22 from os.path import join, abspath, split
24 23
25 import nose.tools as nt
26
27 from nose import with_setup
28
29 import IPython
30 from IPython.testing import decorators as dec
31 from IPython.testing.decorators import skip_if_not_win32, skip_win32
32 24 from IPython.testing.tools import make_tempfile
33 from IPython.utils import path, io
34 from IPython.utils import py3compat
35 25
36 26 import IPython.utils.module_paths as mp
37 27
38 28 env = os.environ
39 29 TEST_FILE_PATH = split(abspath(__file__))[0]
40 30 TMP_TEST_DIR = tempfile.mkdtemp()
41 31 #
42 32 # Setup/teardown functions/decorators
43 33 #
44 34
45 35 old_syspath = sys.path
46 36
47 37 def make_empty_file(fname):
48 38 f = open(fname, 'w')
49 39 f.close()
50 40
51 41
52 42 def setup():
53 43 """Setup testenvironment for the module:
54 44
55 45 """
56 46 # Do not mask exceptions here. In particular, catching WindowsError is a
57 47 # problem because that exception is only defined on Windows...
58 48 os.makedirs(join(TMP_TEST_DIR, "xmod"))
59 49 os.makedirs(join(TMP_TEST_DIR, "nomod"))
60 50 make_empty_file(join(TMP_TEST_DIR, "xmod/__init__.py"))
61 51 make_empty_file(join(TMP_TEST_DIR, "xmod/sub.py"))
62 52 make_empty_file(join(TMP_TEST_DIR, "pack.py"))
63 53 make_empty_file(join(TMP_TEST_DIR, "packpyc.pyc"))
64 54 sys.path = [TMP_TEST_DIR]
65 55
66 56 def teardown():
67 57 """Teardown testenvironment for the module:
68 58
69 59 - Remove tempdir
70 60 - restore sys.path
71 61 """
72 62 # Note: we remove the parent test dir, which is the root of all test
73 63 # subdirs we may have created. Use shutil instead of os.removedirs, so
74 64 # that non-empty directories are all recursively removed.
75 65 shutil.rmtree(TMP_TEST_DIR)
76 66 sys.path = old_syspath
77 67
78 68
79 69 def test_get_init_1():
80 70 """See if get_init can find __init__.py in this testdir"""
81 71 with make_tempfile(join(TMP_TEST_DIR, "__init__.py")):
82 72 assert mp.get_init(TMP_TEST_DIR)
83 73
84 74 def test_get_init_2():
85 75 """See if get_init can find __init__.pyw in this testdir"""
86 76 with make_tempfile(join(TMP_TEST_DIR, "__init__.pyw")):
87 77 assert mp.get_init(TMP_TEST_DIR)
88 78
89 79 def test_get_init_3():
90 80 """get_init can't find __init__.pyc in this testdir"""
91 81 with make_tempfile(join(TMP_TEST_DIR, "__init__.pyc")):
92 82 assert mp.get_init(TMP_TEST_DIR) is None
93 83
94 def test_get_init_3():
84 def test_get_init_4():
95 85 """get_init can't find __init__ in empty testdir"""
96 86 assert mp.get_init(TMP_TEST_DIR) is None
97 87
98 88
99 89 def test_find_mod_1():
100 90 modpath = join(TMP_TEST_DIR, "xmod", "__init__.py")
101 91 assert mp.find_mod("xmod") == modpath
102 92
103 93 def test_find_mod_2():
104 94 modpath = join(TMP_TEST_DIR, "xmod", "__init__.py")
105 95 assert mp.find_mod("xmod") == modpath
106 96
107 97 def test_find_mod_3():
108 98 modpath = join(TMP_TEST_DIR, "xmod", "sub.py")
109 99 assert mp.find_mod("xmod.sub") == modpath
110 100
111 101 def test_find_mod_4():
112 102 modpath = join(TMP_TEST_DIR, "pack.py")
113 103 assert mp.find_mod("pack") == modpath
114 104
115 105 def test_find_mod_5():
116 106 assert mp.find_mod("packpyc") is None
117 107
118 108 def test_find_module_1():
119 109 modpath = join(TMP_TEST_DIR, "xmod")
120 110 assert mp.find_module("xmod") == modpath
121 111
122 112 def test_find_module_2():
123 113 """Testing sys.path that is empty"""
124 114 assert mp.find_module("xmod", []) is None
125 115
126 116 def test_find_module_3():
127 117 """Testing sys.path that is empty"""
128 118 assert mp.find_module(None, None) is None
129 119
130 120 def test_find_module_4():
131 121 """Testing sys.path that is empty"""
132 122 assert mp.find_module(None) is None
133 123
134 124 def test_find_module_5():
135 125 assert mp.find_module("xmod.nopack") is None
@@ -1,560 +1,559 b''
1 1 # encoding: utf-8
2 2 """Tests for IPython.utils.path.py"""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2008-2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 from __future__ import with_statement
16 16
17 17 import os
18 18 import shutil
19 19 import sys
20 20 import tempfile
21 from io import StringIO
22 21 from contextlib import contextmanager
23 22
24 23 from os.path import join, abspath, split
25 24
26 25 import nose.tools as nt
27 26
28 27 from nose import with_setup
29 28
30 29 import IPython
31 30 from IPython.testing import decorators as dec
32 31 from IPython.testing.decorators import skip_if_not_win32, skip_win32
33 32 from IPython.testing.tools import make_tempfile, AssertPrints
34 from IPython.utils import path, io
33 from IPython.utils import path
35 34 from IPython.utils import py3compat
36 35 from IPython.utils.tempdir import TemporaryDirectory
37 36
38 37 # Platform-dependent imports
39 38 try:
40 39 import _winreg as wreg
41 40 except ImportError:
42 41 #Fake _winreg module on none windows platforms
43 42 import types
44 43 wr_name = "winreg" if py3compat.PY3 else "_winreg"
45 44 sys.modules[wr_name] = types.ModuleType(wr_name)
46 45 import _winreg as wreg
47 46 #Add entries that needs to be stubbed by the testing code
48 47 (wreg.OpenKey, wreg.QueryValueEx,) = (None, None)
49 48
50 49 try:
51 50 reload
52 51 except NameError: # Python 3
53 52 from imp import reload
54 53
55 54 #-----------------------------------------------------------------------------
56 55 # Globals
57 56 #-----------------------------------------------------------------------------
58 57 env = os.environ
59 58 TEST_FILE_PATH = split(abspath(__file__))[0]
60 59 TMP_TEST_DIR = tempfile.mkdtemp()
61 60 HOME_TEST_DIR = join(TMP_TEST_DIR, "home_test_dir")
62 61 XDG_TEST_DIR = join(HOME_TEST_DIR, "xdg_test_dir")
63 62 XDG_CACHE_DIR = join(HOME_TEST_DIR, "xdg_cache_dir")
64 63 IP_TEST_DIR = join(HOME_TEST_DIR,'.ipython')
65 64 #
66 65 # Setup/teardown functions/decorators
67 66 #
68 67
69 68 def setup():
70 69 """Setup testenvironment for the module:
71 70
72 71 - Adds dummy home dir tree
73 72 """
74 73 # Do not mask exceptions here. In particular, catching WindowsError is a
75 74 # problem because that exception is only defined on Windows...
76 75 os.makedirs(IP_TEST_DIR)
77 76 os.makedirs(os.path.join(XDG_TEST_DIR, 'ipython'))
78 77 os.makedirs(os.path.join(XDG_CACHE_DIR, 'ipython'))
79 78
80 79
81 80 def teardown():
82 81 """Teardown testenvironment for the module:
83 82
84 83 - Remove dummy home dir tree
85 84 """
86 85 # Note: we remove the parent test dir, which is the root of all test
87 86 # subdirs we may have created. Use shutil instead of os.removedirs, so
88 87 # that non-empty directories are all recursively removed.
89 88 shutil.rmtree(TMP_TEST_DIR)
90 89
91 90
92 91 def setup_environment():
93 92 """Setup testenvironment for some functions that are tested
94 93 in this module. In particular this functions stores attributes
95 94 and other things that we need to stub in some test functions.
96 95 This needs to be done on a function level and not module level because
97 96 each testfunction needs a pristine environment.
98 97 """
99 98 global oldstuff, platformstuff
100 99 oldstuff = (env.copy(), os.name, sys.platform, path.get_home_dir, IPython.__file__, os.getcwd())
101 100
102 101 if os.name == 'nt':
103 102 platformstuff = (wreg.OpenKey, wreg.QueryValueEx,)
104 103
105 104
106 105 def teardown_environment():
107 106 """Restore things that were remebered by the setup_environment function
108 107 """
109 108 (oldenv, os.name, sys.platform, path.get_home_dir, IPython.__file__, old_wd) = oldstuff
110 109 os.chdir(old_wd)
111 110 reload(path)
112 111
113 112 for key in env.keys():
114 113 if key not in oldenv:
115 114 del env[key]
116 115 env.update(oldenv)
117 116 if hasattr(sys, 'frozen'):
118 117 del sys.frozen
119 118 if os.name == 'nt':
120 119 (wreg.OpenKey, wreg.QueryValueEx,) = platformstuff
121 120
122 121 # Build decorator that uses the setup_environment/setup_environment
123 122 with_environment = with_setup(setup_environment, teardown_environment)
124 123
125 124 @skip_if_not_win32
126 125 @with_environment
127 126 def test_get_home_dir_1():
128 127 """Testcase for py2exe logic, un-compressed lib
129 128 """
130 129 sys.frozen = True
131 130
132 131 #fake filename for IPython.__init__
133 132 IPython.__file__ = abspath(join(HOME_TEST_DIR, "Lib/IPython/__init__.py"))
134 133
135 134 home_dir = path.get_home_dir()
136 135 nt.assert_equal(home_dir, abspath(HOME_TEST_DIR))
137 136
138 137
139 138 @skip_if_not_win32
140 139 @with_environment
141 140 def test_get_home_dir_2():
142 141 """Testcase for py2exe logic, compressed lib
143 142 """
144 143 sys.frozen = True
145 144 #fake filename for IPython.__init__
146 145 IPython.__file__ = abspath(join(HOME_TEST_DIR, "Library.zip/IPython/__init__.py")).lower()
147 146
148 147 home_dir = path.get_home_dir(True)
149 148 nt.assert_equal(home_dir, abspath(HOME_TEST_DIR).lower())
150 149
151 150
152 151 @with_environment
153 152 def test_get_home_dir_3():
154 153 """get_home_dir() uses $HOME if set"""
155 154 env["HOME"] = HOME_TEST_DIR
156 155 home_dir = path.get_home_dir(True)
157 156 # get_home_dir expands symlinks
158 157 nt.assert_equal(home_dir, os.path.realpath(env["HOME"]))
159 158
160 159
161 160 @with_environment
162 161 def test_get_home_dir_4():
163 162 """get_home_dir() still works if $HOME is not set"""
164 163
165 164 if 'HOME' in env: del env['HOME']
166 165 # this should still succeed, but we don't care what the answer is
167 166 home = path.get_home_dir(False)
168 167
169 168 @with_environment
170 169 def test_get_home_dir_5():
171 170 """raise HomeDirError if $HOME is specified, but not a writable dir"""
172 171 env['HOME'] = abspath(HOME_TEST_DIR+'garbage')
173 172 # set os.name = posix, to prevent My Documents fallback on Windows
174 173 os.name = 'posix'
175 174 nt.assert_raises(path.HomeDirError, path.get_home_dir, True)
176 175
177 176
178 177 # Should we stub wreg fully so we can run the test on all platforms?
179 178 @skip_if_not_win32
180 179 @with_environment
181 180 def test_get_home_dir_8():
182 181 """Using registry hack for 'My Documents', os=='nt'
183 182
184 183 HOMESHARE, HOMEDRIVE, HOMEPATH, USERPROFILE and others are missing.
185 184 """
186 185 os.name = 'nt'
187 186 # Remove from stub environment all keys that may be set
188 187 for key in ['HOME', 'HOMESHARE', 'HOMEDRIVE', 'HOMEPATH', 'USERPROFILE']:
189 188 env.pop(key, None)
190 189
191 190 #Stub windows registry functions
192 191 def OpenKey(x, y):
193 192 class key:
194 193 def Close(self):
195 194 pass
196 195 return key()
197 196 def QueryValueEx(x, y):
198 197 return [abspath(HOME_TEST_DIR)]
199 198
200 199 wreg.OpenKey = OpenKey
201 200 wreg.QueryValueEx = QueryValueEx
202 201
203 202 home_dir = path.get_home_dir()
204 203 nt.assert_equal(home_dir, abspath(HOME_TEST_DIR))
205 204
206 205
207 206 @with_environment
208 207 def test_get_ipython_dir_1():
209 208 """test_get_ipython_dir_1, Testcase to see if we can call get_ipython_dir without Exceptions."""
210 209 env_ipdir = os.path.join("someplace", ".ipython")
211 210 path._writable_dir = lambda path: True
212 211 env['IPYTHONDIR'] = env_ipdir
213 212 ipdir = path.get_ipython_dir()
214 213 nt.assert_equal(ipdir, env_ipdir)
215 214
216 215
217 216 @with_environment
218 217 def test_get_ipython_dir_2():
219 218 """test_get_ipython_dir_2, Testcase to see if we can call get_ipython_dir without Exceptions."""
220 219 path.get_home_dir = lambda : "someplace"
221 220 path.get_xdg_dir = lambda : None
222 221 path._writable_dir = lambda path: True
223 222 os.name = "posix"
224 223 env.pop('IPYTHON_DIR', None)
225 224 env.pop('IPYTHONDIR', None)
226 225 env.pop('XDG_CONFIG_HOME', None)
227 226 ipdir = path.get_ipython_dir()
228 227 nt.assert_equal(ipdir, os.path.join("someplace", ".ipython"))
229 228
230 229 @with_environment
231 230 def test_get_ipython_dir_3():
232 231 """test_get_ipython_dir_3, use XDG if defined, and .ipython doesn't exist."""
233 232 path.get_home_dir = lambda : "someplace"
234 233 path._writable_dir = lambda path: True
235 234 os.name = "posix"
236 235 env.pop('IPYTHON_DIR', None)
237 236 env.pop('IPYTHONDIR', None)
238 237 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
239 238 ipdir = path.get_ipython_dir()
240 239 if sys.platform == "darwin":
241 240 expected = os.path.join("someplace", ".ipython")
242 241 else:
243 242 expected = os.path.join(XDG_TEST_DIR, "ipython")
244 243 nt.assert_equal(ipdir, expected)
245 244
246 245 @with_environment
247 246 def test_get_ipython_dir_4():
248 247 """test_get_ipython_dir_4, use XDG if both exist."""
249 248 path.get_home_dir = lambda : HOME_TEST_DIR
250 249 os.name = "posix"
251 250 env.pop('IPYTHON_DIR', None)
252 251 env.pop('IPYTHONDIR', None)
253 252 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
254 253 ipdir = path.get_ipython_dir()
255 254 if sys.platform == "darwin":
256 255 expected = os.path.join(HOME_TEST_DIR, ".ipython")
257 256 else:
258 257 expected = os.path.join(XDG_TEST_DIR, "ipython")
259 258 nt.assert_equal(ipdir, expected)
260 259
261 260 @with_environment
262 261 def test_get_ipython_dir_5():
263 262 """test_get_ipython_dir_5, use .ipython if exists and XDG defined, but doesn't exist."""
264 263 path.get_home_dir = lambda : HOME_TEST_DIR
265 264 os.name = "posix"
266 265 env.pop('IPYTHON_DIR', None)
267 266 env.pop('IPYTHONDIR', None)
268 267 env['XDG_CONFIG_HOME'] = XDG_TEST_DIR
269 268 os.rmdir(os.path.join(XDG_TEST_DIR, 'ipython'))
270 269 ipdir = path.get_ipython_dir()
271 270 nt.assert_equal(ipdir, IP_TEST_DIR)
272 271
273 272 @with_environment
274 273 def test_get_ipython_dir_6():
275 274 """test_get_ipython_dir_6, use XDG if defined and neither exist."""
276 275 xdg = os.path.join(HOME_TEST_DIR, 'somexdg')
277 276 os.mkdir(xdg)
278 277 shutil.rmtree(os.path.join(HOME_TEST_DIR, '.ipython'))
279 278 path.get_home_dir = lambda : HOME_TEST_DIR
280 279 path.get_xdg_dir = lambda : xdg
281 280 os.name = "posix"
282 281 env.pop('IPYTHON_DIR', None)
283 282 env.pop('IPYTHONDIR', None)
284 283 env.pop('XDG_CONFIG_HOME', None)
285 284 xdg_ipdir = os.path.join(xdg, "ipython")
286 285 ipdir = path.get_ipython_dir()
287 286 nt.assert_equal(ipdir, xdg_ipdir)
288 287
289 288 @with_environment
290 289 def test_get_ipython_dir_7():
291 290 """test_get_ipython_dir_7, test home directory expansion on IPYTHONDIR"""
292 291 path._writable_dir = lambda path: True
293 292 home_dir = os.path.normpath(os.path.expanduser('~'))
294 293 env['IPYTHONDIR'] = os.path.join('~', 'somewhere')
295 294 ipdir = path.get_ipython_dir()
296 295 nt.assert_equal(ipdir, os.path.join(home_dir, 'somewhere'))
297 296
298 297 @skip_win32
299 298 @with_environment
300 299 def test_get_ipython_dir_8():
301 300 """test_get_ipython_dir_8, test / home directory"""
302 301 old = path._writable_dir, path.get_xdg_dir
303 302 try:
304 303 path._writable_dir = lambda path: bool(path)
305 304 path.get_xdg_dir = lambda: None
306 305 env.pop('IPYTHON_DIR', None)
307 306 env.pop('IPYTHONDIR', None)
308 307 env['HOME'] = '/'
309 308 nt.assert_equal(path.get_ipython_dir(), '/.ipython')
310 309 finally:
311 310 path._writable_dir, path.get_xdg_dir = old
312 311
313 312 @with_environment
314 313 def test_get_xdg_dir_0():
315 314 """test_get_xdg_dir_0, check xdg_dir"""
316 315 reload(path)
317 316 path._writable_dir = lambda path: True
318 317 path.get_home_dir = lambda : 'somewhere'
319 318 os.name = "posix"
320 319 sys.platform = "linux2"
321 320 env.pop('IPYTHON_DIR', None)
322 321 env.pop('IPYTHONDIR', None)
323 322 env.pop('XDG_CONFIG_HOME', None)
324 323
325 324 nt.assert_equal(path.get_xdg_dir(), os.path.join('somewhere', '.config'))
326 325
327 326
328 327 @with_environment
329 328 def test_get_xdg_dir_1():
330 329 """test_get_xdg_dir_1, check nonexistant xdg_dir"""
331 330 reload(path)
332 331 path.get_home_dir = lambda : HOME_TEST_DIR
333 332 os.name = "posix"
334 333 sys.platform = "linux2"
335 334 env.pop('IPYTHON_DIR', None)
336 335 env.pop('IPYTHONDIR', None)
337 336 env.pop('XDG_CONFIG_HOME', None)
338 337 nt.assert_equal(path.get_xdg_dir(), None)
339 338
340 339 @with_environment
341 340 def test_get_xdg_dir_2():
342 341 """test_get_xdg_dir_2, check xdg_dir default to ~/.config"""
343 342 reload(path)
344 343 path.get_home_dir = lambda : HOME_TEST_DIR
345 344 os.name = "posix"
346 345 sys.platform = "linux2"
347 346 env.pop('IPYTHON_DIR', None)
348 347 env.pop('IPYTHONDIR', None)
349 348 env.pop('XDG_CONFIG_HOME', None)
350 349 cfgdir=os.path.join(path.get_home_dir(), '.config')
351 350 if not os.path.exists(cfgdir):
352 351 os.makedirs(cfgdir)
353 352
354 353 nt.assert_equal(path.get_xdg_dir(), cfgdir)
355 354
356 355 @with_environment
357 356 def test_get_xdg_dir_3():
358 357 """test_get_xdg_dir_3, check xdg_dir not used on OS X"""
359 358 reload(path)
360 359 path.get_home_dir = lambda : HOME_TEST_DIR
361 360 os.name = "posix"
362 361 sys.platform = "darwin"
363 362 env.pop('IPYTHON_DIR', None)
364 363 env.pop('IPYTHONDIR', None)
365 364 env.pop('XDG_CONFIG_HOME', None)
366 365 cfgdir=os.path.join(path.get_home_dir(), '.config')
367 366 if not os.path.exists(cfgdir):
368 367 os.makedirs(cfgdir)
369 368
370 369 nt.assert_equal(path.get_xdg_dir(), None)
371 370
372 371 def test_filefind():
373 372 """Various tests for filefind"""
374 373 f = tempfile.NamedTemporaryFile()
375 374 # print 'fname:',f.name
376 375 alt_dirs = path.get_ipython_dir()
377 376 t = path.filefind(f.name, alt_dirs)
378 377 # print 'found:',t
379 378
380 379 @with_environment
381 380 def test_get_ipython_cache_dir():
382 381 os.environ["HOME"] = HOME_TEST_DIR
383 382 if os.name == 'posix' and sys.platform != 'darwin':
384 383 # test default
385 384 os.makedirs(os.path.join(HOME_TEST_DIR, ".cache"))
386 385 os.environ.pop("XDG_CACHE_HOME", None)
387 386 ipdir = path.get_ipython_cache_dir()
388 387 nt.assert_equal(os.path.join(HOME_TEST_DIR, ".cache", "ipython"),
389 388 ipdir)
390 389 nt.assert_true(os.path.isdir(ipdir))
391 390
392 391 # test env override
393 392 os.environ["XDG_CACHE_HOME"] = XDG_CACHE_DIR
394 393 ipdir = path.get_ipython_cache_dir()
395 394 nt.assert_true(os.path.isdir(ipdir))
396 395 nt.assert_equal(ipdir, os.path.join(XDG_CACHE_DIR, "ipython"))
397 396 else:
398 397 nt.assert_equal(path.get_ipython_cache_dir(),
399 398 path.get_ipython_dir())
400 399
401 400 def test_get_ipython_package_dir():
402 401 ipdir = path.get_ipython_package_dir()
403 402 nt.assert_true(os.path.isdir(ipdir))
404 403
405 404
406 405 def test_get_ipython_module_path():
407 406 ipapp_path = path.get_ipython_module_path('IPython.terminal.ipapp')
408 407 nt.assert_true(os.path.isfile(ipapp_path))
409 408
410 409
411 410 @dec.skip_if_not_win32
412 411 def test_get_long_path_name_win32():
413 412 p = path.get_long_path_name('c:\\docume~1')
414 413 nt.assert_equal(p,u'c:\\Documents and Settings')
415 414
416 415
417 416 @dec.skip_win32
418 417 def test_get_long_path_name():
419 418 p = path.get_long_path_name('/usr/local')
420 419 nt.assert_equal(p,'/usr/local')
421 420
422 421 @dec.skip_win32 # can't create not-user-writable dir on win
423 422 @with_environment
424 423 def test_not_writable_ipdir():
425 424 tmpdir = tempfile.mkdtemp()
426 425 os.name = "posix"
427 426 env.pop('IPYTHON_DIR', None)
428 427 env.pop('IPYTHONDIR', None)
429 428 env.pop('XDG_CONFIG_HOME', None)
430 429 env['HOME'] = tmpdir
431 430 ipdir = os.path.join(tmpdir, '.ipython')
432 431 os.mkdir(ipdir)
433 432 os.chmod(ipdir, 600)
434 433 with AssertPrints('is not a writable location', channel='stderr'):
435 434 ipdir = path.get_ipython_dir()
436 435 env.pop('IPYTHON_DIR', None)
437 436
438 437 def test_unquote_filename():
439 438 for win32 in (True, False):
440 439 nt.assert_equal(path.unquote_filename('foo.py', win32=win32), 'foo.py')
441 440 nt.assert_equal(path.unquote_filename('foo bar.py', win32=win32), 'foo bar.py')
442 441 nt.assert_equal(path.unquote_filename('"foo.py"', win32=True), 'foo.py')
443 442 nt.assert_equal(path.unquote_filename('"foo bar.py"', win32=True), 'foo bar.py')
444 443 nt.assert_equal(path.unquote_filename("'foo.py'", win32=True), 'foo.py')
445 444 nt.assert_equal(path.unquote_filename("'foo bar.py'", win32=True), 'foo bar.py')
446 445 nt.assert_equal(path.unquote_filename('"foo.py"', win32=False), '"foo.py"')
447 446 nt.assert_equal(path.unquote_filename('"foo bar.py"', win32=False), '"foo bar.py"')
448 447 nt.assert_equal(path.unquote_filename("'foo.py'", win32=False), "'foo.py'")
449 448 nt.assert_equal(path.unquote_filename("'foo bar.py'", win32=False), "'foo bar.py'")
450 449
451 450 @with_environment
452 451 def test_get_py_filename():
453 452 os.chdir(TMP_TEST_DIR)
454 453 for win32 in (True, False):
455 454 with make_tempfile('foo.py'):
456 455 nt.assert_equal(path.get_py_filename('foo.py', force_win32=win32), 'foo.py')
457 456 nt.assert_equal(path.get_py_filename('foo', force_win32=win32), 'foo.py')
458 457 with make_tempfile('foo'):
459 458 nt.assert_equal(path.get_py_filename('foo', force_win32=win32), 'foo')
460 459 nt.assert_raises(IOError, path.get_py_filename, 'foo.py', force_win32=win32)
461 460 nt.assert_raises(IOError, path.get_py_filename, 'foo', force_win32=win32)
462 461 nt.assert_raises(IOError, path.get_py_filename, 'foo.py', force_win32=win32)
463 462 true_fn = 'foo with spaces.py'
464 463 with make_tempfile(true_fn):
465 464 nt.assert_equal(path.get_py_filename('foo with spaces', force_win32=win32), true_fn)
466 465 nt.assert_equal(path.get_py_filename('foo with spaces.py', force_win32=win32), true_fn)
467 466 if win32:
468 467 nt.assert_equal(path.get_py_filename('"foo with spaces.py"', force_win32=True), true_fn)
469 468 nt.assert_equal(path.get_py_filename("'foo with spaces.py'", force_win32=True), true_fn)
470 469 else:
471 470 nt.assert_raises(IOError, path.get_py_filename, '"foo with spaces.py"', force_win32=False)
472 471 nt.assert_raises(IOError, path.get_py_filename, "'foo with spaces.py'", force_win32=False)
473 472
474 473 def test_unicode_in_filename():
475 474 """When a file doesn't exist, the exception raised should be safe to call
476 475 str() on - i.e. in Python 2 it must only have ASCII characters.
477 476
478 477 https://github.com/ipython/ipython/issues/875
479 478 """
480 479 try:
481 480 # these calls should not throw unicode encode exceptions
482 481 path.get_py_filename(u'fooéè.py', force_win32=False)
483 482 except IOError as ex:
484 483 str(ex)
485 484
486 485
487 486 class TestShellGlob(object):
488 487
489 488 @classmethod
490 489 def setUpClass(cls):
491 490 cls.filenames_start_with_a = map('a{0}'.format, range(3))
492 491 cls.filenames_end_with_b = map('{0}b'.format, range(3))
493 492 cls.filenames = cls.filenames_start_with_a + cls.filenames_end_with_b
494 493 cls.tempdir = TemporaryDirectory()
495 494 td = cls.tempdir.name
496 495
497 496 with cls.in_tempdir():
498 497 # Create empty files
499 498 for fname in cls.filenames:
500 499 open(os.path.join(td, fname), 'w').close()
501 500
502 501 @classmethod
503 502 def tearDownClass(cls):
504 503 cls.tempdir.cleanup()
505 504
506 505 @classmethod
507 506 @contextmanager
508 507 def in_tempdir(cls):
509 508 save = os.getcwdu()
510 509 try:
511 510 os.chdir(cls.tempdir.name)
512 511 yield
513 512 finally:
514 513 os.chdir(save)
515 514
516 515 def check_match(self, patterns, matches):
517 516 with self.in_tempdir():
518 517 # glob returns unordered list. that's why sorted is required.
519 518 nt.assert_equals(sorted(path.shellglob(patterns)),
520 519 sorted(matches))
521 520
522 521 def common_cases(self):
523 522 return [
524 523 (['*'], self.filenames),
525 524 (['a*'], self.filenames_start_with_a),
526 525 (['*c'], ['*c']),
527 526 (['*', 'a*', '*b', '*c'], self.filenames
528 527 + self.filenames_start_with_a
529 528 + self.filenames_end_with_b
530 529 + ['*c']),
531 530 (['a[012]'], self.filenames_start_with_a),
532 531 ]
533 532
534 533 @skip_win32
535 534 def test_match_posix(self):
536 535 for (patterns, matches) in self.common_cases() + [
537 536 ([r'\*'], ['*']),
538 537 ([r'a\*', 'a*'], ['a*'] + self.filenames_start_with_a),
539 538 ([r'a\[012]'], ['a[012]']),
540 539 ]:
541 540 yield (self.check_match, patterns, matches)
542 541
543 542 @skip_if_not_win32
544 543 def test_match_windows(self):
545 544 for (patterns, matches) in self.common_cases() + [
546 545 # In windows, backslash is interpreted as path
547 546 # separator. Therefore, you can't escape glob
548 547 # using it.
549 548 ([r'a\*', 'a*'], [r'a\*'] + self.filenames_start_with_a),
550 549 ([r'a\[012]'], [r'a\[012]']),
551 550 ]:
552 551 yield (self.check_match, patterns, matches)
553 552
554 553
555 554 def test_unescape_glob():
556 555 nt.assert_equals(path.unescape_glob(r'\*\[\!\]\?'), '*[!]?')
557 556 nt.assert_equals(path.unescape_glob(r'\\*'), r'\*')
558 557 nt.assert_equals(path.unescape_glob(r'\\\*'), r'\*')
559 558 nt.assert_equals(path.unescape_glob(r'\\a'), r'\a')
560 559 nt.assert_equals(path.unescape_glob(r'\a'), r'\a')
@@ -1,173 +1,170 b''
1 1 # encoding: utf-8
2 2 """Tests for IPython.utils.text"""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 import os
16 16 import math
17 17 import random
18 18
19 19 import nose.tools as nt
20 20
21 from nose import with_setup
22
23 from IPython.testing import decorators as dec
24 21 from IPython.utils import text
25 22
26 23 #-----------------------------------------------------------------------------
27 24 # Globals
28 25 #-----------------------------------------------------------------------------
29 26
30 27 def test_columnize():
31 28 """Basic columnize tests."""
32 29 size = 5
33 30 items = [l*size for l in 'abc']
34 31 out = text.columnize(items, displaywidth=80)
35 32 nt.assert_equal(out, 'aaaaa bbbbb ccccc\n')
36 33 out = text.columnize(items, displaywidth=12)
37 34 nt.assert_equal(out, 'aaaaa ccccc\nbbbbb\n')
38 35 out = text.columnize(items, displaywidth=10)
39 36 nt.assert_equal(out, 'aaaaa\nbbbbb\nccccc\n')
40 37
41 38 def test_columnize_random():
42 39 """Test with random input to hopfully catch edge case """
43 40 for nitems in [random.randint(2,70) for i in range(2,20)]:
44 41 displaywidth = random.randint(20,200)
45 42 rand_len = [random.randint(2,displaywidth) for i in range(nitems)]
46 43 items = ['x'*l for l in rand_len]
47 44 out = text.columnize(items, displaywidth=displaywidth)
48 45 longer_line = max([len(x) for x in out.split('\n')])
49 46 longer_element = max(rand_len)
50 47 if longer_line > displaywidth:
51 48 print "Columnize displayed something lager than displaywidth : %s " % longer_line
52 49 print "longer element : %s " % longer_element
53 50 print "displaywidth : %s " % displaywidth
54 51 print "number of element : %s " % nitems
55 52 print "size of each element :\n %s" % rand_len
56 53 assert False
57 54
58 55 def test_columnize_medium():
59 56 """Test with inputs than shouldn't be wider tahn 80 """
60 57 size = 40
61 58 items = [l*size for l in 'abc']
62 59 out = text.columnize(items, displaywidth=80)
63 60 nt.assert_equal(out, '\n'.join(items+['']))
64 61
65 62 def test_columnize_long():
66 63 """Test columnize with inputs longer than the display window"""
67 64 size = 11
68 65 items = [l*size for l in 'abc']
69 66 out = text.columnize(items, displaywidth=size-1)
70 67 nt.assert_equal(out, '\n'.join(items+['']))
71 68
72 69 def eval_formatter_check(f):
73 70 ns = dict(n=12, pi=math.pi, stuff='hello there', os=os, u=u"cafΓ©", b="cafΓ©")
74 71 s = f.format("{n} {n//4} {stuff.split()[0]}", **ns)
75 72 nt.assert_equal(s, "12 3 hello")
76 73 s = f.format(' '.join(['{n//%i}'%i for i in range(1,8)]), **ns)
77 74 nt.assert_equal(s, "12 6 4 3 2 2 1")
78 75 s = f.format('{[n//i for i in range(1,8)]}', **ns)
79 76 nt.assert_equal(s, "[12, 6, 4, 3, 2, 2, 1]")
80 77 s = f.format("{stuff!s}", **ns)
81 78 nt.assert_equal(s, ns['stuff'])
82 79 s = f.format("{stuff!r}", **ns)
83 80 nt.assert_equal(s, repr(ns['stuff']))
84 81
85 82 # Check with unicode:
86 83 s = f.format("{u}", **ns)
87 84 nt.assert_equal(s, ns['u'])
88 85 # This decodes in a platform dependent manner, but it shouldn't error out
89 86 s = f.format("{b}", **ns)
90 87
91 88 nt.assert_raises(NameError, f.format, '{dne}', **ns)
92 89
93 90 def eval_formatter_slicing_check(f):
94 91 ns = dict(n=12, pi=math.pi, stuff='hello there', os=os)
95 92 s = f.format(" {stuff.split()[:]} ", **ns)
96 93 nt.assert_equal(s, " ['hello', 'there'] ")
97 94 s = f.format(" {stuff.split()[::-1]} ", **ns)
98 95 nt.assert_equal(s, " ['there', 'hello'] ")
99 96 s = f.format("{stuff[::2]}", **ns)
100 97 nt.assert_equal(s, ns['stuff'][::2])
101 98
102 99 nt.assert_raises(SyntaxError, f.format, "{n:x}", **ns)
103 100
104 101 def eval_formatter_no_slicing_check(f):
105 102 ns = dict(n=12, pi=math.pi, stuff='hello there', os=os)
106 103
107 104 s = f.format('{n:x} {pi**2:+f}', **ns)
108 105 nt.assert_equal(s, "c +9.869604")
109 106
110 107 s = f.format('{stuff[slice(1,4)]}', **ns)
111 108 nt.assert_equal(s, 'ell')
112 109
113 110 nt.assert_raises(SyntaxError, f.format, "{a[:]}")
114 111
115 112 def test_eval_formatter():
116 113 f = text.EvalFormatter()
117 114 eval_formatter_check(f)
118 115 eval_formatter_no_slicing_check(f)
119 116
120 117 def test_full_eval_formatter():
121 118 f = text.FullEvalFormatter()
122 119 eval_formatter_check(f)
123 120 eval_formatter_slicing_check(f)
124 121
125 122 def test_dollar_formatter():
126 123 f = text.DollarFormatter()
127 124 eval_formatter_check(f)
128 125 eval_formatter_slicing_check(f)
129 126
130 127 ns = dict(n=12, pi=math.pi, stuff='hello there', os=os)
131 128 s = f.format("$n", **ns)
132 129 nt.assert_equal(s, "12")
133 130 s = f.format("$n.real", **ns)
134 131 nt.assert_equal(s, "12")
135 132 s = f.format("$n/{stuff[:5]}", **ns)
136 133 nt.assert_equal(s, "12/hello")
137 134 s = f.format("$n $$HOME", **ns)
138 135 nt.assert_equal(s, "12 $HOME")
139 136 s = f.format("${foo}", foo="HOME")
140 137 nt.assert_equal(s, "$HOME")
141 138
142 139
143 140 def test_long_substr():
144 141 data = ['hi']
145 142 nt.assert_equal(text.long_substr(data), 'hi')
146 143
147 144
148 145 def test_long_substr2():
149 146 data = ['abc', 'abd', 'abf', 'ab']
150 147 nt.assert_equal(text.long_substr(data), 'ab')
151 148
152 149 def test_long_substr_empty():
153 150 data = []
154 151 nt.assert_equal(text.long_substr(data), '')
155 152
156 153 def test_strip_email():
157 154 src = """\
158 155 >> >>> def f(x):
159 156 >> ... return x+1
160 157 >> ...
161 158 >> >>> zz = f(2.5)"""
162 159 cln = """\
163 160 >>> def f(x):
164 161 ... return x+1
165 162 ...
166 163 >>> zz = f(2.5)"""
167 164 nt.assert_equal(text.strip_email_quotes(src), cln)
168 165
169 166
170 167 def test_strip_email2():
171 168 src = '> > > list()'
172 169 cln = 'list()'
173 170 nt.assert_equal(text.strip_email_quotes(src), cln)
@@ -1,148 +1,147 b''
1 1 """Some tests for the wildcard utilities."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Library imports
5 5 #-----------------------------------------------------------------------------
6 6 # Stdlib
7 import sys
8 7 import unittest
9 8
10 9 # Our own
11 10 from IPython.utils import wildcard
12 11
13 12 #-----------------------------------------------------------------------------
14 13 # Globals for test
15 14 #-----------------------------------------------------------------------------
16 15
17 16 class obj_t(object):
18 17 pass
19 18
20 19 root = obj_t()
21 20 l = ["arna","abel","ABEL","active","bob","bark","abbot"]
22 21 q = ["kate","loop","arne","vito","lucifer","koppel"]
23 22 for x in l:
24 23 o = obj_t()
25 24 setattr(root,x,o)
26 25 for y in q:
27 26 p = obj_t()
28 27 setattr(o,y,p)
29 28 root._apan = obj_t()
30 29 root._apan.a = 10
31 30 root._apan._a = 20
32 31 root._apan.__a = 20
33 32 root.__anka = obj_t()
34 33 root.__anka.a = 10
35 34 root.__anka._a = 20
36 35 root.__anka.__a = 20
37 36
38 37 root._APAN = obj_t()
39 38 root._APAN.a = 10
40 39 root._APAN._a = 20
41 40 root._APAN.__a = 20
42 41 root.__ANKA = obj_t()
43 42 root.__ANKA.a = 10
44 43 root.__ANKA._a = 20
45 44 root.__ANKA.__a = 20
46 45
47 46 #-----------------------------------------------------------------------------
48 47 # Test cases
49 48 #-----------------------------------------------------------------------------
50 49
51 50 class Tests (unittest.TestCase):
52 51 def test_case(self):
53 52 ns=root.__dict__
54 53 tests=[
55 54 ("a*", ["abbot","abel","active","arna",]),
56 55 ("?b*.?o*",["abbot.koppel","abbot.loop","abel.koppel","abel.loop",]),
57 56 ("_a*", []),
58 57 ("_*anka", ["__anka",]),
59 58 ("_*a*", ["__anka",]),
60 59 ]
61 60 for pat,res in tests:
62 61 res.sort()
63 62 a=wildcard.list_namespace(ns,"all",pat,ignore_case=False,
64 63 show_all=False).keys()
65 64 a.sort()
66 65 self.assertEqual(a,res)
67 66
68 67 def test_case_showall(self):
69 68 ns=root.__dict__
70 69 tests=[
71 70 ("a*", ["abbot","abel","active","arna",]),
72 71 ("?b*.?o*",["abbot.koppel","abbot.loop","abel.koppel","abel.loop",]),
73 72 ("_a*", ["_apan"]),
74 73 ("_*anka", ["__anka",]),
75 74 ("_*a*", ["__anka","_apan",]),
76 75 ]
77 76 for pat,res in tests:
78 77 res.sort()
79 78 a=wildcard.list_namespace(ns,"all",pat,ignore_case=False,
80 79 show_all=True).keys()
81 80 a.sort()
82 81 self.assertEqual(a,res)
83 82
84 83
85 84 def test_nocase(self):
86 85 ns=root.__dict__
87 86 tests=[
88 87 ("a*", ["abbot","abel","ABEL","active","arna",]),
89 88 ("?b*.?o*",["abbot.koppel","abbot.loop","abel.koppel","abel.loop",
90 89 "ABEL.koppel","ABEL.loop",]),
91 90 ("_a*", []),
92 91 ("_*anka", ["__anka","__ANKA",]),
93 92 ("_*a*", ["__anka","__ANKA",]),
94 93 ]
95 94 for pat,res in tests:
96 95 res.sort()
97 96 a=wildcard.list_namespace(ns,"all",pat,ignore_case=True,
98 97 show_all=False).keys()
99 98 a.sort()
100 99 self.assertEqual(a,res)
101 100
102 101 def test_nocase_showall(self):
103 102 ns=root.__dict__
104 103 tests=[
105 104 ("a*", ["abbot","abel","ABEL","active","arna",]),
106 105 ("?b*.?o*",["abbot.koppel","abbot.loop","abel.koppel","abel.loop",
107 106 "ABEL.koppel","ABEL.loop",]),
108 107 ("_a*", ["_apan","_APAN"]),
109 108 ("_*anka", ["__anka","__ANKA",]),
110 109 ("_*a*", ["__anka","__ANKA","_apan","_APAN"]),
111 110 ]
112 111 for pat,res in tests:
113 112 res.sort()
114 113 a=wildcard.list_namespace(ns,"all",pat,ignore_case=True,
115 114 show_all=True).keys()
116 115 a.sort()
117 116 self.assertEqual(a,res)
118 117
119 118 def test_dict_attributes(self):
120 119 """Dictionaries should be indexed by attributes, not by keys. This was
121 120 causing Github issue 129."""
122 121 ns = {"az":{"king":55}, "pq":{1:0}}
123 122 tests = [
124 123 ("a*", ["az"]),
125 124 ("az.k*", ["az.keys"]),
126 125 ("pq.k*", ["pq.keys"])
127 126 ]
128 127 for pat, res in tests:
129 128 res.sort()
130 129 a = wildcard.list_namespace(ns, "all", pat, ignore_case=False,
131 130 show_all=True).keys()
132 131 a.sort()
133 132 self.assertEqual(a, res)
134 133
135 134 def test_dict_dir(self):
136 135 class A(object):
137 136 def __init__(self):
138 137 self.a = 1
139 138 self.b = 2
140 139 def __getattribute__(self, name):
141 140 if name=="a":
142 141 raise AttributeError
143 142 return object.__getattribute__(self, name)
144 143
145 144 a = A()
146 145 adict = wildcard.dict_dir(a)
147 146 assert "a" not in adict # change to assertNotIn method in >= 2.7
148 147 self.assertEqual(adict["b"], 2)
@@ -1,717 +1,713 b''
1 1 # encoding: utf-8
2 2 """
3 3 Utilities for working with strings and text.
4 4
5 5 Inheritance diagram:
6 6
7 7 .. inheritance-diagram:: IPython.utils.text
8 8 :parts: 3
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 import __main__
23
24 22 import os
25 23 import re
26 import sys
27 24 import textwrap
28 25 from string import Formatter
29 26
30 27 from IPython.external.path import path
31 28 from IPython.testing.skipdoctest import skip_doctest_py3, skip_doctest
32 29 from IPython.utils import py3compat
33 from IPython.utils.data import flatten
34 30
35 31 #-----------------------------------------------------------------------------
36 32 # Code
37 33 #-----------------------------------------------------------------------------
38 34
39 35 class LSString(str):
40 36 """String derivative with a special access attributes.
41 37
42 38 These are normal strings, but with the special attributes:
43 39
44 40 .l (or .list) : value as list (split on newlines).
45 41 .n (or .nlstr): original value (the string itself).
46 42 .s (or .spstr): value as whitespace-separated string.
47 43 .p (or .paths): list of path objects
48 44
49 45 Any values which require transformations are computed only once and
50 46 cached.
51 47
52 48 Such strings are very useful to efficiently interact with the shell, which
53 49 typically only understands whitespace-separated options for commands."""
54 50
55 51 def get_list(self):
56 52 try:
57 53 return self.__list
58 54 except AttributeError:
59 55 self.__list = self.split('\n')
60 56 return self.__list
61 57
62 58 l = list = property(get_list)
63 59
64 60 def get_spstr(self):
65 61 try:
66 62 return self.__spstr
67 63 except AttributeError:
68 64 self.__spstr = self.replace('\n',' ')
69 65 return self.__spstr
70 66
71 67 s = spstr = property(get_spstr)
72 68
73 69 def get_nlstr(self):
74 70 return self
75 71
76 72 n = nlstr = property(get_nlstr)
77 73
78 74 def get_paths(self):
79 75 try:
80 76 return self.__paths
81 77 except AttributeError:
82 78 self.__paths = [path(p) for p in self.split('\n') if os.path.exists(p)]
83 79 return self.__paths
84 80
85 81 p = paths = property(get_paths)
86 82
87 83 # FIXME: We need to reimplement type specific displayhook and then add this
88 84 # back as a custom printer. This should also be moved outside utils into the
89 85 # core.
90 86
91 87 # def print_lsstring(arg):
92 88 # """ Prettier (non-repr-like) and more informative printer for LSString """
93 89 # print "LSString (.p, .n, .l, .s available). Value:"
94 90 # print arg
95 91 #
96 92 #
97 93 # print_lsstring = result_display.when_type(LSString)(print_lsstring)
98 94
99 95
100 96 class SList(list):
101 97 """List derivative with a special access attributes.
102 98
103 99 These are normal lists, but with the special attributes:
104 100
105 101 .l (or .list) : value as list (the list itself).
106 102 .n (or .nlstr): value as a string, joined on newlines.
107 103 .s (or .spstr): value as a string, joined on spaces.
108 104 .p (or .paths): list of path objects
109 105
110 106 Any values which require transformations are computed only once and
111 107 cached."""
112 108
113 109 def get_list(self):
114 110 return self
115 111
116 112 l = list = property(get_list)
117 113
118 114 def get_spstr(self):
119 115 try:
120 116 return self.__spstr
121 117 except AttributeError:
122 118 self.__spstr = ' '.join(self)
123 119 return self.__spstr
124 120
125 121 s = spstr = property(get_spstr)
126 122
127 123 def get_nlstr(self):
128 124 try:
129 125 return self.__nlstr
130 126 except AttributeError:
131 127 self.__nlstr = '\n'.join(self)
132 128 return self.__nlstr
133 129
134 130 n = nlstr = property(get_nlstr)
135 131
136 132 def get_paths(self):
137 133 try:
138 134 return self.__paths
139 135 except AttributeError:
140 136 self.__paths = [path(p) for p in self if os.path.exists(p)]
141 137 return self.__paths
142 138
143 139 p = paths = property(get_paths)
144 140
145 141 def grep(self, pattern, prune = False, field = None):
146 142 """ Return all strings matching 'pattern' (a regex or callable)
147 143
148 144 This is case-insensitive. If prune is true, return all items
149 145 NOT matching the pattern.
150 146
151 147 If field is specified, the match must occur in the specified
152 148 whitespace-separated field.
153 149
154 150 Examples::
155 151
156 152 a.grep( lambda x: x.startswith('C') )
157 153 a.grep('Cha.*log', prune=1)
158 154 a.grep('chm', field=-1)
159 155 """
160 156
161 157 def match_target(s):
162 158 if field is None:
163 159 return s
164 160 parts = s.split()
165 161 try:
166 162 tgt = parts[field]
167 163 return tgt
168 164 except IndexError:
169 165 return ""
170 166
171 167 if isinstance(pattern, basestring):
172 168 pred = lambda x : re.search(pattern, x, re.IGNORECASE)
173 169 else:
174 170 pred = pattern
175 171 if not prune:
176 172 return SList([el for el in self if pred(match_target(el))])
177 173 else:
178 174 return SList([el for el in self if not pred(match_target(el))])
179 175
180 176 def fields(self, *fields):
181 177 """ Collect whitespace-separated fields from string list
182 178
183 179 Allows quick awk-like usage of string lists.
184 180
185 181 Example data (in var a, created by 'a = !ls -l')::
186 182 -rwxrwxrwx 1 ville None 18 Dec 14 2006 ChangeLog
187 183 drwxrwxrwx+ 6 ville None 0 Oct 24 18:05 IPython
188 184
189 185 a.fields(0) is ['-rwxrwxrwx', 'drwxrwxrwx+']
190 186 a.fields(1,0) is ['1 -rwxrwxrwx', '6 drwxrwxrwx+']
191 187 (note the joining by space).
192 188 a.fields(-1) is ['ChangeLog', 'IPython']
193 189
194 190 IndexErrors are ignored.
195 191
196 192 Without args, fields() just split()'s the strings.
197 193 """
198 194 if len(fields) == 0:
199 195 return [el.split() for el in self]
200 196
201 197 res = SList()
202 198 for el in [f.split() for f in self]:
203 199 lineparts = []
204 200
205 201 for fd in fields:
206 202 try:
207 203 lineparts.append(el[fd])
208 204 except IndexError:
209 205 pass
210 206 if lineparts:
211 207 res.append(" ".join(lineparts))
212 208
213 209 return res
214 210
215 211 def sort(self,field= None, nums = False):
216 212 """ sort by specified fields (see fields())
217 213
218 214 Example::
219 215 a.sort(1, nums = True)
220 216
221 217 Sorts a by second field, in numerical order (so that 21 > 3)
222 218
223 219 """
224 220
225 221 #decorate, sort, undecorate
226 222 if field is not None:
227 223 dsu = [[SList([line]).fields(field), line] for line in self]
228 224 else:
229 225 dsu = [[line, line] for line in self]
230 226 if nums:
231 227 for i in range(len(dsu)):
232 228 numstr = "".join([ch for ch in dsu[i][0] if ch.isdigit()])
233 229 try:
234 230 n = int(numstr)
235 231 except ValueError:
236 232 n = 0;
237 233 dsu[i][0] = n
238 234
239 235
240 236 dsu.sort()
241 237 return SList([t[1] for t in dsu])
242 238
243 239
244 240 # FIXME: We need to reimplement type specific displayhook and then add this
245 241 # back as a custom printer. This should also be moved outside utils into the
246 242 # core.
247 243
248 244 # def print_slist(arg):
249 245 # """ Prettier (non-repr-like) and more informative printer for SList """
250 246 # print "SList (.p, .n, .l, .s, .grep(), .fields(), sort() available):"
251 247 # if hasattr(arg, 'hideonce') and arg.hideonce:
252 248 # arg.hideonce = False
253 249 # return
254 250 #
255 251 # nlprint(arg) # This was a nested list printer, now removed.
256 252 #
257 253 # print_slist = result_display.when_type(SList)(print_slist)
258 254
259 255
260 256 def indent(instr,nspaces=4, ntabs=0, flatten=False):
261 257 """Indent a string a given number of spaces or tabstops.
262 258
263 259 indent(str,nspaces=4,ntabs=0) -> indent str by ntabs+nspaces.
264 260
265 261 Parameters
266 262 ----------
267 263
268 264 instr : basestring
269 265 The string to be indented.
270 266 nspaces : int (default: 4)
271 267 The number of spaces to be indented.
272 268 ntabs : int (default: 0)
273 269 The number of tabs to be indented.
274 270 flatten : bool (default: False)
275 271 Whether to scrub existing indentation. If True, all lines will be
276 272 aligned to the same indentation. If False, existing indentation will
277 273 be strictly increased.
278 274
279 275 Returns
280 276 -------
281 277
282 278 str|unicode : string indented by ntabs and nspaces.
283 279
284 280 """
285 281 if instr is None:
286 282 return
287 283 ind = '\t'*ntabs+' '*nspaces
288 284 if flatten:
289 285 pat = re.compile(r'^\s*', re.MULTILINE)
290 286 else:
291 287 pat = re.compile(r'^', re.MULTILINE)
292 288 outstr = re.sub(pat, ind, instr)
293 289 if outstr.endswith(os.linesep+ind):
294 290 return outstr[:-len(ind)]
295 291 else:
296 292 return outstr
297 293
298 294
299 295 def list_strings(arg):
300 296 """Always return a list of strings, given a string or list of strings
301 297 as input.
302 298
303 299 :Examples:
304 300
305 301 In [7]: list_strings('A single string')
306 302 Out[7]: ['A single string']
307 303
308 304 In [8]: list_strings(['A single string in a list'])
309 305 Out[8]: ['A single string in a list']
310 306
311 307 In [9]: list_strings(['A','list','of','strings'])
312 308 Out[9]: ['A', 'list', 'of', 'strings']
313 309 """
314 310
315 311 if isinstance(arg,basestring): return [arg]
316 312 else: return arg
317 313
318 314
319 315 def marquee(txt='',width=78,mark='*'):
320 316 """Return the input string centered in a 'marquee'.
321 317
322 318 :Examples:
323 319
324 320 In [16]: marquee('A test',40)
325 321 Out[16]: '**************** A test ****************'
326 322
327 323 In [17]: marquee('A test',40,'-')
328 324 Out[17]: '---------------- A test ----------------'
329 325
330 326 In [18]: marquee('A test',40,' ')
331 327 Out[18]: ' A test '
332 328
333 329 """
334 330 if not txt:
335 331 return (mark*width)[:width]
336 332 nmark = (width-len(txt)-2)//len(mark)//2
337 333 if nmark < 0: nmark =0
338 334 marks = mark*nmark
339 335 return '%s %s %s' % (marks,txt,marks)
340 336
341 337
342 338 ini_spaces_re = re.compile(r'^(\s+)')
343 339
344 340 def num_ini_spaces(strng):
345 341 """Return the number of initial spaces in a string"""
346 342
347 343 ini_spaces = ini_spaces_re.match(strng)
348 344 if ini_spaces:
349 345 return ini_spaces.end()
350 346 else:
351 347 return 0
352 348
353 349
354 350 def format_screen(strng):
355 351 """Format a string for screen printing.
356 352
357 353 This removes some latex-type format codes."""
358 354 # Paragraph continue
359 355 par_re = re.compile(r'\\$',re.MULTILINE)
360 356 strng = par_re.sub('',strng)
361 357 return strng
362 358
363 359
364 360 def dedent(text):
365 361 """Equivalent of textwrap.dedent that ignores unindented first line.
366 362
367 363 This means it will still dedent strings like:
368 364 '''foo
369 365 is a bar
370 366 '''
371 367
372 368 For use in wrap_paragraphs.
373 369 """
374 370
375 371 if text.startswith('\n'):
376 372 # text starts with blank line, don't ignore the first line
377 373 return textwrap.dedent(text)
378 374
379 375 # split first line
380 376 splits = text.split('\n',1)
381 377 if len(splits) == 1:
382 378 # only one line
383 379 return textwrap.dedent(text)
384 380
385 381 first, rest = splits
386 382 # dedent everything but the first line
387 383 rest = textwrap.dedent(rest)
388 384 return '\n'.join([first, rest])
389 385
390 386
391 387 def wrap_paragraphs(text, ncols=80):
392 388 """Wrap multiple paragraphs to fit a specified width.
393 389
394 390 This is equivalent to textwrap.wrap, but with support for multiple
395 391 paragraphs, as separated by empty lines.
396 392
397 393 Returns
398 394 -------
399 395
400 396 list of complete paragraphs, wrapped to fill `ncols` columns.
401 397 """
402 398 paragraph_re = re.compile(r'\n(\s*\n)+', re.MULTILINE)
403 399 text = dedent(text).strip()
404 400 paragraphs = paragraph_re.split(text)[::2] # every other entry is space
405 401 out_ps = []
406 402 indent_re = re.compile(r'\n\s+', re.MULTILINE)
407 403 for p in paragraphs:
408 404 # presume indentation that survives dedent is meaningful formatting,
409 405 # so don't fill unless text is flush.
410 406 if indent_re.search(p) is None:
411 407 # wrap paragraph
412 408 p = textwrap.fill(p, ncols)
413 409 out_ps.append(p)
414 410 return out_ps
415 411
416 412
417 413 def long_substr(data):
418 414 """Return the longest common substring in a list of strings.
419 415
420 416 Credit: http://stackoverflow.com/questions/2892931/longest-common-substring-from-more-than-two-strings-python
421 417 """
422 418 substr = ''
423 419 if len(data) > 1 and len(data[0]) > 0:
424 420 for i in range(len(data[0])):
425 421 for j in range(len(data[0])-i+1):
426 422 if j > len(substr) and all(data[0][i:i+j] in x for x in data):
427 423 substr = data[0][i:i+j]
428 424 elif len(data) == 1:
429 425 substr = data[0]
430 426 return substr
431 427
432 428
433 429 def strip_email_quotes(text):
434 430 """Strip leading email quotation characters ('>').
435 431
436 432 Removes any combination of leading '>' interspersed with whitespace that
437 433 appears *identically* in all lines of the input text.
438 434
439 435 Parameters
440 436 ----------
441 437 text : str
442 438
443 439 Examples
444 440 --------
445 441
446 442 Simple uses::
447 443
448 444 In [2]: strip_email_quotes('> > text')
449 445 Out[2]: 'text'
450 446
451 447 In [3]: strip_email_quotes('> > text\\n> > more')
452 448 Out[3]: 'text\\nmore'
453 449
454 450 Note how only the common prefix that appears in all lines is stripped::
455 451
456 452 In [4]: strip_email_quotes('> > text\\n> > more\\n> more...')
457 453 Out[4]: '> text\\n> more\\nmore...'
458 454
459 455 So if any line has no quote marks ('>') , then none are stripped from any
460 456 of them ::
461 457
462 458 In [5]: strip_email_quotes('> > text\\n> > more\\nlast different')
463 459 Out[5]: '> > text\\n> > more\\nlast different'
464 460 """
465 461 lines = text.splitlines()
466 462 matches = set()
467 463 for line in lines:
468 464 prefix = re.match(r'^(\s*>[ >]*)', line)
469 465 if prefix:
470 466 matches.add(prefix.group(1))
471 467 else:
472 468 break
473 469 else:
474 470 prefix = long_substr(list(matches))
475 471 if prefix:
476 472 strip = len(prefix)
477 473 text = '\n'.join([ ln[strip:] for ln in lines])
478 474 return text
479 475
480 476
481 477 class EvalFormatter(Formatter):
482 478 """A String Formatter that allows evaluation of simple expressions.
483 479
484 480 Note that this version interprets a : as specifying a format string (as per
485 481 standard string formatting), so if slicing is required, you must explicitly
486 482 create a slice.
487 483
488 484 This is to be used in templating cases, such as the parallel batch
489 485 script templates, where simple arithmetic on arguments is useful.
490 486
491 487 Examples
492 488 --------
493 489
494 490 In [1]: f = EvalFormatter()
495 491 In [2]: f.format('{n//4}', n=8)
496 492 Out [2]: '2'
497 493
498 494 In [3]: f.format("{greeting[slice(2,4)]}", greeting="Hello")
499 495 Out [3]: 'll'
500 496 """
501 497 def get_field(self, name, args, kwargs):
502 498 v = eval(name, kwargs)
503 499 return v, name
504 500
505 501
506 502 @skip_doctest_py3
507 503 class FullEvalFormatter(Formatter):
508 504 """A String Formatter that allows evaluation of simple expressions.
509 505
510 506 Any time a format key is not found in the kwargs,
511 507 it will be tried as an expression in the kwargs namespace.
512 508
513 509 Note that this version allows slicing using [1:2], so you cannot specify
514 510 a format string. Use :class:`EvalFormatter` to permit format strings.
515 511
516 512 Examples
517 513 --------
518 514
519 515 In [1]: f = FullEvalFormatter()
520 516 In [2]: f.format('{n//4}', n=8)
521 517 Out[2]: u'2'
522 518
523 519 In [3]: f.format('{list(range(5))[2:4]}')
524 520 Out[3]: u'[2, 3]'
525 521
526 522 In [4]: f.format('{3*2}')
527 523 Out[4]: u'6'
528 524 """
529 525 # copied from Formatter._vformat with minor changes to allow eval
530 526 # and replace the format_spec code with slicing
531 527 def _vformat(self, format_string, args, kwargs, used_args, recursion_depth):
532 528 if recursion_depth < 0:
533 529 raise ValueError('Max string recursion exceeded')
534 530 result = []
535 531 for literal_text, field_name, format_spec, conversion in \
536 532 self.parse(format_string):
537 533
538 534 # output the literal text
539 535 if literal_text:
540 536 result.append(literal_text)
541 537
542 538 # if there's a field, output it
543 539 if field_name is not None:
544 540 # this is some markup, find the object and do
545 541 # the formatting
546 542
547 543 if format_spec:
548 544 # override format spec, to allow slicing:
549 545 field_name = ':'.join([field_name, format_spec])
550 546
551 547 # eval the contents of the field for the object
552 548 # to be formatted
553 549 obj = eval(field_name, kwargs)
554 550
555 551 # do any conversion on the resulting object
556 552 obj = self.convert_field(obj, conversion)
557 553
558 554 # format the object and append to the result
559 555 result.append(self.format_field(obj, ''))
560 556
561 557 return u''.join(py3compat.cast_unicode(s) for s in result)
562 558
563 559
564 560 @skip_doctest_py3
565 561 class DollarFormatter(FullEvalFormatter):
566 562 """Formatter allowing Itpl style $foo replacement, for names and attribute
567 563 access only. Standard {foo} replacement also works, and allows full
568 564 evaluation of its arguments.
569 565
570 566 Examples
571 567 --------
572 568 In [1]: f = DollarFormatter()
573 569 In [2]: f.format('{n//4}', n=8)
574 570 Out[2]: u'2'
575 571
576 572 In [3]: f.format('23 * 76 is $result', result=23*76)
577 573 Out[3]: u'23 * 76 is 1748'
578 574
579 575 In [4]: f.format('$a or {b}', a=1, b=2)
580 576 Out[4]: u'1 or 2'
581 577 """
582 578 _dollar_pattern = re.compile("(.*?)\$(\$?[\w\.]+)")
583 579 def parse(self, fmt_string):
584 580 for literal_txt, field_name, format_spec, conversion \
585 581 in Formatter.parse(self, fmt_string):
586 582
587 583 # Find $foo patterns in the literal text.
588 584 continue_from = 0
589 585 txt = ""
590 586 for m in self._dollar_pattern.finditer(literal_txt):
591 587 new_txt, new_field = m.group(1,2)
592 588 # $$foo --> $foo
593 589 if new_field.startswith("$"):
594 590 txt += new_txt + new_field
595 591 else:
596 592 yield (txt + new_txt, new_field, "", None)
597 593 txt = ""
598 594 continue_from = m.end()
599 595
600 596 # Re-yield the {foo} style pattern
601 597 yield (txt + literal_txt[continue_from:], field_name, format_spec, conversion)
602 598
603 599 #-----------------------------------------------------------------------------
604 600 # Utils to columnize a list of string
605 601 #-----------------------------------------------------------------------------
606 602
607 603 def _chunks(l, n):
608 604 """Yield successive n-sized chunks from l."""
609 605 for i in xrange(0, len(l), n):
610 606 yield l[i:i+n]
611 607
612 608
613 609 def _find_optimal(rlist , separator_size=2 , displaywidth=80):
614 610 """Calculate optimal info to columnize a list of string"""
615 611 for nrow in range(1, len(rlist)+1) :
616 612 chk = map(max,_chunks(rlist, nrow))
617 613 sumlength = sum(chk)
618 614 ncols = len(chk)
619 615 if sumlength+separator_size*(ncols-1) <= displaywidth :
620 616 break;
621 617 return {'columns_numbers' : ncols,
622 618 'optimal_separator_width':(displaywidth - sumlength)/(ncols-1) if (ncols -1) else 0,
623 619 'rows_numbers' : nrow,
624 620 'columns_width' : chk
625 621 }
626 622
627 623
628 624 def _get_or_default(mylist, i, default=None):
629 625 """return list item number, or default if don't exist"""
630 626 if i >= len(mylist):
631 627 return default
632 628 else :
633 629 return mylist[i]
634 630
635 631
636 632 @skip_doctest
637 633 def compute_item_matrix(items, empty=None, *args, **kwargs) :
638 634 """Returns a nested list, and info to columnize items
639 635
640 636 Parameters
641 637 ----------
642 638
643 639 items :
644 640 list of strings to columize
645 641 empty : (default None)
646 642 default value to fill list if needed
647 643 separator_size : int (default=2)
648 644 How much caracters will be used as a separation between each columns.
649 645 displaywidth : int (default=80)
650 646 The width of the area onto wich the columns should enter
651 647
652 648 Returns
653 649 -------
654 650
655 651 Returns a tuple of (strings_matrix, dict_info)
656 652
657 653 strings_matrix :
658 654
659 655 nested list of string, the outer most list contains as many list as
660 656 rows, the innermost lists have each as many element as colums. If the
661 657 total number of elements in `items` does not equal the product of
662 658 rows*columns, the last element of some lists are filled with `None`.
663 659
664 660 dict_info :
665 661 some info to make columnize easier:
666 662
667 663 columns_numbers : number of columns
668 664 rows_numbers : number of rows
669 665 columns_width : list of with of each columns
670 666 optimal_separator_width : best separator width between columns
671 667
672 668 Examples
673 669 --------
674 670
675 671 In [1]: l = ['aaa','b','cc','d','eeeee','f','g','h','i','j','k','l']
676 672 ...: compute_item_matrix(l,displaywidth=12)
677 673 Out[1]:
678 674 ([['aaa', 'f', 'k'],
679 675 ['b', 'g', 'l'],
680 676 ['cc', 'h', None],
681 677 ['d', 'i', None],
682 678 ['eeeee', 'j', None]],
683 679 {'columns_numbers': 3,
684 680 'columns_width': [5, 1, 1],
685 681 'optimal_separator_width': 2,
686 682 'rows_numbers': 5})
687 683
688 684 """
689 685 info = _find_optimal(map(len, items), *args, **kwargs)
690 686 nrow, ncol = info['rows_numbers'], info['columns_numbers']
691 687 return ([[ _get_or_default(items, c*nrow+i, default=empty) for c in range(ncol) ] for i in range(nrow) ], info)
692 688
693 689
694 690 def columnize(items, separator=' ', displaywidth=80):
695 691 """ Transform a list of strings into a single string with columns.
696 692
697 693 Parameters
698 694 ----------
699 695 items : sequence of strings
700 696 The strings to process.
701 697
702 698 separator : str, optional [default is two spaces]
703 699 The string that separates columns.
704 700
705 701 displaywidth : int, optional [default is 80]
706 702 Width of the display in number of characters.
707 703
708 704 Returns
709 705 -------
710 706 The formatted string.
711 707 """
712 708 if not items :
713 709 return '\n'
714 710 matrix, info = compute_item_matrix(items, separator_size=len(separator), displaywidth=displaywidth)
715 711 fmatrix = [filter(None, x) for x in matrix]
716 712 sjoin = lambda x : separator.join([ y.ljust(w, ' ') for y, w in zip(x, info['columns_width'])])
717 713 return '\n'.join(map(sjoin, fmatrix))+'\n'
@@ -1,47 +1,46 b''
1 1 """utilities for checking zmq versions"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2013 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING.txt, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Verify zmq version dependency >= 2.1.11
11 11 #-----------------------------------------------------------------------------
12 12
13 import warnings
14 13 from IPython.utils.version import check_version
15 14
16 15
17 16 def patch_pyzmq():
18 17 """backport a few patches from newer pyzmq
19 18
20 19 These can be removed as we bump our minimum pyzmq version
21 20 """
22 21
23 22 import zmq
24 23
25 24 # fallback on stdlib json if jsonlib is selected, because jsonlib breaks things.
26 25 # jsonlib support is removed from pyzmq >= 2.2.0
27 26
28 27 from zmq.utils import jsonapi
29 28 if jsonapi.jsonmod.__name__ == 'jsonlib':
30 29 import json
31 30 jsonapi.jsonmod = json
32 31
33 32
34 33 def check_for_zmq(minimum_version, required_by='Someone'):
35 34 try:
36 35 import zmq
37 36 except ImportError:
38 37 raise ImportError("%s requires pyzmq >= %s"%(required_by, minimum_version))
39 38
40 39 patch_pyzmq()
41 40
42 41 pyzmq_version = zmq.__version__
43 42
44 43 if not check_version(pyzmq_version, minimum_version):
45 44 raise ImportError("%s requires pyzmq >= %s, but you have %s"%(
46 45 required_by, minimum_version, pyzmq_version))
47 46
General Comments 0
You need to be logged in to leave comments. Login now