from __future__ import annotations import contextlib import os from typing import ( Any, Dict, Iterable, Iterator, List, Optional, Tuple, ) from mercurial.node import sha1nodeconstants from mercurial import ( dirstatemap, error, extensions, match as matchmod, pycompat, scmutil, util, ) from mercurial.dirstateutils import ( timestamp, ) from mercurial.interfaces import ( dirstate as intdirstate, ) from . import gitutil DirstateItem = dirstatemap.DirstateItem propertycache = util.propertycache pygit2 = gitutil.get_pygit2() def readpatternfile(orig, filepath, warn, sourceinfo=False): if not (b'info/exclude' in filepath or filepath.endswith(b'.gitignore')): return orig(filepath, warn, sourceinfo=False) result = [] warnings = [] with open(filepath, 'rb') as fp: for l in fp: l = l.strip() if not l or l.startswith(b'#'): continue if l.startswith(b'!'): warnings.append(b'unsupported ignore pattern %s' % l) continue if l.startswith(b'/'): result.append(b'rootglob:' + l[1:]) else: result.append(b'relglob:' + l) return result, warnings extensions.wrapfunction(matchmod, 'readpatternfile', readpatternfile) _STATUS_MAP = {} if pygit2: _STATUS_MAP = { pygit2.GIT_STATUS_CONFLICTED: b'm', pygit2.GIT_STATUS_CURRENT: b'n', pygit2.GIT_STATUS_IGNORED: b'?', pygit2.GIT_STATUS_INDEX_DELETED: b'r', pygit2.GIT_STATUS_INDEX_MODIFIED: b'n', pygit2.GIT_STATUS_INDEX_NEW: b'a', pygit2.GIT_STATUS_INDEX_RENAMED: b'a', pygit2.GIT_STATUS_INDEX_TYPECHANGE: b'n', pygit2.GIT_STATUS_WT_DELETED: b'r', pygit2.GIT_STATUS_WT_MODIFIED: b'n', pygit2.GIT_STATUS_WT_NEW: b'?', pygit2.GIT_STATUS_WT_RENAMED: b'a', pygit2.GIT_STATUS_WT_TYPECHANGE: b'n', pygit2.GIT_STATUS_WT_UNREADABLE: b'?', pygit2.GIT_STATUS_INDEX_MODIFIED | pygit2.GIT_STATUS_WT_MODIFIED: b'm', } class gitdirstate(intdirstate.idirstate): def __init__(self, ui, vfs, gitrepo, use_dirstate_v2): self._ui = ui self._root = os.path.dirname(vfs.base) self._opener = vfs self.git = gitrepo self._plchangecallbacks = {} # TODO: context.poststatusfixup is bad and uses this attribute self._dirty = False self._mapcls = dirstatemap.dirstatemap self._use_dirstate_v2 = use_dirstate_v2 @propertycache def _map(self): """Return the dirstate contents (see documentation for dirstatemap).""" self._map = self._mapcls( self._ui, self._opener, self._root, sha1nodeconstants, self._use_dirstate_v2, ) return self._map def p1(self) -> bytes: try: return self.git.head.peel().id.raw except pygit2.GitError: # Typically happens when peeling HEAD fails, as in an # empty repository. return sha1nodeconstants.nullid def p2(self) -> bytes: # TODO: MERGE_HEAD? something like that, right? return sha1nodeconstants.nullid def setparents(self, p1: bytes, p2: Optional[bytes] = None): if p2 is None: p2 = sha1nodeconstants.nullid assert p2 == sha1nodeconstants.nullid, b'TODO merging support' self.git.head.set_target(gitutil.togitnode(p1)) @util.propertycache def identity(self): return util.filestat.frompath( os.path.join(self._root, b'.git', b'index') ) def branch(self) -> bytes: return b'default' def parents(self) -> List[bytes]: # TODO how on earth do we find p2 if a merge is in flight? return [self.p1(), sha1nodeconstants.nullid] def __iter__(self) -> Iterator[bytes]: return (pycompat.fsencode(f.path) for f in self.git.index) def items(self) -> Iterator[Tuple[bytes, intdirstate.DirstateItemT]]: for ie in self.git.index: yield ie.path, None # value should be a DirstateItem # py2,3 compat forward iteritems = items def __getitem__(self, filename): try: gs = self.git.status_file(filename) except KeyError: return b'?' return _STATUS_MAP[gs] def __contains__(self, filename: Any) -> bool: try: gs = self.git.status_file(filename) return _STATUS_MAP[gs] != b'?' except KeyError: return False def status( self, match: matchmod.basematcher, subrepos: bool, ignored: bool, clean: bool, unknown: bool, ) -> intdirstate.StatusReturnT: listclean = clean # TODO handling of clean files - can we get that from git.status()? modified, added, removed, deleted, unknown, ignored, clean = ( [], [], [], [], [], [], [], ) try: mtime_boundary = timestamp.get_fs_now(self._opener) except OSError: # In largefiles or readonly context mtime_boundary = None gstatus = self.git.status() for path, status in gstatus.items(): path = pycompat.fsencode(path) if not match(path): continue if status == pygit2.GIT_STATUS_IGNORED: if path.endswith(b'/'): continue ignored.append(path) elif status in ( pygit2.GIT_STATUS_WT_MODIFIED, pygit2.GIT_STATUS_INDEX_MODIFIED, pygit2.GIT_STATUS_WT_MODIFIED | pygit2.GIT_STATUS_INDEX_MODIFIED, ): modified.append(path) elif status == pygit2.GIT_STATUS_INDEX_NEW: added.append(path) elif status == pygit2.GIT_STATUS_WT_NEW: unknown.append(path) elif status == pygit2.GIT_STATUS_WT_DELETED: deleted.append(path) elif status == pygit2.GIT_STATUS_INDEX_DELETED: removed.append(path) else: raise error.Abort( b'unhandled case: status for %r is %r' % (path, status) ) if listclean: observed = set( modified + added + removed + deleted + unknown + ignored ) index = self.git.index index.read() for entry in index: path = pycompat.fsencode(entry.path) if not match(path): continue if path in observed: continue # already in some other set if path[-1] == b'/': continue # directory clean.append(path) # TODO are we really always sure of status here? return ( False, scmutil.status( modified, added, removed, deleted, unknown, ignored, clean ), mtime_boundary, ) def flagfunc( self, buildfallback: intdirstate.FlagFuncFallbackT ) -> intdirstate.FlagFuncReturnT: # TODO we can do better return buildfallback() def getcwd(self) -> bytes: # TODO is this a good way to do this? return os.path.dirname( os.path.dirname(pycompat.fsencode(self.git.path)) ) def get_entry(self, path: bytes) -> intdirstate.DirstateItemT: """return a DirstateItem for the associated path""" entry = self._map.get(path) if entry is None: return DirstateItem() return entry def normalize( self, path: bytes, isknown: bool = False, ignoremissing: bool = False ) -> bytes: normed = util.normcase(path) assert normed == path, b"TODO handling of case folding: %s != %s" % ( normed, path, ) return path @property def _checklink(self) -> bool: return util.checklink(os.path.dirname(pycompat.fsencode(self.git.path))) def copies(self) -> Dict[bytes, bytes]: # TODO support copies? return {} # # TODO what the heck is this _filecache = set() @property def is_changing_parents(self) -> bool: # TODO: we need to implement the context manager bits and # correctly stage/revert index edits. return False @property def is_changing_any(self) -> bool: # TODO: we need to implement the context manager bits and # correctly stage/revert index edits. return False def write(self, tr: Optional[intdirstate.TransactionT]) -> None: # TODO: call parent change callbacks if tr: def writeinner(category): self.git.index.write() tr.addpending(b'gitdirstate', writeinner) else: self.git.index.write() def pathto(self, f: bytes, cwd: Optional[bytes] = None) -> bytes: if cwd is None: cwd = self.getcwd() # TODO core dirstate does something about slashes here assert isinstance(f, bytes) r = util.pathto(self._root, cwd, f) return r def matches(self, match: matchmod.basematcher) -> Iterable[bytes]: for x in self.git.index: p = pycompat.fsencode(x.path) if match(p): yield p # TODO: return list instead of yielding? def set_clean(self, f, parentfiledata): """Mark a file normal and clean.""" # TODO: for now we just let libgit2 re-stat the file. We can # clearly do better. def set_possibly_dirty(self, f): """Mark a file normal, but possibly dirty.""" # TODO: for now we just let libgit2 re-stat the file. We can # clearly do better. def walk( self, match: matchmod.basematcher, subrepos: Any, unknown: bool, ignored: bool, full: bool = True, ) -> intdirstate.WalkReturnT: # TODO: we need to use .status() and not iterate the index, # because the index doesn't force a re-walk and so `hg add` of # a new file without an intervening call to status will # silently do nothing. r = {} cwd = self.getcwd() for path, status in self.git.status().items(): if path.startswith('.hg/'): continue path = pycompat.fsencode(path) if not match(path): continue # TODO construct the stat info from the status object? try: s = os.stat(os.path.join(cwd, path)) except FileNotFoundError: continue r[path] = s return r def set_tracked(self, f, reset_copy=False): # TODO: support copies and reset_copy=True uf = pycompat.fsdecode(f) if uf in self.git.index: return False index = self.git.index index.read() index.add(uf) index.write() return True def add(self, f): index = self.git.index index.read() index.add(pycompat.fsdecode(f)) index.write() def drop(self, f): index = self.git.index index.read() fs = pycompat.fsdecode(f) if fs in index: index.remove(fs) index.write() def set_untracked(self, f): index = self.git.index index.read() fs = pycompat.fsdecode(f) if fs in index: index.remove(fs) index.write() return True return False def remove(self, f): index = self.git.index index.read() index.remove(pycompat.fsdecode(f)) index.write() def copied(self, file: bytes) -> Optional[bytes]: # TODO: track copies? return None def prefetch_parents(self): # TODO pass def update_file(self, *args, **kwargs): # TODO pass @contextlib.contextmanager def changing_parents(self, repo): # TODO: track this maybe? yield def addparentchangecallback( self, category: bytes, callback: intdirstate.AddParentChangeCallbackT ) -> None: # TODO: should this be added to the dirstate interface? self._plchangecallbacks[category] = callback def setbranch( self, branch: bytes, transaction: Optional[intdirstate.TransactionT] ) -> None: raise error.Abort( b'git repos do not support branches. try using bookmarks' )