##// END OF EJS Templates
Always free
M Bussonnier -
Show More
@@ -1,211 +1,213
1 1 """Windows-specific implementation of process utilities.
2 2
3 3 This file is only meant to be imported by process.py, not by end-users.
4 4 """
5 5
6 6 import ctypes
7 7 import os
8 8 import subprocess
9 9 import sys
10 10 import time
11 11 from ctypes import POINTER, c_int
12 12 from ctypes.wintypes import HLOCAL, LPCWSTR
13 13 from subprocess import STDOUT
14 14 from threading import Thread
15 15 from types import TracebackType
16 16 from typing import List, Optional
17 17
18 18 from . import py3compat
19 19 from ._process_common import arg_split as py_arg_split
20 20
21 21 from ._process_common import process_handler, read_no_interrupt
22 22 from .encoding import DEFAULT_ENCODING
23 23
24 24
25 25 class AvoidUNCPath:
26 26 """A context manager to protect command execution from UNC paths.
27 27
28 28 In the Win32 API, commands can't be invoked with the cwd being a UNC path.
29 29 This context manager temporarily changes directory to the 'C:' drive on
30 30 entering, and restores the original working directory on exit.
31 31
32 32 The context manager returns the starting working directory *if* it made a
33 33 change and None otherwise, so that users can apply the necessary adjustment
34 34 to their system calls in the event of a change.
35 35
36 36 Examples
37 37 --------
38 38 ::
39 39 cmd = 'dir'
40 40 with AvoidUNCPath() as path:
41 41 if path is not None:
42 42 cmd = '"pushd %s &&"%s' % (path, cmd)
43 43 os.system(cmd)
44 44 """
45 45
46 46 def __enter__(self) -> Optional[str]:
47 47 self.path = os.getcwd()
48 48 self.is_unc_path = self.path.startswith(r"\\")
49 49 if self.is_unc_path:
50 50 # change to c drive (as cmd.exe cannot handle UNC addresses)
51 51 os.chdir("C:")
52 52 return self.path
53 53 else:
54 54 # We return None to signal that there was no change in the working
55 55 # directory
56 56 return None
57 57
58 58 def __exit__(
59 59 self,
60 60 exc_type: Optional[type[BaseException]],
61 61 exc_value: Optional[BaseException],
62 62 traceback: TracebackType,
63 63 ) -> None:
64 64 if self.is_unc_path:
65 65 os.chdir(self.path)
66 66
67 67
68 68 def _system_body(p: subprocess.Popen) -> int:
69 69 """Callback for _system."""
70 70 enc = DEFAULT_ENCODING
71 71
72 72 # Dec 2024: in both of these functions, I'm not sure why we .splitlines()
73 73 # the bytes and then decode each line individually instead of just decoding
74 74 # the whole thing at once.
75 75 def stdout_read() -> None:
76 76 try:
77 77 assert p.stdout is not None
78 78 for byte_line in read_no_interrupt(p.stdout).splitlines():
79 79 line = byte_line.decode(enc, "replace")
80 80 print(line, file=sys.stdout)
81 81 except Exception as e:
82 82 print(f"Error reading stdout: {e}", file=sys.stderr)
83 83
84 84 def stderr_read() -> None:
85 85 try:
86 86 assert p.stderr is not None
87 87 for byte_line in read_no_interrupt(p.stderr).splitlines():
88 88 line = byte_line.decode(enc, "replace")
89 89 print(line, file=sys.stderr)
90 90 except Exception as e:
91 91 print(f"Error reading stderr: {e}", file=sys.stderr)
92 92
93 93 stdout_thread = Thread(target=stdout_read)
94 94 stderr_thread = Thread(target=stderr_read)
95 95
96 96 stdout_thread.start()
97 97 stderr_thread.start()
98 98
99 99 # Wait to finish for returncode. Unfortunately, Python has a bug where
100 100 # wait() isn't interruptible (https://bugs.python.org/issue28168) so poll in
101 101 # a loop instead of just doing `return p.wait()`
102 102 while True:
103 103 result = p.poll()
104 104 if result is None:
105 105 time.sleep(0.01)
106 106 else:
107 107 break
108 108
109 109 # Join the threads to ensure they complete before returning
110 110 stdout_thread.join()
111 111 stderr_thread.join()
112 112
113 113 return result
114 114
115 115
116 116 def system(cmd: str) -> Optional[int]:
117 117 """Win32 version of os.system() that works with network shares.
118 118
119 119 Note that this implementation returns None, as meant for use in IPython.
120 120
121 121 Parameters
122 122 ----------
123 123 cmd : str or list
124 124 A command to be executed in the system shell.
125 125
126 126 Returns
127 127 -------
128 128 int : child process' exit code.
129 129 """
130 130 # The controller provides interactivity with both
131 131 # stdin and stdout
132 132 # import _process_win32_controller
133 133 # _process_win32_controller.system(cmd)
134 134
135 135 with AvoidUNCPath() as path:
136 136 if path is not None:
137 137 cmd = '"pushd %s &&"%s' % (path, cmd)
138 138 return process_handler(cmd, _system_body)
139 139
140 140
141 141 def getoutput(cmd: str) -> str:
142 142 """Return standard output of executing cmd in a shell.
143 143
144 144 Accepts the same arguments as os.system().
145 145
146 146 Parameters
147 147 ----------
148 148 cmd : str or list
149 149 A command to be executed in the system shell.
150 150
151 151 Returns
152 152 -------
153 153 stdout : str
154 154 """
155 155
156 156 with AvoidUNCPath() as path:
157 157 if path is not None:
158 158 cmd = '"pushd %s &&"%s' % (path, cmd)
159 159 out = process_handler(cmd, lambda p: p.communicate()[0], STDOUT)
160 160
161 161 if out is None:
162 162 out = b""
163 163 return py3compat.decode(out)
164 164
165 165
166 166 try:
167 167 windll = ctypes.windll # type: ignore [attr-defined]
168 168 CommandLineToArgvW = windll.shell32.CommandLineToArgvW
169 169 CommandLineToArgvW.arg_types = [LPCWSTR, POINTER(c_int)]
170 170 CommandLineToArgvW.restype = POINTER(LPCWSTR)
171 171 LocalFree = windll.kernel32.LocalFree
172 172 LocalFree.res_type = HLOCAL
173 173 LocalFree.arg_types = [HLOCAL]
174 174
175 175 def arg_split(
176 176 commandline: str, posix: bool = False, strict: bool = True
177 177 ) -> List[str]:
178 178 """Split a command line's arguments in a shell-like manner.
179 179
180 180 This is a special version for windows that use a ctypes call to CommandLineToArgvW
181 181 to do the argv splitting. The posix parameter is ignored.
182 182
183 183 If strict=False, process_common.arg_split(...strict=False) is used instead.
184 184 """
185 185 # CommandLineToArgvW returns path to executable if called with empty string.
186 186 if commandline.strip() == "":
187 187 return []
188 188 if not strict:
189 189 # not really a cl-arg, fallback on _process_common
190 190 return py_arg_split(commandline, posix=posix, strict=strict)
191 191 argvn = c_int()
192 192 result_pointer = CommandLineToArgvW(commandline.lstrip(), ctypes.byref(argvn))
193 result_array_type = LPCWSTR * argvn.value
194 result = [
195 arg
196 for arg in result_array_type.from_address(
197 ctypes.addressof(result_pointer.contents)
198 )
199 if arg is not None
200 ]
201 # for side effects
202 _ = LocalFree(result_pointer)
193 try:
194 result_array_type = LPCWSTR * argvn.value
195 result = [
196 arg
197 for arg in result_array_type.from_address(
198 ctypes.addressof(result_pointer.contents)
199 )
200 if arg is not None
201 ]
202 finally:
203 # for side effects
204 _ = LocalFree(result_pointer)
203 205 return result
204 206 except AttributeError:
205 207 arg_split = py_arg_split
206 208
207 209
208 210 def check_pid(pid: int) -> bool:
209 211 # OpenProcess returns 0 if no such process (of ours) exists
210 212 # positive int otherwise
211 213 return bool(windll.kernel32.OpenProcess(1, 0, pid))
General Comments 0
You need to be logged in to leave comments. Login now