##// END OF EJS Templates
windows-ci: run Windows CI automatically alongside the others...
windows-ci: run Windows CI automatically alongside the others This will enable us to make Windows Python 3 a first-class citizen for the next 6.0 cycle. We will probably get some flaky tests and we're missing others that are skipped, but we'll turn them on in future patches. Differential Revision: https://phab.mercurial-scm.org/D11256

File last commit:

r43327:c5c502bd default
r48618:6ec89734 stable
Show More
try_server.py
99 lines | 2.4 KiB | text/x-python | PythonLexer
# try_server.py - Interact with Try server
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
# no-check-code because Python 3 native.
import base64
import json
import os
import subprocess
import tempfile
from .aws import AWSConnection
# Name of the AWS Lambda function that trigger_try() invokes to hand a
# bundle and its changeset metadata to the Try server.
LAMBDA_FUNCTION = "ci-try-server-upload"
def trigger_try(c: AWSConnection, rev="."):
    """Trigger a new Try run.

    Bundles ``rev`` via ``generate_bundle()``, prints a summary of the
    resolved changeset, synchronously invokes the ``LAMBDA_FUNCTION`` Lambda
    function with the base64-encoded bundle plus changeset metadata, and
    prints every message of the JSON response prefixed with ``remote:``.

    Args:
        c: Connection object whose ``session`` is used to create the Lambda
            client.
        rev: Mercurial revision to submit. Defaults to ``"."``.

    Raises:
        RuntimeError: If the invocation response carries ``FunctionError``,
            i.e. the Lambda function itself failed.

    Errors raised by ``generate_bundle()`` propagate unchanged.
    """
    lambda_client = c.session.client("lambda")

    cset, bundle = generate_bundle(rev=rev)

    # The payload must be JSON, so the binary bundle travels as base64 text.
    payload = {
        "bundle": base64.b64encode(bundle).decode("utf-8"),
        "node": cset["node"],
        "branch": cset["branch"],
        "user": cset["user"],
        "message": cset["desc"],
    }

    # An empty description has no lines; show an empty summary rather than
    # failing with IndexError before anything is sent.
    desc_lines = cset["desc"].splitlines()
    summary = desc_lines[0] if desc_lines else ""

    print("resolved revision:")
    print("node: %s" % cset["node"])
    print("branch: %s" % cset["branch"])
    print("user: %s" % cset["user"])
    print("desc: %s" % summary)
    print()

    print("sending to Try...")
    res = lambda_client.invoke(
        FunctionName=LAMBDA_FUNCTION,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )

    body = json.load(res["Payload"])

    # A failed function still yields a normal response, but with
    # ``FunctionError`` set and an error document (a JSON object) as the
    # payload. Iterating that object would print its keys as if they were
    # server messages, so report the failure explicitly instead.
    function_error = res.get("FunctionError")
    if function_error:
        if isinstance(body, dict):
            detail = body.get("errorMessage", body)
        else:
            detail = body
        raise RuntimeError(
            "Try server upload failed (%s): %s" % (function_error, detail)
        )

    for message in body:
        print("remote: %s" % message)
def generate_bundle(rev="."):
    """Build the bundle and changeset metadata that the Try service needs.

    Returns a 2-tuple: the first entry of the ``hg log -T json`` output for
    ``rev``, followed by the raw bytes of a ``gzip-v2`` bundle of ``rev``
    created with ``--base public()``.
    """
    # `hg bundle` cannot stream to stdout, so it writes into a scratch file
    # that is read back and then always removed.
    handle, bundle_path = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
    try:
        # Only the path is needed; hg opens the file on its own.
        os.close(handle)

        print("generating bundle...")
        bundle_cmd = ["hg", "bundle"]
        bundle_cmd += ["--type", "gzip-v2"]
        bundle_cmd += ["--base", "public()"]
        bundle_cmd += ["--rev", rev]
        bundle_cmd.append(bundle_path)
        subprocess.run(bundle_cmd, check=True)

        with open(bundle_path, "rb") as bundle_file:
            raw_bundle = bundle_file.read()
    finally:
        os.unlink(bundle_path)

    # The metadata is uploaded as JSON and therefore has to be normalized to
    # UTF-8 anyway, so asking hg for JSON output loses nothing.
    log_cmd = ["hg", "log", "-r", rev, "-T", "json"]
    log_proc = subprocess.run(log_cmd, check=True, capture_output=True)

    changesets = json.loads(log_proc.stdout)
    return changesets[0], raw_bundle