##// END OF EJS Templates
typing: add type hints to the rest of the posix module...
Matt Harbison -
r50711:ae93ada0 default
parent child Browse files
Show More
@@ -1,775 +1,777 b''
1 1 # posix.py - Posix utility function implementations for Mercurial
2 2 #
3 3 # Copyright 2005-2009 Olivia Mackall <olivia@selenic.com> and others
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import fcntl
11 11 import getpass
12 12 import grp
13 13 import os
14 14 import pwd
15 15 import re
16 16 import select
17 17 import stat
18 18 import sys
19 19 import tempfile
20 20 import unicodedata
21 21
22 22 from typing import (
23 23 Any,
24 24 Iterable,
25 25 Iterator,
26 26 List,
27 Match,
27 28 NoReturn,
28 29 Optional,
29 30 Sequence,
31 Tuple,
30 32 Union,
31 33 )
32 34
33 35 from .i18n import _
34 36 from .pycompat import (
35 37 getattr,
36 38 open,
37 39 )
38 40 from . import (
39 41 encoding,
40 42 error,
41 43 policy,
42 44 pycompat,
43 45 )
44 46
45 47 osutil = policy.importmod('osutil')
46 48
47 49 normpath = os.path.normpath
48 50 samestat = os.path.samestat
49 51 abspath = os.path.abspath # re-exports
50 52
51 53 try:
52 54 oslink = os.link
53 55 except AttributeError:
54 56 # Some platforms build Python without os.link on systems that are
55 57 # vaguely unix-like but don't have hardlink support. For those
56 58 # poor souls, just say we tried and that it failed so we fall back
57 59 # to copies.
58 60 def oslink(src: bytes, dst: bytes) -> NoReturn:
59 61 raise OSError(
60 62 errno.EINVAL, b'hardlinks not supported: %s to %s' % (src, dst)
61 63 )
62 64
63 65
64 66 readlink = os.readlink
65 67 unlink = os.unlink
66 68 rename = os.rename
67 69 removedirs = os.removedirs
68 expandglobs = False
70 expandglobs: bool = False
69 71
70 umask = os.umask(0)
72 umask: int = os.umask(0)
71 73 os.umask(umask)
72 74
73 75 posixfile = open
74 76
75 77
76 def split(p):
78 def split(p: bytes) -> Tuple[bytes, bytes]:
77 79 """Same as posixpath.split, but faster
78 80
79 81 >>> import posixpath
80 82 >>> for f in [b'/absolute/path/to/file',
81 83 ... b'relative/path/to/file',
82 84 ... b'file_alone',
83 85 ... b'path/to/directory/',
84 86 ... b'/multiple/path//separators',
85 87 ... b'/file_at_root',
86 88 ... b'///multiple_leading_separators_at_root',
87 89 ... b'']:
88 90 ... assert split(f) == posixpath.split(f), f
89 91 """
90 92 ht = p.rsplit(b'/', 1)
91 93 if len(ht) == 1:
92 94 return b'', p
93 95 nh = ht[0].rstrip(b'/')
94 96 if nh:
95 97 return nh, ht[1]
96 98 return ht[0] + b'/', ht[1]
97 99
98 100
99 101 def openhardlinks() -> bool:
100 102 '''return true if it is safe to hold open file handles to hardlinks'''
101 103 return True
102 104
103 105
104 106 def nlinks(name: bytes) -> int:
105 107 '''return number of hardlinks for the given file'''
106 108 return os.lstat(name).st_nlink
107 109
108 110
109 111 def parsepatchoutput(output_line: bytes) -> bytes:
110 112 """parses the output produced by patch and returns the filename"""
111 113 pf = output_line[14:]
112 114 if pycompat.sysplatform == b'OpenVMS':
113 115 if pf[0] == b'`':
114 116 pf = pf[1:-1] # Remove the quotes
115 117 else:
116 118 if pf.startswith(b"'") and pf.endswith(b"'") and b" " in pf:
117 119 pf = pf[1:-1] # Remove the quotes
118 120 return pf
119 121
120 122
121 123 def sshargs(
122 124 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
123 125 ) -> bytes:
124 126 '''Build argument list for ssh'''
125 127 args = user and (b"%s@%s" % (user, host)) or host
126 128 if b'-' in args[:1]:
127 129 raise error.Abort(
128 130 _(b'illegal ssh hostname or username starting with -: %s') % args
129 131 )
130 132 args = shellquote(args)
131 133 if port:
132 134 args = b'-p %s %s' % (shellquote(port), args)
133 135 return args
134 136
135 137
136 138 def isexec(f: bytes) -> bool:
137 139 """check whether a file is executable"""
138 140 return os.lstat(f).st_mode & 0o100 != 0
139 141
140 142
141 143 def setflags(f: bytes, l: bool, x: bool) -> None:
142 144 st = os.lstat(f)
143 145 s = st.st_mode
144 146 if l:
145 147 if not stat.S_ISLNK(s):
146 148 # switch file to link
147 149 with open(f, b'rb') as fp:
148 150 data = fp.read()
149 151 unlink(f)
150 152 try:
151 153 os.symlink(data, f)
152 154 except OSError:
153 155 # failed to make a link, rewrite file
154 156 with open(f, b"wb") as fp:
155 157 fp.write(data)
156 158
157 159 # no chmod needed at this point
158 160 return
159 161 if stat.S_ISLNK(s):
160 162 # switch link to file
161 163 data = os.readlink(f)
162 164 unlink(f)
163 165 with open(f, b"wb") as fp:
164 166 fp.write(data)
165 167 s = 0o666 & ~umask # avoid restatting for chmod
166 168
167 169 sx = s & 0o100
168 170 if st.st_nlink > 1 and bool(x) != bool(sx):
169 171 # the file is a hardlink, break it
170 172 with open(f, b"rb") as fp:
171 173 data = fp.read()
172 174 unlink(f)
173 175 with open(f, b"wb") as fp:
174 176 fp.write(data)
175 177
176 178 if x and not sx:
177 179 # Turn on +x for every +r bit when making a file executable
178 180 # and obey umask.
179 181 os.chmod(f, s | (s & 0o444) >> 2 & ~umask)
180 182 elif not x and sx:
181 183 # Turn off all +x bits
182 184 os.chmod(f, s & 0o666)
183 185
184 186
185 187 def copymode(
186 188 src: bytes,
187 189 dst: bytes,
188 190 mode: Optional[bytes] = None,
189 191 enforcewritable: bool = False,
190 192 ) -> None:
191 193 """Copy the file mode from the file at path src to dst.
192 194 If src doesn't exist, we're using mode instead. If mode is None, we're
193 195 using umask."""
194 196 try:
195 197 st_mode = os.lstat(src).st_mode & 0o777
196 198 except FileNotFoundError:
197 199 st_mode = mode
198 200 if st_mode is None:
199 201 st_mode = ~umask
200 202 st_mode &= 0o666
201 203
202 204 new_mode = st_mode
203 205
204 206 if enforcewritable:
205 207 new_mode |= stat.S_IWUSR
206 208
207 209 os.chmod(dst, new_mode)
208 210
209 211
210 212 def checkexec(path: bytes) -> bool:
211 213 """
212 214 Check whether the given path is on a filesystem with UNIX-like exec flags
213 215
214 216 Requires a directory (like /foo/.hg)
215 217 """
216 218
217 219 # VFAT on some Linux versions can flip mode but it doesn't persist
218 220 # a FS remount. Frequently we can detect it if files are created
219 221 # with exec bit on.
220 222
221 223 try:
222 224 EXECFLAGS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
223 225 basedir = os.path.join(path, b'.hg')
224 226 cachedir = os.path.join(basedir, b'wcache')
225 227 storedir = os.path.join(basedir, b'store')
226 228 if not os.path.exists(cachedir):
227 229 try:
228 230 # we want to create the 'cache' directory, not the '.hg' one.
229 231 # Automatically creating '.hg' directory could silently spawn
230 232 # invalid Mercurial repositories. That seems like a bad idea.
231 233 os.mkdir(cachedir)
232 234 if os.path.exists(storedir):
233 235 copymode(storedir, cachedir)
234 236 else:
235 237 copymode(basedir, cachedir)
236 238 except (IOError, OSError):
237 239 # we other fallback logic triggers
238 240 pass
239 241 if os.path.isdir(cachedir):
240 242 checkisexec = os.path.join(cachedir, b'checkisexec')
241 243 checknoexec = os.path.join(cachedir, b'checknoexec')
242 244
243 245 try:
244 246 m = os.stat(checkisexec).st_mode
245 247 except FileNotFoundError:
246 248 # checkisexec does not exist - fall through ...
247 249 pass
248 250 else:
249 251 # checkisexec exists, check if it actually is exec
250 252 if m & EXECFLAGS != 0:
251 253 # ensure checkisexec exists, check it isn't exec
252 254 try:
253 255 m = os.stat(checknoexec).st_mode
254 256 except FileNotFoundError:
255 257 open(checknoexec, b'w').close() # might fail
256 258 m = os.stat(checknoexec).st_mode
257 259 if m & EXECFLAGS == 0:
258 260 # check-exec is exec and check-no-exec is not exec
259 261 return True
260 262 # checknoexec exists but is exec - delete it
261 263 unlink(checknoexec)
262 264 # checkisexec exists but is not exec - delete it
263 265 unlink(checkisexec)
264 266
265 267 # check using one file, leave it as checkisexec
266 268 checkdir = cachedir
267 269 else:
268 270 # check directly in path and don't leave checkisexec behind
269 271 checkdir = path
270 272 checkisexec = None
271 273 fh, fn = pycompat.mkstemp(dir=checkdir, prefix=b'hg-checkexec-')
272 274 try:
273 275 os.close(fh)
274 276 m = os.stat(fn).st_mode
275 277 if m & EXECFLAGS == 0:
276 278 os.chmod(fn, m & 0o777 | EXECFLAGS)
277 279 if os.stat(fn).st_mode & EXECFLAGS != 0:
278 280 if checkisexec is not None:
279 281 os.rename(fn, checkisexec)
280 282 fn = None
281 283 return True
282 284 finally:
283 285 if fn is not None:
284 286 unlink(fn)
285 287 except (IOError, OSError):
286 288 # we don't care, the user probably won't be able to commit anyway
287 289 return False
288 290
289 291
290 292 def checklink(path: bytes) -> bool:
291 293 """check whether the given path is on a symlink-capable filesystem"""
292 294 # mktemp is not racy because symlink creation will fail if the
293 295 # file already exists
294 296 while True:
295 297 cachedir = os.path.join(path, b'.hg', b'wcache')
296 298 checklink = os.path.join(cachedir, b'checklink')
297 299 # try fast path, read only
298 300 if os.path.islink(checklink):
299 301 return True
300 302 if os.path.isdir(cachedir):
301 303 checkdir = cachedir
302 304 else:
303 305 checkdir = path
304 306 cachedir = None
305 307 name = tempfile.mktemp(
306 308 dir=pycompat.fsdecode(checkdir), prefix=r'checklink-'
307 309 )
308 310 name = pycompat.fsencode(name)
309 311 try:
310 312 fd = None
311 313 if cachedir is None:
312 314 fd = pycompat.namedtempfile(
313 315 dir=checkdir, prefix=b'hg-checklink-'
314 316 )
315 317 target = os.path.basename(fd.name)
316 318 else:
317 319 # create a fixed file to link to; doesn't matter if it
318 320 # already exists.
319 321 target = b'checklink-target'
320 322 try:
321 323 fullpath = os.path.join(cachedir, target)
322 324 open(fullpath, b'w').close()
323 325 except PermissionError:
324 326 # If we can't write to cachedir, just pretend
325 327 # that the fs is readonly and by association
326 328 # that the fs won't support symlinks. This
327 329 # seems like the least dangerous way to avoid
328 330 # data loss.
329 331 return False
330 332 try:
331 333 os.symlink(target, name)
332 334 if cachedir is None:
333 335 unlink(name)
334 336 else:
335 337 try:
336 338 os.rename(name, checklink)
337 339 except OSError:
338 340 unlink(name)
339 341 return True
340 342 except FileExistsError:
341 343 # link creation might race, try again
342 344 continue
343 345 finally:
344 346 if fd is not None:
345 347 fd.close()
346 348 except AttributeError:
347 349 return False
348 350 except OSError as inst:
349 351 # sshfs might report failure while successfully creating the link
350 352 if inst.errno == errno.EIO and os.path.exists(name):
351 353 unlink(name)
352 354 return False
353 355
354 356
355 def checkosfilename(path):
357 def checkosfilename(path: bytes) -> Optional[bytes]:
356 358 """Check that the base-relative path is a valid filename on this platform.
357 359 Returns None if the path is ok, or a UI string describing the problem."""
358 360 return None # on posix platforms, every path is ok
359 361
360 362
361 def getfsmountpoint(dirpath):
363 def getfsmountpoint(dirpath: bytes) -> Optional[bytes]:
362 364 """Get the filesystem mount point from a directory (best-effort)
363 365
364 366 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
365 367 """
366 368 return getattr(osutil, 'getfsmountpoint', lambda x: None)(dirpath)
367 369
368 370
369 371 def getfstype(dirpath: bytes) -> Optional[bytes]:
370 372 """Get the filesystem type name from a directory (best-effort)
371 373
372 374 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
373 375 """
374 376 return getattr(osutil, 'getfstype', lambda x: None)(dirpath)
375 377
376 378
377 379 def get_password() -> bytes:
378 380 return encoding.strtolocal(getpass.getpass(''))
379 381
380 382
381 383 def setbinary(fd) -> None:
382 384 pass
383 385
384 386
385 387 def pconvert(path: bytes) -> bytes:
386 388 return path
387 389
388 390
389 391 def localpath(path: bytes) -> bytes:
390 392 return path
391 393
392 394
393 395 def samefile(fpath1: bytes, fpath2: bytes) -> bool:
394 396 """Returns whether path1 and path2 refer to the same file. This is only
395 397 guaranteed to work for files, not directories."""
396 398 return os.path.samefile(fpath1, fpath2)
397 399
398 400
399 401 def samedevice(fpath1: bytes, fpath2: bytes) -> bool:
400 402 """Returns whether fpath1 and fpath2 are on the same device. This is only
401 403 guaranteed to work for files, not directories."""
402 404 st1 = os.lstat(fpath1)
403 405 st2 = os.lstat(fpath2)
404 406 return st1.st_dev == st2.st_dev
405 407
406 408
407 409 # os.path.normcase is a no-op, which doesn't help us on non-native filesystems
408 410 def normcase(path: bytes) -> bytes:
409 411 return path.lower()
410 412
411 413
412 414 # what normcase does to ASCII strings
413 normcasespec = encoding.normcasespecs.lower
415 normcasespec: int = encoding.normcasespecs.lower
414 416 # fallback normcase function for non-ASCII strings
415 417 normcasefallback = normcase
416 418
417 419 if pycompat.isdarwin:
418 420
419 421 def normcase(path: bytes) -> bytes:
420 422 """
421 423 Normalize a filename for OS X-compatible comparison:
422 424 - escape-encode invalid characters
423 425 - decompose to NFD
424 426 - lowercase
425 427 - omit ignored characters [200c-200f, 202a-202e, 206a-206f,feff]
426 428
427 429 >>> normcase(b'UPPER')
428 430 'upper'
429 431 >>> normcase(b'Caf\\xc3\\xa9')
430 432 'cafe\\xcc\\x81'
431 433 >>> normcase(b'\\xc3\\x89')
432 434 'e\\xcc\\x81'
433 435 >>> normcase(b'\\xb8\\xca\\xc3\\xca\\xbe\\xc8.JPG') # issue3918
434 436 '%b8%ca%c3\\xca\\xbe%c8.jpg'
435 437 """
436 438
437 439 try:
438 440 return encoding.asciilower(path) # exception for non-ASCII
439 441 except UnicodeDecodeError:
440 442 return normcasefallback(path)
441 443
442 444 normcasespec = encoding.normcasespecs.lower
443 445
444 446 def normcasefallback(path: bytes) -> bytes:
445 447 try:
446 448 u = path.decode('utf-8')
447 449 except UnicodeDecodeError:
448 450 # OS X percent-encodes any bytes that aren't valid utf-8
449 451 s = b''
450 452 pos = 0
451 453 l = len(path)
452 454 while pos < l:
453 455 try:
454 456 c = encoding.getutf8char(path, pos)
455 457 pos += len(c)
456 458 except ValueError:
457 459 c = b'%%%02X' % ord(path[pos : pos + 1])
458 460 pos += 1
459 461 s += c
460 462
461 463 u = s.decode('utf-8')
462 464
463 465 # Decompose then lowercase (HFS+ technote specifies lower)
464 466 enc = unicodedata.normalize('NFD', u).lower().encode('utf-8')
465 467 # drop HFS+ ignored characters
466 468 return encoding.hfsignoreclean(enc)
467 469
468 470
469 471 if pycompat.sysplatform == b'cygwin':
470 472 # workaround for cygwin, in which mount point part of path is
471 473 # treated as case sensitive, even though underlying NTFS is case
472 474 # insensitive.
473 475
474 476 # default mount points
475 477 cygwinmountpoints = sorted(
476 478 [
477 479 b"/usr/bin",
478 480 b"/usr/lib",
479 481 b"/cygdrive",
480 482 ],
481 483 reverse=True,
482 484 )
483 485
484 486 # use upper-ing as normcase as same as NTFS workaround
485 487 def normcase(path: bytes) -> bytes:
486 488 pathlen = len(path)
487 489 if (pathlen == 0) or (path[0] != pycompat.ossep):
488 490 # treat as relative
489 491 return encoding.upper(path)
490 492
491 493 # to preserve case of mountpoint part
492 494 for mp in cygwinmountpoints:
493 495 if not path.startswith(mp):
494 496 continue
495 497
496 498 mplen = len(mp)
497 499 if mplen == pathlen: # mount point itself
498 500 return mp
499 501 if path[mplen] == pycompat.ossep:
500 502 return mp + encoding.upper(path[mplen:])
501 503
502 504 return encoding.upper(path)
503 505
504 506 normcasespec = encoding.normcasespecs.other
505 507 normcasefallback = normcase
506 508
507 509 # Cygwin translates native ACLs to POSIX permissions,
508 510 # but these translations are not supported by native
509 511 # tools, so the exec bit tends to be set erroneously.
510 512 # Therefore, disable executable bit access on Cygwin.
511 513 def checkexec(path: bytes) -> bool:
512 514 return False
513 515
514 516 # Similarly, Cygwin's symlink emulation is likely to create
515 517 # problems when Mercurial is used from both Cygwin and native
516 518 # Windows, with other native tools, or on shared volumes
517 519 def checklink(path: bytes) -> bool:
518 520 return False
519 521
520 522
521 _needsshellquote = None
523 _needsshellquote: Optional[Match[bytes]] = None
522 524
523 525
524 526 def shellquote(s: bytes) -> bytes:
525 527 if pycompat.sysplatform == b'OpenVMS':
526 528 return b'"%s"' % s
527 529 global _needsshellquote
528 530 if _needsshellquote is None:
529 531 _needsshellquote = re.compile(br'[^a-zA-Z0-9._/+-]').search
530 532 if s and not _needsshellquote(s):
531 533 # "s" shouldn't have to be quoted
532 534 return s
533 535 else:
534 536 return b"'%s'" % s.replace(b"'", b"'\\''")
535 537
536 538
537 539 def shellsplit(s: bytes) -> List[bytes]:
538 540 """Parse a command string in POSIX shell way (best-effort)"""
539 541 return pycompat.shlexsplit(s, posix=True)
540 542
541 543
542 544 def testpid(pid: int) -> bool:
543 545 '''return False if pid dead, True if running or not sure'''
544 546 if pycompat.sysplatform == b'OpenVMS':
545 547 return True
546 548 try:
547 549 os.kill(pid, 0)
548 550 return True
549 551 except OSError as inst:
550 552 return inst.errno != errno.ESRCH
551 553
552 554
553 555 def isowner(st: os.stat_result) -> bool:
554 556 """Return True if the stat object st is from the current user."""
555 557 return st.st_uid == os.getuid()
556 558
557 559
558 560 def findexe(command: bytes) -> Optional[bytes]:
559 561 """Find executable for command searching like which does.
560 562 If command is a basename then PATH is searched for command.
561 563 PATH isn't searched if command is an absolute or relative path.
562 564 If command isn't found None is returned."""
563 565 if pycompat.sysplatform == b'OpenVMS':
564 566 return command
565 567
566 568 def findexisting(executable: bytes) -> Optional[bytes]:
567 569 b'Will return executable if existing file'
568 570 if os.path.isfile(executable) and os.access(executable, os.X_OK):
569 571 return executable
570 572 return None
571 573
572 574 if pycompat.ossep in command:
573 575 return findexisting(command)
574 576
575 577 if pycompat.sysplatform == b'plan9':
576 578 return findexisting(os.path.join(b'/bin', command))
577 579
578 580 for path in encoding.environ.get(b'PATH', b'').split(pycompat.ospathsep):
579 581 executable = findexisting(os.path.join(path, command))
580 582 if executable is not None:
581 583 return executable
582 584 return None
583 585
584 586
585 587 def setsignalhandler() -> None:
586 588 pass
587 589
588 590
589 591 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
590 592
591 593
592 594 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
593 595 """Stat each file in files. Yield each stat, or None if a file does not
594 596 exist or has a type we don't care about."""
595 597 lstat = os.lstat
596 598 getkind = stat.S_IFMT
597 599 for nf in files:
598 600 try:
599 601 st = lstat(nf)
600 602 if getkind(st.st_mode) not in _wantedkinds:
601 603 st = None
602 604 except (FileNotFoundError, NotADirectoryError):
603 605 st = None
604 606 yield st
605 607
606 608
607 609 def getuser() -> bytes:
608 610 '''return name of current user'''
609 611 return pycompat.fsencode(getpass.getuser())
610 612
611 613
612 614 def username(uid: Optional[int] = None) -> Optional[bytes]:
613 615 """Return the name of the user with the given uid.
614 616
615 617 If uid is None, return the name of the current user."""
616 618
617 619 if uid is None:
618 620 uid = os.getuid()
619 621 try:
620 622 return pycompat.fsencode(pwd.getpwuid(uid)[0])
621 623 except KeyError:
622 624 return b'%d' % uid
623 625
624 626
625 627 def groupname(gid: Optional[int] = None) -> Optional[bytes]:
626 628 """Return the name of the group with the given gid.
627 629
628 630 If gid is None, return the name of the current group."""
629 631
630 632 if gid is None:
631 633 gid = os.getgid()
632 634 try:
633 635 return pycompat.fsencode(grp.getgrgid(gid)[0])
634 636 except KeyError:
635 637 return pycompat.bytestr(gid)
636 638
637 639
638 640 def groupmembers(name: bytes) -> List[bytes]:
639 641 """Return the list of members of the group with the given
640 642 name, KeyError if the group does not exist.
641 643 """
642 644 name = pycompat.fsdecode(name)
643 645 return pycompat.rapply(pycompat.fsencode, list(grp.getgrnam(name).gr_mem))
644 646
645 647
646 648 def spawndetached(args: List[bytes]) -> int:
647 649 return os.spawnvp(os.P_NOWAIT | getattr(os, 'P_DETACH', 0), args[0], args)
648 650
649 651
650 def gethgcmd():
652 def gethgcmd(): # TODO: convert to bytes, like on Windows?
651 653 return sys.argv[:1]
652 654
653 655
654 656 def makedir(path: bytes, notindexed: bool) -> None:
655 657 os.mkdir(path)
656 658
657 659
658 660 def lookupreg(
659 661 key: bytes,
660 662 name: Optional[bytes] = None,
661 663 scope: Optional[Union[int, Iterable[int]]] = None,
662 664 ) -> Optional[bytes]:
663 665 return None
664 666
665 667
666 668 def hidewindow() -> None:
667 669 """Hide current shell window.
668 670
669 671 Used to hide the window opened when starting asynchronous
670 672 child process under Windows, unneeded on other systems.
671 673 """
672 674 pass
673 675
674 676
675 677 class cachestat:
676 678 def __init__(self, path: bytes) -> None:
677 679 self.stat = os.stat(path)
678 680
679 681 def cacheable(self) -> bool:
680 682 return bool(self.stat.st_ino)
681 683
682 684 __hash__ = object.__hash__
683 685
684 686 def __eq__(self, other: Any) -> bool:
685 687 try:
686 688 # Only dev, ino, size, mtime and atime are likely to change. Out
687 689 # of these, we shouldn't compare atime but should compare the
688 690 # rest. However, one of the other fields changing indicates
689 691 # something fishy going on, so return False if anything but atime
690 692 # changes.
691 693 return (
692 694 self.stat.st_mode == other.stat.st_mode
693 695 and self.stat.st_ino == other.stat.st_ino
694 696 and self.stat.st_dev == other.stat.st_dev
695 697 and self.stat.st_nlink == other.stat.st_nlink
696 698 and self.stat.st_uid == other.stat.st_uid
697 699 and self.stat.st_gid == other.stat.st_gid
698 700 and self.stat.st_size == other.stat.st_size
699 701 and self.stat[stat.ST_MTIME] == other.stat[stat.ST_MTIME]
700 702 and self.stat[stat.ST_CTIME] == other.stat[stat.ST_CTIME]
701 703 )
702 704 except AttributeError:
703 705 return False
704 706
705 707 def __ne__(self, other: Any) -> bool:
706 708 return not self == other
707 709
708 710
709 711 def statislink(st: Optional[os.stat_result]) -> bool:
710 712 '''check whether a stat result is a symlink'''
711 713 return stat.S_ISLNK(st.st_mode) if st else False
712 714
713 715
714 716 def statisexec(st: Optional[os.stat_result]) -> bool:
715 717 '''check whether a stat result is an executable file'''
716 718 return (st.st_mode & 0o100 != 0) if st else False
717 719
718 720
719 721 def poll(fds):
720 722 """block until something happens on any file descriptor
721 723
722 724 This is a generic helper that will check for any activity
723 725 (read, write. exception) and return the list of touched files.
724 726
725 727 In unsupported cases, it will raise a NotImplementedError"""
726 728 try:
727 729 res = select.select(fds, fds, fds)
728 730 except ValueError: # out of range file descriptor
729 731 raise NotImplementedError()
730 732 return sorted(list(set(sum(res, []))))
731 733
732 734
733 735 def readpipe(pipe) -> bytes:
734 736 """Read all available data from a pipe."""
735 737 # We can't fstat() a pipe because Linux will always report 0.
736 738 # So, we set the pipe to non-blocking mode and read everything
737 739 # that's available.
738 740 flags = fcntl.fcntl(pipe, fcntl.F_GETFL)
739 741 flags |= os.O_NONBLOCK
740 742 oldflags = fcntl.fcntl(pipe, fcntl.F_SETFL, flags)
741 743
742 744 try:
743 745 chunks = []
744 746 while True:
745 747 try:
746 748 s = pipe.read()
747 749 if not s:
748 750 break
749 751 chunks.append(s)
750 752 except IOError:
751 753 break
752 754
753 755 return b''.join(chunks)
754 756 finally:
755 757 fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags)
756 758
757 759
758 760 def bindunixsocket(sock, path: bytes) -> None:
759 761 """Bind the UNIX domain socket to the specified path"""
760 762 # use relative path instead of full path at bind() if possible, since
761 763 # AF_UNIX path has very small length limit (107 chars) on common
762 764 # platforms (see sys/un.h)
763 765 dirname, basename = os.path.split(path)
764 766 bakwdfd = None
765 767
766 768 try:
767 769 if dirname:
768 770 bakwdfd = os.open(b'.', os.O_DIRECTORY)
769 771 os.chdir(dirname)
770 772 sock.bind(basename)
771 773 if bakwdfd:
772 774 os.fchdir(bakwdfd)
773 775 finally:
774 776 if bakwdfd:
775 777 os.close(bakwdfd)
General Comments 0
You need to be logged in to leave comments. Login now