# try_server.py - Interact with Try server # # Copyright 2019 Gregory Szorc # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. # no-check-code because Python 3 native. import base64 import json import os import subprocess import tempfile from .aws import AWSConnection LAMBDA_FUNCTION = "ci-try-server-upload" def trigger_try(c: AWSConnection, rev="."): """Trigger a new Try run.""" lambda_client = c.session.client("lambda") cset, bundle = generate_bundle(rev=rev) payload = { "bundle": base64.b64encode(bundle).decode("utf-8"), "node": cset["node"], "branch": cset["branch"], "user": cset["user"], "message": cset["desc"], } print("resolved revision:") print("node: %s" % cset["node"]) print("branch: %s" % cset["branch"]) print("user: %s" % cset["user"]) print("desc: %s" % cset["desc"].splitlines()[0]) print() print("sending to Try...") res = lambda_client.invoke( FunctionName=LAMBDA_FUNCTION, InvocationType="RequestResponse", Payload=json.dumps(payload).encode("utf-8"), ) body = json.load(res["Payload"]) for message in body: print("remote: %s" % message) def generate_bundle(rev="."): """Generate a bundle suitable for use by the Try service. Returns a tuple of revision metadata and raw Mercurial bundle data. """ # `hg bundle` doesn't support streaming to stdout. So we use a temporary # file. path = None try: fd, path = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg") os.close(fd) args = [ "hg", "bundle", "--type", "gzip-v2", "--base", "public()", "--rev", rev, path, ] print("generating bundle...") subprocess.run(args, check=True) with open(path, "rb") as fh: bundle_data = fh.read() finally: if path: os.unlink(path) args = [ "hg", "log", "-r", rev, # We have to upload as JSON, so it won't matter if we emit binary # since we need to normalize to UTF-8. "-T", "json", ] res = subprocess.run(args, check=True, capture_output=True) return json.loads(res.stdout)[0], bundle_data