# bundlerepo.py - repository class for viewing uncompressed bundles
#
# Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

"""Repository class for viewing uncompressed bundles.

This provides a read-only repository interface to bundles as if they
were part of the actual repository.
"""

from __future__ import absolute_import

import os
import shutil
import tempfile

from .i18n import _
from .node import nullid

from . import (
    bundle2,
    changegroup,
    changelog,
    cmdutil,
    discovery,
    error,
    exchange,
    filelog,
    localrepo,
    manifest,
    mdiff,
    node as nodemod,
    pathutil,
    phases,
    pycompat,
    revlog,
    util,
    vfs as vfsmod,
)

class bundlerevlog(revlog.revlog):
    """Read-only revlog extended in memory with revisions from a changegroup.

    Revisions numbered up to and including ``repotiprev`` are served by the
    underlying revlog; higher revision numbers are index entries that point
    into the changegroup stream kept in ``self.bundle``.
    """
    def __init__(self, opener, indexfile, cgunpacker, linkmapper):
        """Index every delta yielded by ``cgunpacker.deltaiter()``.

        ``opener`` is wrapped in a read-only vfs before the base revlog is
        opened on ``indexfile``. ``linkmapper`` is called with the ``cs``
        field of each delta and its result is stored as the link revision.

        Raises ``error.LookupError`` if a parent or the delta base of an
        incoming revision is not known to this revlog.
        """
        # How it works:
        # To retrieve a revision, we need to know the offset of the revision in
        # the bundle (an unbundle object). We store this offset in the index
        # (start). The base of the delta is stored in the base field.
        #
        # To differentiate a rev in the bundle from a rev in the revlog, we
        # check revision against repotiprev.
        opener = vfsmod.readonlyvfs(opener)
        revlog.revlog.__init__(self, opener, indexfile)
        self.bundle = cgunpacker
        n = len(self)
        self.repotiprev = n - 1
        self.bundlerevs = set() # used by 'bundle()' revset expression
        for deltadata in cgunpacker.deltaiter():
            node, p1, p2, cs, deltabase, delta, flags = deltadata

            # the unpacker has just read the delta, so it sits right after it
            size = len(delta)
            start = cgunpacker.tell() - size

            link = linkmapper(cs)
            if node in self.nodemap:
                # this can happen if two branches make the same change
                self.bundlerevs.add(self.nodemap[node])
                continue

            for p in (p1, p2):
                if p not in self.nodemap:
                    raise error.LookupError(p, self.indexfile,
                                            _("unknown parent"))

            if deltabase not in self.nodemap:
                # Use the same exception type as the parent check above so
                # that callers handling error.LookupError see both failures.
                raise error.LookupError(deltabase, self.indexfile,
                                        _('unknown delta base'))

            baserev = self.rev(deltabase)
            # start, size, full unc. size, base (unused), link, p1, p2, node
            e = (revlog.offset_type(start, flags), size, -1, baserev, link,
                 self.rev(p1), self.rev(p2), node)
            self.index.insert(-1, e)
            self.nodemap[node] = n
            self.bundlerevs.add(n)
            n += 1

    def _chunk(self, rev, df=None):
        """Return the stored chunk for ``rev``.

        Revisions at or below ``repotiprev`` are delegated to the base
        revlog; others are read from the bundle at the recorded offset.
        ``df`` is accepted but not used here.
        """
        # Warning: in case of bundle, the diff is against what we stored as
        # delta base, not against rev - 1
        # XXX: could use some caching
        if rev <= self.repotiprev:
            return revlog.revlog._chunk(self, rev)
        self.bundle.seek(self.start(rev))
        return self.bundle.read(self.length(rev))

    def revdiff(self, rev1, rev2):
        """return or calculate a delta between two revisions"""
        if rev1 > self.repotiprev and rev2 > self.repotiprev:
            # hot path for bundle
            revb = self.index[rev2][3]
            if revb == rev1:
                # rev2 is stored as a delta against rev1: reuse it verbatim
                return self._chunk(rev2)
        elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
            return revlog.revlog.revdiff(self, rev1, rev2)

        # mixed case, or a bundle delta with another base: diff the raw texts
        return mdiff.textdiff(self.revision(rev1, raw=True),
                              self.revision(rev2, raw=True))

    def revision(self, nodeorrev, _df=None, raw=False):
        """return an uncompressed revision of a given node or revision
        number.

        ``_df`` is accepted but not used here. ``raw`` is forwarded to
        ``_processflags``.
        """
        if isinstance(nodeorrev, int):
            rev = nodeorrev
            node = self.node(rev)
        else:
            node = nodeorrev
            rev = self.rev(node)

        if node == nullid:
            return ""

        rawtext = None
        chain = []
        iterrev = rev
        # reconstruct the revision if it is from a changegroup
        while iterrev > self.repotiprev:
            if self._cache and self._cache[1] == iterrev:
                # cached raw text found: no need to walk further down
                rawtext = self._cache[2]
                break
            chain.append(iterrev)
            iterrev = self.index[iterrev][3]
        if rawtext is None:
            rawtext = self.baserevision(iterrev)

        # apply the collected deltas, starting with the one closest to the base
        while chain:
            delta = self._chunk(chain.pop())
            rawtext = mdiff.patches(rawtext, [delta])

        text, validatehash = self._processflags(rawtext, self.flags(rev),
                                                'read', raw=raw)
        if validatehash:
            self.checkhash(text, node, rev=rev)
        self._cache = (node, rev, rawtext)
        return text

    def baserevision(self, nodeorrev):
        """Return the raw text of a revision from the base revlog."""
        # Revlog subclasses may override 'revision' method to modify format of
        # content retrieved from revlog. To use bundlerevlog with such class one
        # needs to override 'baserevision' and make more specific call here.
        return revlog.revlog.revision(self, nodeorrev, raw=True)

    # The overlay is read-only: the operations below are not supported.

    def addrevision(self, *args, **kwargs):
        raise NotImplementedError

    def addgroup(self, *args, **kwargs):
        raise NotImplementedError

    def strip(self, *args, **kwargs):
        raise NotImplementedError

    def checksize(self):
        raise NotImplementedError

class bundlechangelog(bundlerevlog, changelog.changelog):
    """Changelog whose revisions are extended with those of a changegroup."""
    def __init__(self, opener, cgunpacker):
        """Open the changelog via ``opener`` and index ``cgunpacker``."""
        changelog.changelog.__init__(self, opener)

        def samevalue(cs):
            # the link value recorded for each entry is its 'cs' field as is
            return cs

        bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
                              samevalue)

    def baserevision(self, nodeorrev):
        """Return the raw text of a base revision with filtering disabled."""
        # Although changelog doesn't override 'revision' method, some extensions
        # may replace this class with another that does. Same story with
        # manifest and filelog classes.

        # Filtering on changelog.node() and rev() is bypassed for the
        # duration of the call because the revision text of the bundle base
        # is needed even if it is hidden.
        savedfilter = self.filteredrevs
        try:
            self.filteredrevs = ()
            rawtext = changelog.changelog.revision(self, nodeorrev, raw=True)
        finally:
            self.filteredrevs = savedfilter
        return rawtext

class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
    """Manifest revlog extended with the manifests of a changegroup."""
    def __init__(self, opener, cgunpacker, linkmapper, dirlogstarts=None,
                 dir=''):
        """Open the manifest revlog for ``dir`` and index ``cgunpacker``.

        ``dirlogstarts`` maps a directory name to its position in the
        bundle. When it is not supplied, it is computed with
        ``_getfilestarts`` for version "03" bundles and is empty otherwise.
        """
        manifest.manifestrevlog.__init__(self, opener, dir=dir)
        bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
                              linkmapper)
        if dirlogstarts is None:
            if self.bundle.version == "03":
                dirlogstarts = _getfilestarts(self.bundle)
            else:
                dirlogstarts = {}
        self._dirlogstarts = dirlogstarts
        self._linkmapper = linkmapper

    def baserevision(self, nodeorrev):
        """Return a base revision, preferring the full-text cache."""
        if isinstance(nodeorrev, int):
            node = self.node(nodeorrev)
        else:
            node = nodeorrev

        if node not in self.fulltextcache:
            return manifest.manifestrevlog.revision(self, nodeorrev, raw=True)
        return '%s' % self.fulltextcache[node]

    def dirlog(self, d):
        """Return the manifest revlog for directory ``d``.

        Directories with a recorded bundle position get a new
        ``bundlemanifest`` reading from that position; any other directory
        is handled by the parent class.
        """
        if d not in self._dirlogstarts:
            return super(bundlemanifest, self).dirlog(d)
        self.bundle.seek(self._dirlogstarts[d])
        return bundlemanifest(self.opener, self.bundle, self._linkmapper,
                              self._dirlogstarts, dir=d)

class bundlefilelog(filelog.filelog):
    """Filelog whose backing revlog is a ``bundlerevlog``."""
    def __init__(self, opener, path, cgunpacker, linkmapper):
        """Open the filelog for ``path`` and swap in a bundle-aware revlog."""
        filelog.filelog.__init__(self, opener, path)
        overlay = bundlerevlog(opener, self.indexfile, cgunpacker,
                               linkmapper)
        self._revlog = overlay

    def baserevision(self, nodeorrev):
        """Return the raw text of ``nodeorrev`` via ``filelog.revision``."""
        rawtext = filelog.filelog.revision(self, nodeorrev, raw=True)
        return rawtext

class bundlepeer(localrepo.localpeer):
    """Peer returned by ``bundlerepository.peer()``; it refuses pushes."""
    def canpush(self):
        """Always return False."""
        return False

class bundlephasecache(phases.phasecache):
    """Phase cache that keeps changes in memory and refuses to write them."""
    def __init__(self, *args, **kwargs):
        """Build the cache, then make its ``opener`` (if any) read-only."""
        super(bundlephasecache, self).__init__(*args, **kwargs)
        if not util.safehasattr(self, 'opener'):
            return
        self.opener = vfsmod.readonlyvfs(self.opener)

    def write(self):
        """Unsupported: phase data is not written out."""
        raise NotImplementedError()

    def _write(self, fp):
        """Unsupported: phase data is not written out."""
        raise NotImplementedError()

    def _updateroots(self, phase, newroots, tr):
        """Record ``newroots`` for ``phase`` and flag the cache as dirty.

        ``tr`` is accepted but not used.
        """
        roots = self.phaseroots
        roots[phase] = newroots
        self.invalidate()
        self.dirty = True

def _getfilestarts(cgunpacker):
    filespos = {}
    for chunkdata in iter(cgunpacker.filelogheader, {}):
        fname = chunkdata['filename']
        filespos[fname] = cgunpacker.tell()
        for chunk in iter(lambda: cgunpacker.deltachunk(None), {}):
            pass
    return filespos

class bundlerepository(localrepo.localrepository):
    """A repository instance that is a union of a local repo and a bundle.

    Instances represent a read-only repository composed of a local repository
    with the contents of a bundle file applied. The repository instance is
    conceptually similar to the state of a repository after an
    ``hg unbundle`` operation. However, the contents of the bundle are never
    applied to the actual base repository.
    """
    def __init__(self, ui, repopath, bundlepath):
        """Open the bundle at ``bundlepath`` on top of ``repopath``.

        If opening ``repopath`` raises ``error.RepoError``, a repository is
        instantiated in a fresh temporary directory and used as the base
        instead; that directory is removed again by ``close()``.

        Raises ``error.Abort`` when a bundle2 file has no changegroup part,
        when the changegroup version is unsupported, or when the bundle is
        of an unrecognized type; raises ``NotImplementedError`` when a
        bundle2 file has more than one changegroup part.
        """
        self._tempparent = None
        try:
            localrepo.localrepository.__init__(self, ui, repopath)
        except error.RepoError:
            self._tempparent = tempfile.mkdtemp()
            localrepo.instance(ui, self._tempparent, 1)
            localrepo.localrepository.__init__(self, ui, self._tempparent)
        self.ui.setconfig('phases', 'publish', False, 'bundlerepo')

        if repopath:
            self._url = 'bundle:' + util.expandpath(repopath) + '+' + bundlepath
        else:
            self._url = 'bundle:' + bundlepath

        # path of the temporary uncompressed copy, if one gets written
        self.tempfile = None
        f = util.posixfile(bundlepath, "rb")
        bundle = exchange.readbundle(ui, f, bundlepath)

        if isinstance(bundle, bundle2.unbundle20):
            self._bundlefile = bundle
            self._cgunpacker = None

            cgpart = None
            for part in bundle.iterparts(seekable=True):
                if part.type == 'changegroup':
                    if cgpart:
                        raise NotImplementedError("can't process "
                                                  "multiple changegroups")
                    cgpart = part

                # sets self._cgunpacker for changegroup parts, ignores others
                self._handlebundle2part(bundle, part)

            if not cgpart:
                raise error.Abort(_("No changegroups found"))

            # This is required to placate a later consumer, which expects
            # the payload offset to be at the beginning of the changegroup.
            # We need to do this after the iterparts() generator advances
            # because iterparts() will seek to end of payload after the
            # generator returns control to iterparts().
            cgpart.seek(0, os.SEEK_SET)

        elif isinstance(bundle, changegroup.cg1unpacker):
            if bundle.compressed():
                # write an uncompressed copy to a temp file and reopen that
                f = self._writetempbundle(bundle.read, '.hg10un',
                                          header='HG10UN')
                bundle = exchange.readbundle(ui, f, bundlepath, self.vfs)

            self._bundlefile = bundle
            self._cgunpacker = bundle
        else:
            raise error.Abort(_('bundle type %s cannot be read') %
                              type(bundle))

        # dict with the mapping 'filename' -> position in the changegroup.
        self._cgfilespos = {}

        # first revision number that comes from the bundle
        self.firstnewrev = self.changelog.repotiprev + 1
        phases.retractboundary(self, None, phases.draft,
                               [ctx.node() for ctx in self[self.firstnewrev:]])

    def _handlebundle2part(self, bundle, part):
        """Set up ``self._cgunpacker`` from a bundle2 changegroup part.

        Parts of any other type are ignored. Raises ``error.Abort`` if the
        part's changegroup version (default '01') is not among the supported
        incoming versions.
        """
        if part.type != 'changegroup':
            return

        cgstream = part
        version = part.params.get('version', '01')
        legalcgvers = changegroup.supportedincomingversions(self)
        if version not in legalcgvers:
            msg = _('Unsupported changegroup version: %s')
            raise error.Abort(msg % version)
        if bundle.compressed():
            # read from an uncompressed temporary copy of the part instead
            cgstream = self._writetempbundle(part.read, '.cg%sun' % version)

        self._cgunpacker = changegroup.getunbundler(version, cgstream, 'UN')

    def _writetempbundle(self, readfn, suffix, header=''):
        """Write a temporary file to disk

        ``header`` is written first, followed by everything ``readfn``
        returns (requested in chunks of 2**18) until it returns an empty
        value. The file name is stored in ``self.tempfile`` and the file is
        returned reopened for binary reading.
        """
        fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
                                        suffix=suffix)
        self.tempfile = temp

        with os.fdopen(fdtemp, r'wb') as fptemp:
            fptemp.write(header)
            while True:
                chunk = readfn(2**18)
                if not chunk:
                    break
                fptemp.write(chunk)

        return self.vfs.open(self.tempfile, mode="rb")

    @localrepo.unfilteredpropertycache
    def _phasecache(self):
        """Phase cache of type ``bundlephasecache``."""
        return bundlephasecache(self, self._phasedefaults)

    @localrepo.unfilteredpropertycache
    def changelog(self):
        """Changelog including the bundle's changesets.

        As a side effect, ``self.manstart`` is set to the unpacker position
        reached once the bundlechangelog has been built.
        """
        # consume the header if it exists
        self._cgunpacker.changelogheader()
        c = bundlechangelog(self.svfs, self._cgunpacker)
        self.manstart = self._cgunpacker.tell()
        return c

    def _constructmanifest(self):
        """Build the bundle-aware manifest revlog.

        Seeks to ``self.manstart`` first and sets ``self.filestart`` to the
        unpacker position reached once the bundlemanifest has been built.
        """
        self._cgunpacker.seek(self.manstart)
        # consume the header if it exists
        self._cgunpacker.manifestheader()
        linkmapper = self.unfiltered().changelog.rev
        m = bundlemanifest(self.svfs, self._cgunpacker, linkmapper)
        self.filestart = self._cgunpacker.tell()
        return m

    def _consumemanifest(self):
        """Consumes the manifest portion of the bundle, setting filestart so the
        file portion can be read."""
        self._cgunpacker.seek(self.manstart)
        self._cgunpacker.manifestheader()
        for delta in self._cgunpacker.deltaiter():
            pass
        self.filestart = self._cgunpacker.tell()

    @localrepo.unfilteredpropertycache
    def manstart(self):
        """Unpacker position at which the manifest section starts."""
        # evaluating the changelog property assigns self.manstart
        self.changelog
        return self.manstart

    @localrepo.unfilteredpropertycache
    def filestart(self):
        """Unpacker position at which the file section starts."""
        self.manifestlog

        # If filestart was not set by self.manifestlog, that means the
        # manifestlog implementation did not consume the manifests from the
        # changegroup (ex: it might be consuming trees from a separate bundle2
        # part instead). So we need to manually consume it.
        if r'filestart' not in self.__dict__:
            self._consumemanifest()

        return self.filestart

    def url(self):
        """Return the ``bundle:`` URL computed at construction time."""
        return self._url

    def file(self, f):
        """Return the filelog for ``f``.

        Files present in the changegroup get a ``bundlefilelog`` positioned
        at their deltas; all others are delegated to the parent class.
        """
        if not self._cgfilespos:
            # the filename -> position map is built on first use
            self._cgunpacker.seek(self.filestart)
            self._cgfilespos = _getfilestarts(self._cgunpacker)

        if f in self._cgfilespos:
            self._cgunpacker.seek(self._cgfilespos[f])
            linkmapper = self.unfiltered().changelog.rev
            return bundlefilelog(self.svfs, f, self._cgunpacker, linkmapper)
        else:
            return super(bundlerepository, self).file(f)

    def close(self):
        """Close assigned bundle file immediately."""
        self._bundlefile.close()
        # remove the temporary uncompressed copy, if one was written
        if self.tempfile is not None:
            self.vfs.unlink(self.tempfile)
        # remove the temporary base repository, if one was created
        if self._tempparent:
            shutil.rmtree(self._tempparent, True)

    def cancopy(self):
        """Always return False."""
        return False

    def peer(self):
        """Return a ``bundlepeer`` wrapping this repository."""
        return bundlepeer(self)

    def getcwd(self):
        """Return the process working directory."""
        return pycompat.getcwd() # always outside the repo

    # Check if parents exist in localrepo before setting
    def setparents(self, p1, p2=nullid):
        """Set the working directory parents.

        A warning is written to the ui for each parent whose revision number
        is above ``changelog.repotiprev``, i.e. that exists only in the
        bundle; the parents are set regardless.
        """
        p1rev = self.changelog.rev(p1)
        p2rev = self.changelog.rev(p2)
        msg = _("setting parent to node %s that only exists in the bundle\n")
        if self.changelog.repotiprev < p1rev:
            self.ui.warn(msg % nodemod.hex(p1))
        if self.changelog.repotiprev < p2rev:
            self.ui.warn(msg % nodemod.hex(p2))
        return super(bundlerepository, self).setparents(p1, p2)

def instance(ui, path, create, intents=None):
    """Return a ``bundlerepository`` for ``path``.

    ``path`` may be a plain bundle file name or a ``bundle:`` URL of the form
    ``bundle:[repopath+]bundlename``. When no repository path is part of the
    URL, the base repository comes from the ``bundle.mainreporoot`` config
    or, failing that, from a repository lookup starting at the current
    directory.

    Raises ``error.Abort`` if ``create`` is true. ``intents`` is accepted but
    not used.
    """
    if create:
        raise error.Abort(_('cannot create new bundle repository'))

    # internal config: bundle.mainreporoot
    baserepo = ui.config("bundle", "mainreporoot")
    if not baserepo:
        # try to find the correct path to the working directory repo
        baserepo = cmdutil.findrepo(pycompat.getcwd())
        if baserepo is None:
            baserepo = ''

    if baserepo:
        # Try to make the full path relative so we get a nice, short URL.
        # In particular, we don't want temp dir names in test outputs.
        here = pycompat.getcwd()
        if baserepo == here:
            baserepo = ''
        else:
            prefix = pathutil.normasprefix(here)
            if baserepo.startswith(prefix):
                baserepo = baserepo[len(prefix):]

    parsed = util.url(path)
    localpath = parsed.localpath()

    # default: the whole local path names the bundle
    repopath, bundlename = baserepo, localpath
    if parsed.scheme == 'bundle':
        pieces = localpath.split("+", 1)
        if len(pieces) == 1:
            bundlename = pieces[0]
        else:
            repopath, bundlename = pieces
    return bundlerepository(ui, repopath, bundlename)

class bundletransactionmanager(object):
    """Transaction manager stand-in that never provides a transaction."""

    def transaction(self):
        """Return None: no transaction is available."""
        return

    def close(self):
        """Unsupported."""
        raise NotImplementedError()

    def release(self):
        """Unsupported."""
        raise NotImplementedError()

def getremotechanges(ui, repo, peer, onlyheads=None, bundlename=None,
                     force=False):
    '''obtains a bundle of changes incoming from peer

    Arguments:

    "onlyheads" limits the result to changes reachable from these heads.
    "bundlename", when given, is the file path where the bundle is kept
      permanently; without it the bundle goes to a temp file that the
      returned "cleanupfn" deletes.
    "force" tells discovery whether to proceed on unrelated repos.

    The result is a tuple (local, csets, cleanupfn):

    "local" is a local repo from which the incoming changesets can be read;
      when the original "peer" is remote it is a bundlerepo for the
      obtained bundle.
    "csets" is the list of incoming changeset node ids.
    "cleanupfn" takes no arguments and must be called once the changes have
      been processed; it closes both the original "peer" and the one
      returned here.
    '''
    common, incoming, rheads = discovery.findcommonincoming(
        repo, peer, heads=onlyheads, force=force)
    if not incoming:
        # nothing to fetch: best-effort removal of a stale bundle file
        if bundlename:
            try:
                os.unlink(bundlename)
            except OSError:
                pass
        return repo, [], peer.close

    known = set(common)
    rheads = [head for head in rheads if head not in known]

    tmpbundle = None
    overlay = None
    local = peer.local()
    if bundlename or not local:
        # create a bundle (uncompressed if peer repo is not local)

        # developer config: devel.legacy.exchange
        legacy = ui.configlist('devel', 'legacy.exchange')
        forcebundle1 = 'bundle2' not in legacy and 'bundle1' in legacy
        canbundle2 = (not forcebundle1
                      and peer.capable('getbundle')
                      and peer.capable('bundle2'))
        if canbundle2:
            with peer.commandexecutor() as e:
                b2 = e.callcommand('getbundle', {
                    'source': 'incoming',
                    'common': common,
                    'heads': rheads,
                    'bundlecaps': exchange.caps20to10(repo, role='client'),
                    'cg': True,
                }).result()

                fname = changegroup.writechunks(ui, b2._forwardchunks(),
                                                bundlename)
        else:
            # pick the wire command first, then issue it in one place
            if peer.capable('getbundle'):
                command = 'getbundle'
                cmdargs = {
                    'source': 'incoming',
                    'common': common,
                    'heads': rheads,
                }
            elif onlyheads is None and not peer.capable('changegroupsubset'):
                # compat with older servers when pulling all remote heads
                command = 'changegroup'
                cmdargs = {
                    'nodes': incoming,
                    'source': 'incoming',
                }
                # the arguments above do not involve rheads
                rheads = None
            else:
                command = 'changegroupsubset'
                cmdargs = {
                    'bases': incoming,
                    'heads': rheads,
                    'source': 'incoming',
                }

            with peer.commandexecutor() as e:
                cg = e.callcommand(command, cmdargs).result()

            bundletype = "HG10BZ" if local else "HG10UN"
            fname = bundle2.writebundle(ui, cg, bundlename, bundletype)

        # keep written bundle? only an unnamed bundle is deleted by cleanup
        tmpbundle = None if bundlename else fname

        if not local:
            # use the created uncompressed bundlerepo
            overlay = bundlerepository(repo.baseui, repo.root, fname)
            local = overlay
            # this repo contains local and peer now, so filter out local again
            common = repo.heads()
    if local:
        # Part of common may be remotely filtered
        # So use an unfiltered version
        # The discovery process probably need cleanup to avoid that
        local = local.unfiltered()

    csets = local.changelog.findmissing(common, rheads)

    if overlay:
        reponodes = [ctx.node() for ctx in overlay[overlay.firstnewrev:]]

        with peer.commandexecutor() as e:
            remotephases = e.callcommand('listkeys', {
                'namespace': 'phases',
            }).result()

        pullop = exchange.pulloperation(overlay, peer, heads=reponodes)
        pullop.trmanager = bundletransactionmanager()
        exchange._pullapplyphases(pullop, remotephases)

    def cleanup():
        if overlay:
            overlay.close()
        if tmpbundle:
            os.unlink(tmpbundle)
        peer.close()

    return (local, csets, cleanup)