# scmutil.py - Mercurial core utility functions # # Copyright Matt Mackall # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from i18n import _ import util, error import os, errno, stat def checkfilename(f): '''Check that the filename f is an acceptable filename for a tracked file''' if '\r' in f or '\n' in f: raise util.Abort(_("'\\n' and '\\r' disallowed in filenames: %r") % f) def checkportable(ui, f): '''Check if filename f is portable and warn or abort depending on config''' checkfilename(f) val = ui.config('ui', 'portablefilenames', 'warn') lval = val.lower() abort = os.name == 'nt' or lval == 'abort' bval = util.parsebool(val) if abort or lval == 'warn' or bval: msg = util.checkwinfilename(f) if msg: if abort: raise util.Abort("%s: %r" % (msg, f)) ui.warn(_("warning: %s: %r\n") % (msg, f)) elif bval is None and lval != 'ignore': raise error.ConfigError( _("ui.portablefilenames value is invalid ('%s')") % val) class path_auditor(object): '''ensure that a filesystem path contains no banned components. the following properties of a path are checked: - ends with a directory separator - under top-level .hg - starts at the root of a windows drive - contains ".." - traverses a symlink (e.g. a/symlink_here/b) - inside a nested repository (a callback can be used to approve some nested repositories, e.g., subrepositories) ''' def __init__(self, root, callback=None): self.audited = set() self.auditeddir = set() self.root = root self.callback = callback def __call__(self, path): '''Check the relative path. path may contain a pattern (e.g. foodir/**.txt)''' if path in self.audited: return # AIX ignores "/" at end of path, others raise EISDIR. if util.endswithsep(path): raise util.Abort(_("path ends in directory separator: %s") % path) normpath = os.path.normcase(path) parts = util.splitpath(normpath) if (os.path.splitdrive(path)[0] or parts[0].lower() in ('.hg', '.hg.', '') or os.pardir in parts): raise util.Abort(_("path contains illegal component: %s") % path) if '.hg' in path.lower(): lparts = [p.lower() for p in parts] for p in '.hg', '.hg.': if p in lparts[1:]: pos = lparts.index(p) base = os.path.join(*parts[:pos]) raise util.Abort(_('path %r is inside nested repo %r') % (path, base)) parts.pop() prefixes = [] while parts: prefix = os.sep.join(parts) if prefix in self.auditeddir: break curpath = os.path.join(self.root, prefix) try: st = os.lstat(curpath) except OSError, err: # EINVAL can be raised as invalid path syntax under win32. # They must be ignored for patterns can be checked too. if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL): raise else: if stat.S_ISLNK(st.st_mode): raise util.Abort( _('path %r traverses symbolic link %r') % (path, prefix)) elif (stat.S_ISDIR(st.st_mode) and os.path.isdir(os.path.join(curpath, '.hg'))): if not self.callback or not self.callback(curpath): raise util.Abort(_('path %r is inside nested repo %r') % (path, prefix)) prefixes.append(prefix) parts.pop() self.audited.add(path) # only add prefixes to the cache after checking everything: we don't # want to add "foo/bar/baz" before checking if there's a "foo/.hg" self.auditeddir.update(prefixes) class opener(object): '''Open files relative to a base directory This class is used to hide the details of COW semantics and remote file access from higher level code. ''' def __init__(self, base, audit=True): self.base = base if audit: self.auditor = path_auditor(base) else: self.auditor = util.always self.createmode = None self._trustnlink = None @util.propertycache def _can_symlink(self): return util.checklink(self.base) def _fixfilemode(self, name): if self.createmode is None: return os.chmod(name, self.createmode & 0666) def __call__(self, path, mode="r", text=False, atomictemp=False): r = util.checkosfilename(path) if r: raise util.Abort("%s: %r" % (r, path)) self.auditor(path) f = os.path.join(self.base, path) if not text and "b" not in mode: mode += "b" # for that other OS nlink = -1 dirname, basename = os.path.split(f) # If basename is empty, then the path is malformed because it points # to a directory. Let the posixfile() call below raise IOError. if basename and mode not in ('r', 'rb'): if atomictemp: if not os.path.isdir(dirname): util.makedirs(dirname, self.createmode) return util.atomictempfile(f, mode, self.createmode) try: if 'w' in mode: util.unlink(f) nlink = 0 else: # nlinks() may behave differently for files on Windows # shares if the file is open. fd = util.posixfile(f) nlink = util.nlinks(f) if nlink < 1: nlink = 2 # force mktempcopy (issue1922) fd.close() except (OSError, IOError), e: if e.errno != errno.ENOENT: raise nlink = 0 if not os.path.isdir(dirname): util.makedirs(dirname, self.createmode) if nlink > 0: if self._trustnlink is None: self._trustnlink = nlink > 1 or util.checknlink(f) if nlink > 1 or not self._trustnlink: util.rename(util.mktempcopy(f), f) fp = util.posixfile(f, mode) if nlink == 0: self._fixfilemode(f) return fp def symlink(self, src, dst): self.auditor(dst) linkname = os.path.join(self.base, dst) try: os.unlink(linkname) except OSError: pass dirname = os.path.dirname(linkname) if not os.path.exists(dirname): util.makedirs(dirname, self.createmode) if self._can_symlink: try: os.symlink(src, linkname) except OSError, err: raise OSError(err.errno, _('could not symlink to %r: %s') % (src, err.strerror), linkname) else: f = self(dst, "w") f.write(src) f.close() self._fixfilemode(dst) def canonpath(root, cwd, myname, auditor=None): '''return the canonical path of myname, given cwd and root''' if util.endswithsep(root): rootsep = root else: rootsep = root + os.sep name = myname if not os.path.isabs(name): name = os.path.join(root, cwd, name) name = os.path.normpath(name) if auditor is None: auditor = path_auditor(root) if name != rootsep and name.startswith(rootsep): name = name[len(rootsep):] auditor(name) return util.pconvert(name) elif name == root: return '' else: # Determine whether `name' is in the hierarchy at or beneath `root', # by iterating name=dirname(name) until that causes no change (can't # check name == '/', because that doesn't work on windows). For each # `name', compare dev/inode numbers. If they match, the list `rel' # holds the reversed list of components making up the relative file # name we want. root_st = os.stat(root) rel = [] while True: try: name_st = os.stat(name) except OSError: break if util.samestat(name_st, root_st): if not rel: # name was actually the same as root (maybe a symlink) return '' rel.reverse() name = os.path.join(*rel) auditor(name) return util.pconvert(name) dirname, basename = os.path.split(name) rel.append(basename) if dirname == name: break name = dirname raise util.Abort('%s not under root' % myname) def walkrepos(path, followsym=False, seen_dirs=None, recurse=False): '''yield every hg repository under path, recursively.''' def errhandler(err): if err.filename == path: raise err if followsym and hasattr(os.path, 'samestat'): def _add_dir_if_not_there(dirlst, dirname): match = False samestat = os.path.samestat dirstat = os.stat(dirname) for lstdirstat in dirlst: if samestat(dirstat, lstdirstat): match = True break if not match: dirlst.append(dirstat) return not match else: followsym = False if (seen_dirs is None) and followsym: seen_dirs = [] _add_dir_if_not_there(seen_dirs, path) for root, dirs, files in os.walk(path, topdown=True, onerror=errhandler): dirs.sort() if '.hg' in dirs: yield root # found a repository qroot = os.path.join(root, '.hg', 'patches') if os.path.isdir(os.path.join(qroot, '.hg')): yield qroot # we have a patch queue repo here if recurse: # avoid recursing inside the .hg directory dirs.remove('.hg') else: dirs[:] = [] # don't descend further elif followsym: newdirs = [] for d in dirs: fname = os.path.join(root, d) if _add_dir_if_not_there(seen_dirs, fname): if os.path.islink(fname): for hgname in walkrepos(fname, True, seen_dirs): yield hgname else: newdirs.append(d) dirs[:] = newdirs