posix.py
218 lines
| 5.9 KiB
| text/x-python
|
PythonLexer
/ mercurial / posix.py
Martin Geisler
|
# posix.py - Posix utility function implementations for Mercurial
#
# Copyright 2005-2009 Matt Mackall <mpm@selenic.com> and others
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2, incorporated herein by reference.
Matt Mackall
|
r7890 | |||
from i18n import _ | ||||
Simon Heimberg
|
r8312 | import osutil | ||
import os, sys, errno, stat, getpass, pwd, grp | ||||
Matt Mackall
|
r7890 | |||
# On POSIX the builtin file type is used directly as the file opener.
posixfile = file
nulldev = '/dev/null'

# Path helpers are taken straight from os.path.
normpath = os.path.normpath
samestat = os.path.samestat

# os.umask() returns the previous mask and there is no read-only query,
# so read it by setting a throwaway value and immediately restoring it.
umask = os.umask(0)
os.umask(umask)
def openhardlinks():
    '''tell whether open file handles to hardlinks are safe to keep

    This implementation always answers True.'''
    safe = True
    return safe
def rcfiles(path):
    '''return the config files found under path

    The result always starts with path/hgrc, followed by every entry of
    path/hgrc.d whose name ends in ".rc". If listing hgrc.d fails with
    OSError, only path/hgrc is returned.'''
    rcdir = os.path.join(path, 'hgrc.d')
    extras = []
    try:
        for name, kind in osutil.listdir(rcdir):
            if name.endswith(".rc"):
                extras.append(os.path.join(rcdir, name))
    except OSError:
        # all-or-nothing: a failed listing contributes no entries
        extras = []
    return [os.path.join(path, 'hgrc')] + extras
def system_rcpath():
    '''return the list of system-wide config file paths

    Files relative to the running script (../etc/mercurial next to
    sys.argv[0]) come first, followed by those under /etc/mercurial.'''
    # old mod_python does not set sys.argv
    argv = getattr(sys, 'argv', [])
    found = []
    if len(argv) > 0:
        bindir = os.path.dirname(argv[0])
        found += rcfiles(bindir + '/../etc/mercurial')
    found += rcfiles('/etc/mercurial')
    return found
def user_rcpath():
    '''return a one-element list holding the expanded ~/.hgrc path'''
    hgrc = os.path.expanduser('~/.hgrc')
    return [hgrc]
def parse_patch_output(output_line):
    """parses the output produced by patch and returns the file name

    The first 14 characters of output_line are skipped and the remainder
    is taken as the file name. Surrounding quotes are removed:

    - on OpenVMS, when the name starts with a backquote;
    - elsewhere, when the name is wrapped in single quotes and contains
      a space.

    An empty remainder is returned unchanged."""
    pf = output_line[14:]
    if sys.platform == 'OpenVMS':
        # startswith() instead of pf[0] so an empty name cannot raise
        # IndexError
        quoted = pf.startswith('`')
    else:
        quoted = pf.startswith("'") and pf.endswith("'") and " " in pf
    if quoted:
        pf = pf[1:-1] # Remove the quotes
    return pf
def sshargs(sshcmd, host, user, port):
    '''Build argument list for ssh

    Returns "user@host" (or just host when user is empty), with
    " -p port" appended when port is set. sshcmd is not used here.'''
    if user:
        target = "%s@%s" % (user, host)
    else:
        target = host
    if port:
        return "%s -p %s" % (target, port)
    return target
def is_exec(f):
    """check whether a file is executable

    Only the owner execute bit of the lstat() mode is examined, so a
    symlink is judged by its own mode, not its target's."""
    mode = os.lstat(f).st_mode
    # stat.S_IXUSR is the owner execute bit (octal 100)
    return bool(mode & stat.S_IXUSR)
def _readfile(path):
    '''return the contents of path, closing the handle promptly'''
    fp = open(path)
    try:
        return fp.read()
    finally:
        fp.close()

def _writefile(path, data):
    '''(re)create path with data as its contents and close the handle'''
    fp = open(path, "w")
    try:
        fp.write(data)
    finally:
        fp.close()

def set_flags(f, l, x):
    '''make f a symlink and/or executable as requested

    f is the path to adjust.
    l: when true, f must end up a symlink whose target is the former file
    contents; when false, an existing symlink is turned into a regular
    file containing the former link target.
    x: when true, the execute bits are turned on (only consulted when l
    is false); when false, they are turned off.

    OSError/IOError from the underlying filesystem calls propagate,
    except for a failure to create the symlink, which falls back to
    rewriting the regular file.'''
    readbits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH           # 0444
    rwbits = readbits | stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH  # 0666

    s = os.lstat(f).st_mode
    if l:
        if not stat.S_ISLNK(s):
            # switch file to link
            data = _readfile(f)
            os.unlink(f)
            try:
                os.symlink(data, f)
            except (KeyboardInterrupt, SystemExit):
                # the file is already unlinked: put the data back, but
                # do not swallow the interruption
                _writefile(f, data)
                raise
            except Exception:
                # failed to make a link, rewrite file
                _writefile(f, data)
        # no chmod needed at this point
        return
    if stat.S_ISLNK(s):
        # switch link to file
        data = os.readlink(f)
        os.unlink(f)
        _writefile(f, data)
        s = rwbits & ~umask # avoid restatting for chmod

    sx = s & stat.S_IXUSR
    if x and not sx:
        # Turn on +x for every +r bit when making a file executable
        # and obey umask.
        os.chmod(f, s | (((s & readbits) >> 2) & ~umask))
    elif not x and sx:
        # Turn off all +x bits
        os.chmod(f, s & rwbits)
def set_binary(fd):
    '''do nothing; fd is accepted and ignored in this implementation'''
    return None
def pconvert(path):
    '''return path unchanged (no conversion is performed here)'''
    converted = path
    return converted
def localpath(path):
    '''return path unchanged (no conversion is performed here)'''
    local = path
    return local
def shellquote(s):
    '''return s quoted for use as a single shell word

    On OpenVMS s is wrapped in double quotes as-is. Elsewhere it is
    wrapped in single quotes, with each embedded single quote written
    as '\\'' (close quote, escaped quote, reopen quote).'''
    if sys.platform != 'OpenVMS':
        escaped = s.replace("'", "'\\''")
        return "'%s'" % escaped
    return '"%s"' % s
def quotecommand(cmd):
    '''return cmd unchanged (no extra quoting is applied here)'''
    quoted = cmd
    return quoted
def popen(command, mode='r'):
    '''run command through os.popen and return the resulting pipe

    mode is passed through unchanged and defaults to reading.'''
    pipe = os.popen(command, mode)
    return pipe
def testpid(pid): | ||||
'''return False if pid dead, True if running or not sure''' | ||||
if os.sys.platform == 'OpenVMS': | ||||
return True | ||||
try: | ||||
os.kill(pid, 0) | ||||
return True | ||||
except OSError, inst: | ||||
return inst.errno != errno.ESRCH | ||||
def explain_exit(code):
    """return a 2-tuple (desc, code) describing a process's status

    code is a wait()-style status. desc is a translated message and the
    second element is the exit status or signal number extracted from
    it. ValueError is raised when the status is neither an exit, a
    kill by signal nor a stop."""
    if os.WIFEXITED(code):
        status = os.WEXITSTATUS(code)
        desc = _("exited with status %d") % status
        return desc, status
    if os.WIFSIGNALED(code):
        signum = os.WTERMSIG(code)
        desc = _("killed by signal %d") % signum
        return desc, signum
    if os.WIFSTOPPED(code):
        signum = os.WSTOPSIG(code)
        desc = _("stopped by signal %d") % signum
        return desc, signum
    raise ValueError(_("invalid exit code"))
def isowner(fp, st=None):
    """Return True if the file object fp belongs to the current user.

    A stat result for fp may be passed as the st argument to avoid
    statting again; fp is not touched in that case. Otherwise fp is
    statted through its file descriptor, falling back to its name
    attribute for file-like objects that have no fileno method.
    """
    if st is None:
        # stat here directly: no fstat helper exists in this module, so
        # calling one would raise NameError
        try:
            st = os.fstat(fp.fileno())
        except AttributeError:
            st = os.stat(fp.name)
    return st.st_uid == os.getuid()
def find_exe(command):
    '''Find executable for command searching like which does.
    If command is a basename then PATH is searched for command.
    PATH isn't searched if command is an absolute or relative path.
    If command isn't found None is returned.

    A candidate only counts when it is a regular file the current user
    may execute; directories and non-executable files are skipped.
    On OpenVMS command is returned unchanged without any lookup.'''
    if sys.platform == 'OpenVMS':
        return command

    def findexisting(executable):
        'Will return executable if it is an existing, executable file'
        if os.path.isfile(executable) and os.access(executable, os.X_OK):
            return executable
        return None

    if os.sep in command:
        return findexisting(command)

    for path in os.environ.get('PATH', '').split(os.pathsep):
        executable = findexisting(os.path.join(path, command))
        if executable is not None:
            return executable
    return None
def set_signal_handler():
    '''do nothing; no signal handler is installed by this implementation'''
    return None
def statfiles(files):
    '''Stat each file in files and yield stat or None if file does not exist.

    Symlinks are not followed (lstat is used). A path counts as missing
    when lstat fails with ENOENT or ENOTDIR; any other OSError is
    re-raised.'''
    missing = (errno.ENOENT, errno.ENOTDIR)
    for name in files:
        try:
            result = os.lstat(name)
        except OSError:
            if sys.exc_info()[1].errno not in missing:
                raise
            result = None
        yield result
def getuser():
    '''return name of current user, as reported by getpass.getuser()'''
    name = getpass.getuser()
    return name
def expand_glob(pats):
    '''return the patterns in pats as a new list

    No glob expansion is performed here: the patterns are copied
    through untouched.'''
    expanded = list(pats)
    return expanded
def username(uid=None):
    """Return the name of the user with the given uid.

    If uid is None, return the name of the current user. When the
    password database has no entry for uid, the uid itself is returned
    as a string."""
    if uid is None:
        uid = os.getuid()
    try:
        entry = pwd.getpwuid(uid)
    except KeyError:
        return str(uid)
    return entry[0]
def groupname(gid=None):
    """Return the name of the group with the given gid.

    If gid is None, return the name of the current group. When the
    group database has no entry for gid, the gid itself is returned as
    a string."""
    if gid is None:
        gid = os.getgid()
    try:
        entry = grp.getgrgid(gid)
    except KeyError:
        return str(gid)
    return entry[0]