# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import (
    nullid,
    short,
)
from . import (
    bookmarks,
    error,
    mdiff,
    narrowspec,
    phases,
    pycompat,
    setdiscovery,
)
from .interfaces import repository


def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote

    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)

    # If this is a clone and it was requested to perform a "stream clone",
    # we obtain the raw files data from the remote then fall back to an
    # incremental pull. This is somewhat hacky and is not nearly robust enough
    # for long-term usage.
    if usingrawchangelogandmanifest:
        with repo.transaction(b'clone'):
            _fetchrawstorefiles(repo, remote)
            repo.invalidate(clearfilecache=True)

    tr = pullop.trmanager.transaction()

    # We don't use the repo's narrow matcher here because the patterns passed
    # to exchange.pull() could be different.
    narrowmatcher = narrowspec.match(
        repo.root,
        # Empty maps to nevermatcher. So always
        # set includes if missing.
        pullop.includepats or {b'path:.'},
        pullop.excludepats,
    )

    if pullop.includepats or pullop.excludepats:
        pathfilter = {}
        if pullop.includepats:
            pathfilter[b'include'] = sorted(pullop.includepats)
        if pullop.excludepats:
            pathfilter[b'exclude'] = sorted(pullop.excludepats)
    else:
        pathfilter = None

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force
    )

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres[b'added']:
        phases.registernew(repo, tr, phases.draft, csetres[b'added'])

    # And adjust the phase of all changesets accordingly.
    for phasenumber, phase in phases.phasenames.items():
        if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
            continue

        phases.advanceboundary(
            repo, tr, phasenumber, csetres[b'nodesbyphase'][phase],
        )

    # Write bookmark updates.
    bookmarks.updatefromremote(
        repo.ui,
        repo,
        csetres[b'bookmarks'],
        remote.url(),
        pullop.gettransaction,
        explicit=pullop.explicitbookmarks,
    )

    manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])

    # We don't properly support shallow changeset and manifest yet. So we apply
    # depth limiting locally.
    if pullop.depth:
        relevantcsetnodes = set()
        clnode = repo.changelog.node

        for rev in repo.revs(
            b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
        ):
            relevantcsetnodes.add(clnode(rev))

        csetrelevantfilter = lambda n: n in relevantcsetnodes

    else:
        csetrelevantfilter = lambda n: True

    # If obtaining the raw store files, we need to scan the full repo to
    # derive all the changesets, manifests, and linkrevs.
    if usingrawchangelogandmanifest:
        csetsforfiles = []
        mnodesforfiles = []
        manifestlinkrevs = {}

        for rev in repo:
            ctx = repo[rev]
            node = ctx.node()

            if not csetrelevantfilter(node):
                continue

            mnode = ctx.manifestnode()

            csetsforfiles.append(node)
            mnodesforfiles.append(mnode)
            manifestlinkrevs[mnode] = rev

    else:
        csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
        mnodesforfiles = manres[b'added']
        manifestlinkrevs = manres[b'linkrevs']

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
    _fetchfilesfromcsets(
        repo,
        tr,
        remote,
        pathfilter,
        fnodes,
        csetsforfiles,
        manifestlinkrevs,
        shallow=bool(pullop.depth),
    )


def _checkuserawstorefiledata(pullop):
    """Check whether we should use rawstorefiledata command to retrieve data."""

    repo = pullop.repo
    remote = pullop.remote

    # Command to obtain raw store data isn't available.
    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
        return False

    # Only honor if user requested stream clone operation.
    if not pullop.streamclonerequested:
        return False

    # Only works on empty repos.
    if len(repo):
        return False

    # TODO This is super hacky. There needs to be a storage API for this. We
    # also need to check for compatibility with the remote.
    if b'revlogv1' not in repo.requirements:
        return False

    return True


def _fetchrawstorefiles(repo, remote):
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'rawstorefiledata', {b'files': [b'changelog', b'manifestlog'],}
        ).result()

        # First object is a summary of files data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(
            _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
        )
        with progress:
            progress.update(0)

            # Next are pairs of file metadata, data.
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(
                            _(b'remote file data missing key: %s') % k
                        )

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(
                        _(b'invalid location for raw file data: %s')
                        % filemeta[b'location']
                    )

                bytesremaining = filemeta[b'size']

                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(
                                _(
                                    b'received invalid number of bytes for file '
                                    b'data; expected %d, got extra'
                                )
                                % filemeta[b'size']
                            )

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(
                                _(
                                    b'did not receive indefinite length bytestring '
                                    b'for file data'
                                )
                            )

                if bytesremaining:
                    raise error.Abort(
                        _(
                            b'received invalid number of bytes for'
                            b'file data; expected %d got %d'
                        )
                        % (
                            filemeta[b'size'],
                            filemeta[b'size'] - bytesremaining,
                        )
                    )


def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
    )

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        has_node = repo.unfiltered().changelog.index.has_node

        common |= {head for head in remoteheads if has_node(head)}

        if set(remoteheads).issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads


def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'changesetdata',
            {
                b'revisions': [
                    {
                        b'type': b'changesetdagrange',
                        b'roots': sorted(common),
                        b'heads': sorted(remoteheads),
                    }
                ],
                b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
            },
        ).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)


def _processchangesetdata(repo, tr, objs):
    repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(
        _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
    )

    manifestnodes = {}

    def linkrev(node):
        repo.ui.debug(b'add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def onchangeset(cl, node):
        progress.increment()

        revision = cl.changelogrevision(node)

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = revision.manifest

    nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might only be metadata only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
            )

    added = cl.addgroup(
        iterrevisions(), linkrev, weakref.proxy(tr), addrevisioncb=onchangeset
    )

    progress.complete()

    return {
        b'added': added,
        b'nodesbyphase': nodesbyphase,
        b'bookmarks': remotebookmarks,
        b'manifestnodes': manifestnodes,
    }


def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
    )

    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?

    # We send commands 1 at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = [node for node in fetchnodes[i : i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(
                b'manifestdata',
                {
                    b'tree': b'',
                    b'nodes': batch,
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                },
            ).result()

            # Chomp off header object.
            next(objs)

            added.extend(
                rootmanifest.addgroup(
                    iterrevisions(objs, progress),
                    linkrevs.__getitem__,
                    weakref.proxy(tr),
                )
            )

    progress.complete()

    return {
        b'added': added,
        b'linkrevs': linkrevs,
    }


def _derivefilesfrommanifests(repo, matcher, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _(b'scanning manifests'), total=len(manifestnodes)
    )

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the storage
            # delta into consideration. What we really want is something that
            # takes the delta between the manifest's parents. And ideally we
            # would ignore file nodes that are known locally. For now, ignore
            # both these limitations. This will result in incremental fetches
            # requesting data we already have. So this is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                if matcher(path):
                    fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes


def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file data from explicit file revisions."""

    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = [x for x in sorted(fnodes.items())]

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = [x for x in fnodeslist[i : i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append(
                    (
                        path,
                        e.callcommand(
                            b'filedata',
                            {
                                b'path': path,
                                b'nodes': sorted(nodes),
                                b'fields': {b'parents', b'revision'},
                                b'haveparents': True,
                            },
                        ),
                    )
                )

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in pycompat.iteritems(nodes)
                }

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr),
                )


def _fetchfilesfromcsets(
    repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
    """Fetch file data from explicit changeset revisions."""

    def iterrevisions(objs, remaining, progress):
        while remaining:
            filerevision = next(objs)

            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            if b'linknode' in filerevision:
                linknode = filerevision[b'linknode']
            else:
                linknode = node

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                linknode,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()
            remaining -= 1

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
    fields = {b'parents', b'revision'}
    clrev = repo.changelog.rev

    # There are no guarantees that we'll have ancestor revisions if
    # a) this repo has shallow file storage b) shallow data fetching is enabled.
    # Force remote to not delta against possibly unknown revisions when these
    # conditions hold.
    haveparents = not (shallowfiles or shallow)

    # Similarly, we may not have calculated linkrevs for all incoming file
    # revisions. Ask the remote to do work for us in this case.
    if not haveparents:
        fields.add(b'linknode')

    for i in pycompat.xrange(0, len(csets), batchsize):
        batch = [x for x in csets[i : i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [
                    {b'type': b'changesetexplicit', b'nodes': batch,}
                ],
                b'fields': fields,
                b'haveparents': haveparents,
            }

            if pathfilter:
                args[b'pathfilter'] = pathfilter

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments.
            for i in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in pycompat.iteritems(fnodes[path])
                }

                def getlinkrev(node):
                    if node in linkrevs:
                        return linkrevs[node]
                    else:
                        return clrev(node)

                store.addgroup(
                    iterrevisions(objs, header[b'totalitems'], progress),
                    getlinkrev,
                    weakref.proxy(tr),
                    maybemissingparents=shallow,
                )