##// END OF EJS Templates
format again
format again

File last commit:

r29024:8fb4b969
r29024:8fb4b969
Show More
_process_win32.py
211 lines | 6.5 KiB | text/x-python | PythonLexer
"""Windows-specific implementation of process utilities.
This file is only meant to be imported by process.py, not by end-users.
"""
import ctypes
import os
import subprocess
import sys
import time
from ctypes import POINTER, c_int
from ctypes.wintypes import HLOCAL, LPCWSTR
from subprocess import STDOUT
from threading import Thread
from types import TracebackType
from typing import List, Optional
from . import py3compat
from ._process_common import arg_split as py_arg_split
from ._process_common import process_handler, read_no_interrupt
from .encoding import DEFAULT_ENCODING
class AvoidUNCPath:
"""A context manager to protect command execution from UNC paths.
In the Win32 API, commands can't be invoked with the cwd being a UNC path.
This context manager temporarily changes directory to the 'C:' drive on
entering, and restores the original working directory on exit.
The context manager returns the starting working directory *if* it made a
change and None otherwise, so that users can apply the necessary adjustment
to their system calls in the event of a change.
Examples
--------
::
cmd = 'dir'
with AvoidUNCPath() as path:
if path is not None:
cmd = '"pushd %s &&"%s' % (path, cmd)
os.system(cmd)
"""
def __enter__(self) -> Optional[str]:
self.path = os.getcwd()
self.is_unc_path = self.path.startswith(r"\\")
if self.is_unc_path:
# change to c drive (as cmd.exe cannot handle UNC addresses)
os.chdir("C:")
return self.path
else:
# We return None to signal that there was no change in the working
# directory
return None
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
traceback: TracebackType,
) -> None:
if self.is_unc_path:
os.chdir(self.path)
def _system_body(p: subprocess.Popen) -> int:
"""Callback for _system."""
enc = DEFAULT_ENCODING
# Dec 2024: in both of these functions, I'm not sure why we .splitlines()
# the bytes and then decode each line individually instead of just decoding
# the whole thing at once.
def stdout_read() -> None:
try:
assert p.stdout is not None
for byte_line in read_no_interrupt(p.stdout).splitlines():
line = byte_line.decode(enc, "replace")
print(line, file=sys.stdout)
except Exception as e:
print(f"Error reading stdout: {e}", file=sys.stderr)
def stderr_read() -> None:
try:
assert p.stderr is not None
for byte_line in read_no_interrupt(p.stderr).splitlines():
line = byte_line.decode(enc, "replace")
print(line, file=sys.stderr)
except Exception as e:
print(f"Error reading stderr: {e}", file=sys.stderr)
stdout_thread = Thread(target=stdout_read)
stderr_thread = Thread(target=stderr_read)
stdout_thread.start()
stderr_thread.start()
# Wait to finish for returncode. Unfortunately, Python has a bug where
# wait() isn't interruptible (https://bugs.python.org/issue28168) so poll in
# a loop instead of just doing `return p.wait()`
while True:
result = p.poll()
if result is None:
time.sleep(0.01)
else:
break
# Join the threads to ensure they complete before returning
stdout_thread.join()
stderr_thread.join()
return result
def system(cmd: str) -> Optional[int]:
"""Win32 version of os.system() that works with network shares.
Note that this implementation returns None, as meant for use in IPython.
Parameters
----------
cmd : str or list
A command to be executed in the system shell.
Returns
-------
int : child process' exit code.
"""
# The controller provides interactivity with both
# stdin and stdout
# import _process_win32_controller
# _process_win32_controller.system(cmd)
with AvoidUNCPath() as path:
if path is not None:
cmd = '"pushd %s &&"%s' % (path, cmd)
return process_handler(cmd, _system_body)
def getoutput(cmd: str) -> str:
"""Return standard output of executing cmd in a shell.
Accepts the same arguments as os.system().
Parameters
----------
cmd : str or list
A command to be executed in the system shell.
Returns
-------
stdout : str
"""
with AvoidUNCPath() as path:
if path is not None:
cmd = '"pushd %s &&"%s' % (path, cmd)
out = process_handler(cmd, lambda p: p.communicate()[0], STDOUT)
if out is None:
out = b""
return py3compat.decode(out)
try:
windll = ctypes.windll # type: ignore [attr-defined]
CommandLineToArgvW = windll.shell32.CommandLineToArgvW
CommandLineToArgvW.arg_types = [LPCWSTR, POINTER(c_int)]
CommandLineToArgvW.restype = POINTER(LPCWSTR)
LocalFree = windll.kernel32.LocalFree
LocalFree.res_type = HLOCAL
LocalFree.arg_types = [HLOCAL]
def arg_split(
commandline: str, posix: bool = False, strict: bool = True
) -> List[str]:
"""Split a command line's arguments in a shell-like manner.
This is a special version for windows that use a ctypes call to CommandLineToArgvW
to do the argv splitting. The posix parameter is ignored.
If strict=False, process_common.arg_split(...strict=False) is used instead.
"""
# CommandLineToArgvW returns path to executable if called with empty string.
if commandline.strip() == "":
return []
if not strict:
# not really a cl-arg, fallback on _process_common
return py_arg_split(commandline, posix=posix, strict=strict)
argvn = c_int()
result_pointer = CommandLineToArgvW(commandline.lstrip(), ctypes.byref(argvn))
result_array_type = LPCWSTR * argvn.value
result = [
arg
for arg in result_array_type.from_address(
ctypes.addressof(result_pointer.contents)
)
if arg is not None
]
# for side effects
_ = LocalFree(result_pointer)
return result
except AttributeError:
arg_split = py_arg_split
def check_pid(pid: int) -> bool:
# OpenProcess returns 0 if no such process (of ours) exists
# positive int otherwise
return bool(windll.kernel32.OpenProcess(1, 0, pid))