##// END OF EJS Templates
typing: add trivial type hints to rest of the windows platform module...
Matt Harbison -
r50712:2b147671 default
parent child Browse files
Show More
@@ -1,746 +1,750 b''
1 1 # windows.py - Windows utility function implementations for Mercurial
2 2 #
3 3 # Copyright 2005-2009 Olivia Mackall <olivia@selenic.com> and others
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import getpass
11 11 import msvcrt # pytype: disable=import-error
12 12 import os
13 13 import re
14 14 import stat
15 15 import string
16 16 import sys
17 17 import winreg # pytype: disable=import-error
18 18
19 19 from typing import (
20 AnyStr,
20 21 BinaryIO,
21 22 Iterable,
22 23 Iterator,
23 24 List,
25 Mapping,
24 26 NoReturn,
25 27 Optional,
28 Pattern,
26 29 Sequence,
27 30 Union,
28 31 )
29 32
30 33 from .i18n import _
31 34 from .pycompat import getattr
32 35 from . import (
33 36 encoding,
34 37 error,
35 38 policy,
36 39 pycompat,
37 40 typelib,
38 41 win32,
39 42 )
40 43
41 44
42 45 osutil = policy.importmod('osutil')
43 46
44 47 getfsmountpoint = win32.getvolumename
45 48 getfstype = win32.getfstype
46 49 getuser = win32.getuser
47 50 hidewindow = win32.hidewindow
48 51 makedir = win32.makedir
49 52 nlinks = win32.nlinks
50 53 oslink = win32.oslink
51 54 samedevice = win32.samedevice
52 55 samefile = win32.samefile
53 56 setsignalhandler = win32.setsignalhandler
54 57 spawndetached = win32.spawndetached
55 58 split = os.path.split
56 59 testpid = win32.testpid
57 60 unlink = win32.unlink
58 61
59 umask = 0o022
62 umask: int = 0o022
60 63
61 64
62 65 class mixedfilemodewrapper:
63 66 """Wraps a file handle when it is opened in read/write mode.
64 67
65 68 fopen() and fdopen() on Windows have a specific-to-Windows requirement
66 69 that files opened with mode r+, w+, or a+ make a call to a file positioning
67 70 function when switching between reads and writes. Without this extra call,
68 71 Python will raise a not very intuitive "IOError: [Errno 0] Error."
69 72
70 73 This class wraps posixfile instances when the file is opened in read/write
71 74 mode and automatically adds checks or inserts appropriate file positioning
72 75 calls when necessary.
73 76 """
74 77
75 78 OPNONE = 0
76 79 OPREAD = 1
77 80 OPWRITE = 2
78 81
79 82 def __init__(self, fp):
80 83 object.__setattr__(self, '_fp', fp)
81 84 object.__setattr__(self, '_lastop', 0)
82 85
83 86 def __enter__(self):
84 87 self._fp.__enter__()
85 88 return self
86 89
87 90 def __exit__(self, exc_type, exc_val, exc_tb):
88 91 self._fp.__exit__(exc_type, exc_val, exc_tb)
89 92
90 93 def __getattr__(self, name):
91 94 return getattr(self._fp, name)
92 95
93 96 def __setattr__(self, name, value):
94 97 return self._fp.__setattr__(name, value)
95 98
96 99 def _noopseek(self):
97 100 self._fp.seek(0, os.SEEK_CUR)
98 101
99 102 def seek(self, *args, **kwargs):
100 103 object.__setattr__(self, '_lastop', self.OPNONE)
101 104 return self._fp.seek(*args, **kwargs)
102 105
103 106 def write(self, d):
104 107 if self._lastop == self.OPREAD:
105 108 self._noopseek()
106 109
107 110 object.__setattr__(self, '_lastop', self.OPWRITE)
108 111 return self._fp.write(d)
109 112
110 113 def writelines(self, *args, **kwargs):
111 114 if self._lastop == self.OPREAD:
112 115 self._noopeseek()
113 116
114 117 object.__setattr__(self, '_lastop', self.OPWRITE)
115 118 return self._fp.writelines(*args, **kwargs)
116 119
117 120 def read(self, *args, **kwargs):
118 121 if self._lastop == self.OPWRITE:
119 122 self._noopseek()
120 123
121 124 object.__setattr__(self, '_lastop', self.OPREAD)
122 125 return self._fp.read(*args, **kwargs)
123 126
124 127 def readline(self, *args, **kwargs):
125 128 if self._lastop == self.OPWRITE:
126 129 self._noopseek()
127 130
128 131 object.__setattr__(self, '_lastop', self.OPREAD)
129 132 return self._fp.readline(*args, **kwargs)
130 133
131 134 def readlines(self, *args, **kwargs):
132 135 if self._lastop == self.OPWRITE:
133 136 self._noopseek()
134 137
135 138 object.__setattr__(self, '_lastop', self.OPREAD)
136 139 return self._fp.readlines(*args, **kwargs)
137 140
138 141
139 142 class fdproxy:
140 143 """Wraps osutil.posixfile() to override the name attribute to reflect the
141 144 underlying file name.
142 145 """
143 146
144 147 def __init__(self, name, fp):
145 148 self.name = name
146 149 self._fp = fp
147 150
148 151 def __enter__(self):
149 152 self._fp.__enter__()
150 153 # Return this wrapper for the context manager so that the name is
151 154 # still available.
152 155 return self
153 156
154 157 def __exit__(self, exc_type, exc_value, traceback):
155 158 self._fp.__exit__(exc_type, exc_value, traceback)
156 159
157 160 def __iter__(self):
158 161 return iter(self._fp)
159 162
160 163 def __getattr__(self, name):
161 164 return getattr(self._fp, name)
162 165
163 166
164 167 def posixfile(name, mode=b'r', buffering=-1):
165 168 '''Open a file with even more POSIX-like semantics'''
166 169 try:
167 170 fp = osutil.posixfile(name, mode, buffering) # may raise WindowsError
168 171
169 172 # PyFile_FromFd() ignores the name, and seems to report fp.name as the
170 173 # underlying file descriptor.
171 174 fp = fdproxy(name, fp)
172 175
173 176 # The position when opening in append mode is implementation defined, so
174 177 # make it consistent with other platforms, which position at EOF.
175 178 if b'a' in mode:
176 179 fp.seek(0, os.SEEK_END)
177 180
178 181 if b'+' in mode:
179 182 return mixedfilemodewrapper(fp)
180 183
181 184 return fp
182 185 except WindowsError as err: # pytype: disable=name-error
183 186 # convert to a friendlier exception
184 187 raise IOError(
185 188 err.errno, '%s: %s' % (encoding.strfromlocal(name), err.strerror)
186 189 )
187 190
188 191
189 192 # may be wrapped by win32mbcs extension
190 193 listdir = osutil.listdir
191 194
192 195
193 196 def get_password() -> bytes:
194 197 """Prompt for password with echo off, using Windows getch().
195 198
196 199 This shouldn't be called directly- use ``ui.getpass()`` instead, which
197 200 checks if the session is interactive first.
198 201 """
199 202 pw = u""
200 203 while True:
201 204 c = msvcrt.getwch() # pytype: disable=module-attr
202 205 if c == u'\r' or c == u'\n':
203 206 break
204 207 if c == u'\003':
205 208 raise KeyboardInterrupt
206 209 if c == u'\b':
207 210 pw = pw[:-1]
208 211 else:
209 212 pw = pw + c
210 213 msvcrt.putwch(u'\r') # pytype: disable=module-attr
211 214 msvcrt.putwch(u'\n') # pytype: disable=module-attr
212 215 return encoding.unitolocal(pw)
213 216
214 217
215 218 class winstdout(typelib.BinaryIO_Proxy):
216 219 """Some files on Windows misbehave.
217 220
218 221 When writing to a broken pipe, EINVAL instead of EPIPE may be raised.
219 222
220 223 When writing too many bytes to a console at the same, a "Not enough space"
221 224 error may happen. Python 3 already works around that.
222 225 """
223 226
224 227 def __init__(self, fp: BinaryIO):
225 228 self.fp = fp
226 229
227 230 def __getattr__(self, key):
228 231 return getattr(self.fp, key)
229 232
230 233 def close(self):
231 234 try:
232 235 self.fp.close()
233 236 except IOError:
234 237 pass
235 238
236 239 def write(self, s):
237 240 try:
238 241 return self.fp.write(s)
239 242 except IOError as inst:
240 243 if inst.errno != 0 and not win32.lasterrorwaspipeerror(inst):
241 244 raise
242 245 self.close()
243 246 raise IOError(errno.EPIPE, 'Broken pipe')
244 247
245 248 def flush(self):
246 249 try:
247 250 return self.fp.flush()
248 251 except IOError as inst:
249 252 if not win32.lasterrorwaspipeerror(inst):
250 253 raise
251 254 raise IOError(errno.EPIPE, 'Broken pipe')
252 255
253 256
254 257 def openhardlinks() -> bool:
255 258 return True
256 259
257 260
258 261 def parsepatchoutput(output_line: bytes) -> bytes:
259 262 """parses the output produced by patch and returns the filename"""
260 263 pf = output_line[14:]
261 264 if pf[0] == b'`':
262 265 pf = pf[1:-1] # Remove the quotes
263 266 return pf
264 267
265 268
266 269 def sshargs(
267 270 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
268 271 ) -> bytes:
269 272 '''Build argument list for ssh or Plink'''
270 273 pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p'
271 274 args = user and (b"%s@%s" % (user, host)) or host
272 275 if args.startswith(b'-') or args.startswith(b'/'):
273 276 raise error.Abort(
274 277 _(b'illegal ssh hostname or username starting with - or /: %s')
275 278 % args
276 279 )
277 280 args = shellquote(args)
278 281 if port:
279 282 args = b'%s %s %s' % (pflag, shellquote(port), args)
280 283 return args
281 284
282 285
283 286 def setflags(f: bytes, l: bool, x: bool) -> None:
284 287 pass
285 288
286 289
287 290 def copymode(
288 291 src: bytes,
289 292 dst: bytes,
290 293 mode: Optional[bytes] = None,
291 294 enforcewritable: bool = False,
292 295 ) -> None:
293 296 pass
294 297
295 298
296 299 def checkexec(path: bytes) -> bool:
297 300 return False
298 301
299 302
300 303 def checklink(path: bytes) -> bool:
301 304 return False
302 305
303 306
304 307 def setbinary(fd) -> None:
305 308 # When run without console, pipes may expose invalid
306 309 # fileno(), usually set to -1.
307 310 fno = getattr(fd, 'fileno', None)
308 311 if fno is not None and fno() >= 0:
309 312 msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr
310 313
311 314
312 315 def pconvert(path: bytes) -> bytes:
313 316 return path.replace(pycompat.ossep, b'/')
314 317
315 318
316 319 def localpath(path: bytes) -> bytes:
317 320 return path.replace(b'/', b'\\')
318 321
319 322
320 def normpath(path):
323 def normpath(path: bytes) -> bytes:
321 324 return pconvert(os.path.normpath(path))
322 325
323 326
324 327 def normcase(path: bytes) -> bytes:
325 328 return encoding.upper(path) # NTFS compares via upper()
326 329
327 330
328 DRIVE_RE_B = re.compile(b'^[a-z]:')
329 DRIVE_RE_S = re.compile('^[a-z]:')
331 DRIVE_RE_B: Pattern[bytes] = re.compile(b'^[a-z]:')
332 DRIVE_RE_S: Pattern[str] = re.compile('^[a-z]:')
330 333
331 334
332 def abspath(path):
335 # TODO: why is this accepting str?
336 def abspath(path: AnyStr) -> AnyStr:
333 337 abs_path = os.path.abspath(path) # re-exports
334 338 # Python on Windows is inconsistent regarding the capitalization of drive
335 339 # letter and this cause issue with various path comparison along the way.
336 340 # So we normalize the drive later to upper case here.
337 341 #
338 342 # See https://bugs.python.org/issue40368 for and example of this hell.
339 343 if isinstance(abs_path, bytes):
340 344 if DRIVE_RE_B.match(abs_path):
341 345 abs_path = abs_path[0:1].upper() + abs_path[1:]
342 346 elif DRIVE_RE_S.match(abs_path):
343 347 abs_path = abs_path[0:1].upper() + abs_path[1:]
344 348 return abs_path
345 349
346 350
347 351 # see posix.py for definitions
348 normcasespec = encoding.normcasespecs.upper
352 normcasespec: int = encoding.normcasespecs.upper
349 353 normcasefallback = encoding.upperfallback
350 354
351 355
352 def samestat(s1, s2):
356 def samestat(s1: os.stat_result, s2: os.stat_result) -> bool:
353 357 return False
354 358
355 359
356 def shelltocmdexe(path, env):
360 def shelltocmdexe(path: bytes, env: Mapping[bytes, bytes]) -> bytes:
357 361 r"""Convert shell variables in the form $var and ${var} inside ``path``
358 362 to %var% form. Existing Windows style variables are left unchanged.
359 363
360 364 The variables are limited to the given environment. Unknown variables are
361 365 left unchanged.
362 366
363 367 >>> e = {b'var1': b'v1', b'var2': b'v2', b'var3': b'v3'}
364 368 >>> # Only valid values are expanded
365 369 >>> shelltocmdexe(b'cmd $var1 ${var2} %var3% $missing ${missing} %missing%',
366 370 ... e)
367 371 'cmd %var1% %var2% %var3% $missing ${missing} %missing%'
368 372 >>> # Single quote prevents expansion, as does \$ escaping
369 373 >>> shelltocmdexe(b"cmd '$var1 ${var2} %var3%' \$var1 \${var2} \\", e)
370 374 'cmd "$var1 ${var2} %var3%" $var1 ${var2} \\'
371 375 >>> # $$ is not special. %% is not special either, but can be the end and
372 376 >>> # start of consecutive variables
373 377 >>> shelltocmdexe(b"cmd $$ %% %var1%%var2%", e)
374 378 'cmd $$ %% %var1%%var2%'
375 379 >>> # No double substitution
376 380 >>> shelltocmdexe(b"$var1 %var1%", {b'var1': b'%var2%', b'var2': b'boom'})
377 381 '%var1% %var1%'
378 382 >>> # Tilde expansion
379 383 >>> shelltocmdexe(b"~/dir ~\dir2 ~tmpfile \~/", {})
380 384 '%USERPROFILE%/dir %USERPROFILE%\\dir2 ~tmpfile ~/'
381 385 """
382 386 if not any(c in path for c in b"$'~"):
383 387 return path
384 388
385 389 varchars = pycompat.sysbytes(string.ascii_letters + string.digits) + b'_-'
386 390
387 391 res = b''
388 392 index = 0
389 393 pathlen = len(path)
390 394 while index < pathlen:
391 395 c = path[index : index + 1]
392 396 if c == b'\'': # no expansion within single quotes
393 397 path = path[index + 1 :]
394 398 pathlen = len(path)
395 399 try:
396 400 index = path.index(b'\'')
397 401 res += b'"' + path[:index] + b'"'
398 402 except ValueError:
399 403 res += c + path
400 404 index = pathlen - 1
401 405 elif c == b'%': # variable
402 406 path = path[index + 1 :]
403 407 pathlen = len(path)
404 408 try:
405 409 index = path.index(b'%')
406 410 except ValueError:
407 411 res += b'%' + path
408 412 index = pathlen - 1
409 413 else:
410 414 var = path[:index]
411 415 res += b'%' + var + b'%'
412 416 elif c == b'$': # variable
413 417 if path[index + 1 : index + 2] == b'{':
414 418 path = path[index + 2 :]
415 419 pathlen = len(path)
416 420 try:
417 421 index = path.index(b'}')
418 422 var = path[:index]
419 423
420 424 # See below for why empty variables are handled specially
421 425 if env.get(var, b'') != b'':
422 426 res += b'%' + var + b'%'
423 427 else:
424 428 res += b'${' + var + b'}'
425 429 except ValueError:
426 430 res += b'${' + path
427 431 index = pathlen - 1
428 432 else:
429 433 var = b''
430 434 index += 1
431 435 c = path[index : index + 1]
432 436 while c != b'' and c in varchars:
433 437 var += c
434 438 index += 1
435 439 c = path[index : index + 1]
436 440 # Some variables (like HG_OLDNODE) may be defined, but have an
437 441 # empty value. Those need to be skipped because when spawning
438 442 # cmd.exe to run the hook, it doesn't replace %VAR% for an empty
439 443 # VAR, and that really confuses things like revset expressions.
440 444 # OTOH, if it's left in Unix format and the hook runs sh.exe, it
441 445 # will substitute to an empty string, and everything is happy.
442 446 if env.get(var, b'') != b'':
443 447 res += b'%' + var + b'%'
444 448 else:
445 449 res += b'$' + var
446 450
447 451 if c != b'':
448 452 index -= 1
449 453 elif (
450 454 c == b'~'
451 455 and index + 1 < pathlen
452 456 and path[index + 1 : index + 2] in (b'\\', b'/')
453 457 ):
454 458 res += b"%USERPROFILE%"
455 459 elif (
456 460 c == b'\\'
457 461 and index + 1 < pathlen
458 462 and path[index + 1 : index + 2] in (b'$', b'~')
459 463 ):
460 464 # Skip '\', but only if it is escaping $ or ~
461 465 res += path[index + 1 : index + 2]
462 466 index += 1
463 467 else:
464 468 res += c
465 469
466 470 index += 1
467 471 return res
468 472
469 473
470 474 # A sequence of backslashes is special iff it precedes a double quote:
471 475 # - if there's an even number of backslashes, the double quote is not
472 476 # quoted (i.e. it ends the quoted region)
473 477 # - if there's an odd number of backslashes, the double quote is quoted
474 478 # - in both cases, every pair of backslashes is unquoted into a single
475 479 # backslash
476 480 # (See http://msdn2.microsoft.com/en-us/library/a1y7w461.aspx )
477 481 # So, to quote a string, we must surround it in double quotes, double
478 482 # the number of backslashes that precede double quotes and add another
479 483 # backslash before every double quote (being careful with the double
480 484 # quote we've appended to the end)
481 _quotere = None
485 _quotere: Optional[Pattern[bytes]] = None
482 486 _needsshellquote = None
483 487
484 488
485 489 def shellquote(s: bytes) -> bytes:
486 490 r"""
487 491 >>> shellquote(br'C:\Users\xyz')
488 492 '"C:\\Users\\xyz"'
489 493 >>> shellquote(br'C:\Users\xyz/mixed')
490 494 '"C:\\Users\\xyz/mixed"'
491 495 >>> # Would be safe not to quote too, since it is all double backslashes
492 496 >>> shellquote(br'C:\\Users\\xyz')
493 497 '"C:\\\\Users\\\\xyz"'
494 498 >>> # But this must be quoted
495 499 >>> shellquote(br'C:\\Users\\xyz/abc')
496 500 '"C:\\\\Users\\\\xyz/abc"'
497 501 """
498 502 global _quotere
499 503 if _quotere is None:
500 504 _quotere = re.compile(br'(\\*)("|\\$)')
501 505 global _needsshellquote
502 506 if _needsshellquote is None:
503 507 # ":" is also treated as "safe character", because it is used as a part
504 508 # of path name on Windows. "\" is also part of a path name, but isn't
505 509 # safe because shlex.split() (kind of) treats it as an escape char and
506 510 # drops it. It will leave the next character, even if it is another
507 511 # "\".
508 512 _needsshellquote = re.compile(br'[^a-zA-Z0-9._:/-]').search
509 513 if s and not _needsshellquote(s) and not _quotere.search(s):
510 514 # "s" shouldn't have to be quoted
511 515 return s
512 516 return b'"%s"' % _quotere.sub(br'\1\1\\\2', s)
513 517
514 518
515 def _unquote(s):
519 def _unquote(s: bytes) -> bytes:
516 520 if s.startswith(b'"') and s.endswith(b'"'):
517 521 return s[1:-1]
518 522 return s
519 523
520 524
521 525 def shellsplit(s: bytes) -> List[bytes]:
522 526 """Parse a command string in cmd.exe way (best-effort)"""
523 527 return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False))
524 528
525 529
526 530 # if you change this stub into a real check, please try to implement the
527 531 # username and groupname functions above, too.
528 532 def isowner(st: os.stat_result) -> bool:
529 533 return True
530 534
531 535
532 536 def findexe(command: bytes) -> Optional[bytes]:
533 537 """Find executable for command searching like cmd.exe does.
534 538 If command is a basename then PATH is searched for command.
535 539 PATH isn't searched if command is an absolute or relative path.
536 540 An extension from PATHEXT is found and added if not present.
537 541 If command isn't found None is returned."""
538 542 pathext = encoding.environ.get(b'PATHEXT', b'.COM;.EXE;.BAT;.CMD')
539 543 pathexts = [ext for ext in pathext.lower().split(pycompat.ospathsep)]
540 544 if os.path.splitext(command)[1].lower() in pathexts:
541 545 pathexts = [b'']
542 546
543 547 def findexisting(pathcommand: bytes) -> Optional[bytes]:
544 548 """Will append extension (if needed) and return existing file"""
545 549 for ext in pathexts:
546 550 executable = pathcommand + ext
547 551 if os.path.exists(executable):
548 552 return executable
549 553 return None
550 554
551 555 if pycompat.ossep in command:
552 556 return findexisting(command)
553 557
554 558 for path in encoding.environ.get(b'PATH', b'').split(pycompat.ospathsep):
555 559 executable = findexisting(os.path.join(path, command))
556 560 if executable is not None:
557 561 return executable
558 562 return findexisting(os.path.expanduser(os.path.expandvars(command)))
559 563
560 564
561 565 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
562 566
563 567
564 568 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
565 569 """Stat each file in files. Yield each stat, or None if a file
566 570 does not exist or has a type we don't care about.
567 571
568 572 Cluster and cache stat per directory to minimize number of OS stat calls."""
569 573 dircache = {} # dirname -> filename -> status | None if file does not exist
570 574 getkind = stat.S_IFMT
571 575 for nf in files:
572 576 nf = normcase(nf)
573 577 dir, base = os.path.split(nf)
574 578 if not dir:
575 579 dir = b'.'
576 580 cache = dircache.get(dir, None)
577 581 if cache is None:
578 582 try:
579 583 dmap = {
580 584 normcase(n): s
581 585 for n, k, s in listdir(dir, True)
582 586 if getkind(s.st_mode) in _wantedkinds
583 587 }
584 588 except (FileNotFoundError, NotADirectoryError):
585 589 dmap = {}
586 590 cache = dircache.setdefault(dir, dmap)
587 591 yield cache.get(base, None)
588 592
589 593
590 594 def username(uid: Optional[int] = None) -> Optional[bytes]:
591 595 """Return the name of the user with the given uid.
592 596
593 597 If uid is None, return the name of the current user."""
594 598 if not uid:
595 599 try:
596 600 return pycompat.fsencode(getpass.getuser())
597 601 except ModuleNotFoundError:
598 602 # getpass.getuser() checks for a few environment variables first,
599 603 # but if those aren't set, imports pwd and calls getpwuid(), none of
600 604 # which exists on Windows.
601 605 pass
602 606 return None
603 607
604 608
605 609 def groupname(gid: Optional[int] = None) -> Optional[bytes]:
606 610 """Return the name of the group with the given gid.
607 611
608 612 If gid is None, return the name of the current group."""
609 613 return None
610 614
611 615
612 def readlink(pathname):
616 def readlink(pathname: bytes) -> bytes:
613 617 path = pycompat.fsdecode(pathname)
614 618 try:
615 619 link = os.readlink(path)
616 620 except ValueError as e:
617 621 # On py2, os.readlink() raises an AttributeError since it is
618 622 # unsupported. On py3, reading a non-link raises a ValueError. Simply
619 623 # treat this as the error the locking code has been expecting up to now
620 624 # until an effort can be made to enable symlink support on Windows.
621 625 raise AttributeError(e)
622 626 return pycompat.fsencode(link)
623 627
624 628
625 def removedirs(name):
629 def removedirs(name: bytes) -> None:
626 630 """special version of os.removedirs that does not remove symlinked
627 631 directories or junction points if they actually contain files"""
628 632 if listdir(name):
629 633 return
630 634 os.rmdir(name)
631 635 head, tail = os.path.split(name)
632 636 if not tail:
633 637 head, tail = os.path.split(head)
634 638 while head and tail:
635 639 try:
636 640 if listdir(head):
637 641 return
638 642 os.rmdir(head)
639 643 except (ValueError, OSError):
640 644 break
641 645 head, tail = os.path.split(head)
642 646
643 647
644 def rename(src, dst):
648 def rename(src: bytes, dst: bytes) -> None:
645 649 '''atomically rename file src to dst, replacing dst if it exists'''
646 650 try:
647 651 os.rename(src, dst)
648 652 except FileExistsError:
649 653 unlink(dst)
650 654 os.rename(src, dst)
651 655
652 656
653 def gethgcmd():
657 def gethgcmd() -> List[bytes]:
654 658 return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]]
655 659
656 660
657 661 def groupmembers(name: bytes) -> List[bytes]:
658 662 # Don't support groups on Windows for now
659 663 raise KeyError
660 664
661 665
662 666 def isexec(f: bytes) -> bool:
663 667 return False
664 668
665 669
666 670 class cachestat:
667 671 def __init__(self, path: bytes) -> None:
668 672 pass
669 673
670 674 def cacheable(self) -> bool:
671 675 return False
672 676
673 677
674 678 def lookupreg(
675 679 key: bytes,
676 680 valname: Optional[bytes] = None,
677 681 scope: Optional[Union[int, Iterable[int]]] = None,
678 682 ) -> Optional[bytes]:
679 683 """Look up a key/value name in the Windows registry.
680 684
681 685 valname: value name. If unspecified, the default value for the key
682 686 is used.
683 687 scope: optionally specify scope for registry lookup, this can be
684 688 a sequence of scopes to look up in order. Default (CURRENT_USER,
685 689 LOCAL_MACHINE).
686 690 """
687 691 if scope is None:
688 692 # pytype: disable=module-attr
689 693 scope = (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE)
690 694 # pytype: enable=module-attr
691 695 elif not isinstance(scope, (list, tuple)):
692 696 scope = (scope,)
693 697 for s in scope:
694 698 try:
695 699 # pytype: disable=module-attr
696 700 with winreg.OpenKey(s, encoding.strfromlocal(key)) as hkey:
697 701 # pytype: enable=module-attr
698 702 name = None
699 703 if valname is not None:
700 704 name = encoding.strfromlocal(valname)
701 705 # pytype: disable=module-attr
702 706 val = winreg.QueryValueEx(hkey, name)[0]
703 707 # pytype: enable=module-attr
704 708
705 709 # never let a Unicode string escape into the wild
706 710 return encoding.unitolocal(val)
707 711 except EnvironmentError:
708 712 pass
709 713
710 714
711 expandglobs = True
715 expandglobs: bool = True
712 716
713 717
714 718 def statislink(st: Optional[os.stat_result]) -> bool:
715 719 '''check whether a stat result is a symlink'''
716 720 return False
717 721
718 722
719 723 def statisexec(st: Optional[os.stat_result]) -> bool:
720 724 '''check whether a stat result is an executable file'''
721 725 return False
722 726
723 727
724 def poll(fds):
728 def poll(fds) -> List:
725 729 # see posix.py for description
726 730 raise NotImplementedError()
727 731
728 732
729 733 def readpipe(pipe) -> bytes:
730 734 """Read all available data from a pipe."""
731 735 chunks = []
732 736 while True:
733 737 size = win32.peekpipe(pipe)
734 738 if not size:
735 739 break
736 740
737 741 s = pipe.read(size)
738 742 if not s:
739 743 break
740 744 chunks.append(s)
741 745
742 746 return b''.join(chunks)
743 747
744 748
745 749 def bindunixsocket(sock, path: bytes) -> NoReturn:
746 750 raise NotImplementedError('unsupported platform')
General Comments 0
You need to be logged in to leave comments. Login now