##// END OF EJS Templates
revlog: guarantee that p1 != null if a non-null parent exists...
revlog: guarantee that p1 != null if a non-null parent exists This change does not affect the hashing (which already did this transformation), but can change the log output in the rare case where this behavior was observed in repositories. The change can simplify iteration code where regular changesets and merges are distinct branches. Differential Revision: https://phab.mercurial-scm.org/D10150

File last commit:

r43327:c5c502bd default
r47537:49fd21f3 default
Show More
try_server.py
99 lines | 2.4 KiB | text/x-python | PythonLexer
# try_server.py - Interact with Try server
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
# no-check-code because Python 3 native.
import base64
import json
import os
import subprocess
import tempfile
from .aws import AWSConnection
LAMBDA_FUNCTION = "ci-try-server-upload"
def trigger_try(c: AWSConnection, rev="."):
"""Trigger a new Try run."""
lambda_client = c.session.client("lambda")
cset, bundle = generate_bundle(rev=rev)
payload = {
"bundle": base64.b64encode(bundle).decode("utf-8"),
"node": cset["node"],
"branch": cset["branch"],
"user": cset["user"],
"message": cset["desc"],
}
print("resolved revision:")
print("node: %s" % cset["node"])
print("branch: %s" % cset["branch"])
print("user: %s" % cset["user"])
print("desc: %s" % cset["desc"].splitlines()[0])
print()
print("sending to Try...")
res = lambda_client.invoke(
FunctionName=LAMBDA_FUNCTION,
InvocationType="RequestResponse",
Payload=json.dumps(payload).encode("utf-8"),
)
body = json.load(res["Payload"])
for message in body:
print("remote: %s" % message)
def generate_bundle(rev="."):
"""Generate a bundle suitable for use by the Try service.
Returns a tuple of revision metadata and raw Mercurial bundle data.
"""
# `hg bundle` doesn't support streaming to stdout. So we use a temporary
# file.
path = None
try:
fd, path = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
os.close(fd)
args = [
"hg",
"bundle",
"--type",
"gzip-v2",
"--base",
"public()",
"--rev",
rev,
path,
]
print("generating bundle...")
subprocess.run(args, check=True)
with open(path, "rb") as fh:
bundle_data = fh.read()
finally:
if path:
os.unlink(path)
args = [
"hg",
"log",
"-r",
rev,
# We have to upload as JSON, so it won't matter if we emit binary
# since we need to normalize to UTF-8.
"-T",
"json",
]
res = subprocess.run(args, check=True, capture_output=True)
return json.loads(res.stdout)[0], bundle_data