|
|
# Copyright 2005, 2006 Benoit Boissinot <benoit.boissinot@ens-lyon.org>
|
|
|
#
|
|
|
# This software may be used and distributed according to the terms of the
|
|
|
# GNU General Public License version 2 or any later version.
|
|
|
|
|
|
'''commands to sign and verify changesets'''
|
|
|
|
|
|
import os, tempfile, binascii
|
|
|
from mercurial import util, commands, match, cmdutil
|
|
|
from mercurial import node as hgnode
|
|
|
from mercurial.i18n import _
|
|
|
|
|
|
# Command table of this extension; it is handed to cmdutil.command() and
# the resulting ``command`` object is used as decorator on the commands
# defined in this file.
cmdtable = {}
command = cmdutil.command(cmdtable)

# Note for extension authors: 'internal' is ONLY meant for extensions
# which SHIP WITH MERCURIAL. Extensions living outside of mainline should
# list the version(s) of Mercurial they are tested with instead, or leave
# the attribute unspecified.
testedwith = 'internal'
|
|
|
|
|
|
class gpg(object):
    """Small wrapper around the gpg command line program.

    Command lines are built as strings and executed through
    ``util.filter``.
    """

    def __init__(self, path, key=None):
        """Remember the gpg program and the optional signing key.

        ``path`` is the gpg command to run. ``key``, when given, is passed
        to gpg with ``--local-user`` while signing.
        """
        self.path = path
        # pre-rendered command line fragment, empty when no key is selected
        self.key = " --local-user \"%s\"" % key if key else ""

    def sign(self, data):
        """Return what ``gpg --sign --detach-sign`` outputs for ``data``."""
        gpgcmd = "%s --sign --detach-sign%s" % (self.path, self.key)
        return util.filter(data, gpgcmd)

    def verify(self, data, sig):
        """Verify ``sig`` against ``data`` and return the signatures found.

        Both values are written to temporary files which are passed to
        ``gpg --verify``; the files are removed again whatever happens.

        The result is a list with one
        ``[status, keyid, userid, fingerprint]`` entry per signature, see
        ``_parsestatus`` for the details.
        """
        sigfile = datafile = None
        try:
            # create temporary files; ``with`` closes the handle even if
            # the write fails
            fd, sigfile = tempfile.mkstemp(prefix="hg-gpg-", suffix=".sig")
            with os.fdopen(fd, 'wb') as fp:
                fp.write(sig)
            fd, datafile = tempfile.mkstemp(prefix="hg-gpg-", suffix=".txt")
            with os.fdopen(fd, 'wb') as fp:
                fp.write(data)
            # logger and status output are both sent to fd 1 so that they
            # end up in the output collected below
            gpgcmd = ("%s --logger-fd 1 --status-fd 1 --verify "
                      "\"%s\" \"%s\"" % (self.path, sigfile, datafile))
            ret = util.filter("", gpgcmd)
        finally:
            for f in (sigfile, datafile):
                try:
                    if f:
                        os.unlink(f)
                except OSError:
                    # removing the temporary files is best effort only
                    pass
        return self._parsestatus(ret)

    @staticmethod
    def _parsestatus(output):
        """Turn the status output of ``gpg --verify`` into signature entries.

        Only lines starting with ``[GNUPG:]`` are considered (see DETAILS
        in the gnupg documentation), everything else is logger output.

        Returns a list of ``[status, keyid, userid, fingerprint]`` lists.
        ``status`` is one of GOODSIG, EXPSIG, EXPKEYSIG, BADSIG or ERRSIG.
        ``userid`` is always "" for ERRSIG. ``fingerprint`` comes from a
        VALIDSIG line following the signature line and is None when there
        is no such line.
        """
        sigstatuses = ("GOODSIG", "EXPSIG", "EXPKEYSIG", "BADSIG")
        keys = []
        key, fingerprint = None, None
        for l in output.splitlines():
            # filter the logger output
            if not l.startswith("[GNUPG:]"):
                continue
            # drop the "[GNUPG:] " prefix
            l = l[9:]
            if l.startswith("VALIDSIG"):
                fields = l.split()
                if len(fields) > 10:
                    # fingerprint of the primary key
                    fingerprint = fields[10]
                elif len(fields) > 1:
                    # the primary key field is optional; use the
                    # fingerprint of the signing key when it is missing
                    fingerprint = fields[1]
            elif l.startswith("ERRSIG"):
                # record the signature seen so far before starting a new
                # entry, exactly like the other signature statuses do
                if key is not None:
                    keys.append(key + [fingerprint])
                key = l.split(" ", 3)[:2]
                key.append("")
                fingerprint = None
            elif l.startswith(sigstatuses):
                if key is not None:
                    keys.append(key + [fingerprint])
                key = l.split(" ", 2)
                fingerprint = None
        if key is not None:
            keys.append(key + [fingerprint])
        return keys
|
|
|
|
|
|
def newgpg(ui, **opts):
    """build a gpg wrapper from the configuration and the given options

    The program comes from the ``gpg.cmd`` setting (default: "gpg"). The
    signing key is the ``key`` option when set, else the ``gpg.key``
    setting.
    """
    program = ui.config("gpg", "cmd", "gpg")
    signkey = opts.get('key') or ui.config("gpg", "key", None)
    return gpg(program, signkey)
|
|
|
|
|
|
def sigwalk(repo):
    """
    walk over every sigs, yields a couple
    ((node, version, sig), (filename, linenumber))

    The heads of the tracked ".hgsigs" file are read first, then the
    "localsigs" file of the repository; a "localsigs" file which cannot
    be read (IOError) is silently ignored.
    """
    def parsefile(fileiter, context):
        # number the physical lines, so that the reported position stays
        # right when blank lines are skipped
        for ln, l in enumerate(fileiter, 1):
            # lines may still carry their line terminator here, so test
            # the stripped text to recognize blank ones
            if not l.strip():
                continue
            yield (l.split(" ", 2), (context, ln))

    # read the heads
    fl = repo.file(".hgsigs")
    for r in reversed(fl.heads()):
        fn = ".hgsigs|%s" % hgnode.short(r)
        for item in parsefile(fl.read(r).splitlines(), fn):
            yield item
    try:
        # read local signatures
        fn = "localsigs"
        for item in parsefile(repo.vfs(fn), fn):
            yield item
    except IOError:
        pass
|
|
|
|
|
|
def getkeys(ui, repo, mygpg, sigdata, context):
    """get the keys who signed a data

    ``sigdata`` is a (hex node, version, base64 signature) triple and
    ``context`` the (filename, linenumber) pair it was read from; the
    latter is only used to prefix the messages written to ``ui``.

    Returns a list of (keyid, userid, fingerprint) tuples for the
    signatures that verified. Unknown keys and bad signatures are reported
    and left out; expired signatures or keys are reported but kept.
    """
    fn, ln = context
    node, version, sig = sigdata
    prefix = "%s:%d" % (fn, ln)
    node = hgnode.bin(node)

    data = node2txt(repo, node, version)
    sig = binascii.a2b_base64(sig)
    keys = mygpg.verify(data, sig)

    validkeys = []
    # warn for expired key and/or sigs
    for key in keys:
        status = key[0]
        if status == "ERRSIG":
            # hand over the complete key ID: shortkey() only abbreviates
            # IDs which are exactly 16 characters long
            ui.write(_("%s Unknown key ID \"%s\"\n")
                     % (prefix, shortkey(ui, key[1])))
            continue
        if status == "BADSIG":
            ui.write(_("%s Bad signature from \"%s\"\n") % (prefix, key[2]))
            continue
        if status == "EXPSIG":
            ui.write(_("%s Note: Signature has expired"
                       " (signed by: \"%s\")\n") % (prefix, key[2]))
        elif status == "EXPKEYSIG":
            ui.write(_("%s Note: This key has expired"
                       " (signed by: \"%s\")\n") % (prefix, key[2]))
        validkeys.append((key[1], key[2], key[3]))
    return validkeys
|
|
|
|
|
|
@command("sigs", [], _('hg sigs'))
def sigs(ui, repo):
    """list signed changesets"""
    mygpg = newgpg(ui)
    # revision number -> keys which signed that revision
    signed = {}

    for sigdata, context in sigwalk(repo):
        hexnode, sigversion, sigtext = sigdata
        fn, ln = context
        try:
            n = repo.lookup(hexnode)
        except KeyError:
            ui.warn(_("%s:%d node does not exist\n") % (fn, ln))
            continue
        revnum = repo.changelog.rev(n)
        found = getkeys(ui, repo, mygpg, sigdata, context)
        if found:
            signed.setdefault(revnum, []).extend(found)

    # highest revision first, one output line per signing key
    for revnum in sorted(signed, reverse=True):
        label = "%5d:%s" % (revnum, hgnode.hex(repo.changelog.node(revnum)))
        for k in signed[revnum]:
            ui.write("%-30s %s\n" % (keystr(ui, k), label))
|
|
|
|
|
|
@command("sigcheck", [], _('hg sigcheck REV'))
def check(ui, repo, rev):
    """verify all the signatures there may be for a particular revision"""
    mygpg = newgpg(ui)
    rev = repo.lookup(rev)
    wanted = hgnode.hex(rev)
    signers = []

    # only the signature entries recorded for the requested node matter
    for sigdata, context in sigwalk(repo):
        hexnode, sigversion, sigtext = sigdata
        if hexnode != wanted:
            continue
        found = getkeys(ui, repo, mygpg, sigdata, context)
        if found:
            signers.extend(found)

    if not signers:
        ui.write(_("no valid signature for %s\n") % hgnode.short(rev))
        return

    # print summary
    ui.write("%s is signed by:\n" % hgnode.short(rev))
    for signer in signers:
        ui.write(" %s\n" % keystr(ui, signer))
|
|
|
|
|
|
def keystr(ui, key):
    """return the display string of a (keyid, user, fingerprint) key

    This is the user name, followed by a comment in parentheses when the
    "gpg" configuration section has a non-empty entry named after the
    fingerprint.
    """
    keyid, user, fingerprint = key
    comment = ui.config("gpg", fingerprint, None)
    if not comment:
        return user
    return "%s (%s)" % (user, comment)
|
|
|
|
|
|
@command("sign",
         [('l', 'local', None, _('make the signature local')),
          ('f', 'force', None, _('sign even if the sigfile is modified')),
          ('', 'no-commit', None, _('do not commit the sigfile after signing')),
          ('k', 'key', '',
           _('the key id to sign with'), _('ID')),
          ('m', 'message', '',
           _('use text as commit message'), _('TEXT')),
          ('e', 'edit', False, _('invoke editor on commit messages')),
         ] + commands.commitopts2,
         _('hg sign [OPTION]... [REV]...'))
def sign(ui, repo, *revs, **opts):
    """add a signature for the current or given revision

    If no revision is given, the parent of the working directory is used,
    or tip if no revision is checked out.

    See :hg:`help dates` for a list of formats valid for -d/--date.
    """

    mygpg = newgpg(ui, **opts)
    sigver = "0"
    sigmessage = ""

    date = opts.get('date')
    if date:
        opts['date'] = util.parsedate(date)

    if revs:
        nodes = [repo.lookup(n) for n in revs]
    else:
        # default to the working directory parent, a merge is ambiguous
        nodes = [node for node in repo.dirstate.parents()
                 if node != hgnode.nullid]
        if len(nodes) > 1:
            raise util.Abort(_('uncommitted merge - please provide a '
                               'specific revision'))
        if not nodes:
            nodes = [repo.changelog.tip()]

    # one "<hex node> <version> <base64 signature>" line per signed node
    for n in nodes:
        hexnode = hgnode.hex(n)
        ui.write(_("signing %d:%s\n") % (repo.changelog.rev(n),
                                         hgnode.short(n)))
        # build data
        data = node2txt(repo, n, sigver)
        sig = mygpg.sign(data)
        if not sig:
            raise util.Abort(_("error while signing"))
        sig = binascii.b2a_base64(sig)
        sig = sig.replace("\n", "")
        sigmessage += "%s %s %s\n" % (hexnode, sigver, sig)

    # write it
    if opts['local']:
        repo.vfs.append("localsigs", sigmessage)
        return

    # The matcher is needed by the commit below as well, so it has to
    # exist even when --force skips the check of the working copy.
    msigs = match.exact(repo.root, '', ['.hgsigs'])
    if not opts["force"]:
        if any(repo.status(match=msigs, unknown=True, ignored=True)):
            raise util.Abort(_("working copy of .hgsigs is changed "),
                             hint=_("please commit .hgsigs manually"))

    sigsfile = repo.wfile(".hgsigs", "ab")
    sigsfile.write(sigmessage)
    sigsfile.close()

    if '.hgsigs' not in repo.dirstate:
        repo[None].add([".hgsigs"])

    if opts["no_commit"]:
        return

    message = opts['message']
    if not message:
        # we don't translate commit messages
        message = "\n".join(["Added signature for changeset %s"
                             % hgnode.short(n)
                             for n in nodes])
    try:
        editor = cmdutil.getcommiteditor(editform='gpg.sign', **opts)
        repo.commit(message, opts['user'], opts['date'], match=msigs,
                    editor=editor)
    except ValueError as inst:
        raise util.Abort(str(inst))
|
|
|
|
|
|
def shortkey(ui, key):
    """return the last 8 characters of a 16 character key ID

    A key ID of any other length is returned unchanged, after a debug
    message about it has been emitted through ``ui``.
    """
    if len(key) == 16:
        return key[-8:]

    ui.debug("key ID \"%s\" format error\n" % key)
    return key
|
|
|
|
|
|
def node2txt(repo, node, ver):
    """return the text which is signed for a node

    For signature version "0" this is the hexadecimal form of the node
    followed by a newline; any other version aborts.
    """
    if ver != "0":
        raise util.Abort(_("unknown signature version"))
    return "%s\n" % hgnode.hex(node)
|
|
|
|