##// END OF EJS Templates
util: avoid echoing the password to the console on Windows py3 (issue6446)...
util: avoid echoing the password to the console on Windows py3 (issue6446) The `getpass.getpass()` implementation on Windows first checks if `sys.stdin` and `sys.__stdin__` are the same object. They aren't on py3 because the former is replaced in dispatch.py with something that doesn't normalize '\n' to '\r\n'. When they aren't the same object, it simply calls `sys.stdin.readline()` instead of the msvcrt functions that read the input characters before they are echoed. This simply copies the `getpass.win_getpass()` implementation without the stdin check, and byteifies around the edges. I'm not sure if there's a reasonable replacement for the check that we could implement. When echoing input into the hg command, the `ui.interactive()` check causes `ui.getpass()` to bail before getting here. If the proper config switches are used to bypass that and call this, the process stalls until '\n' is input into the console. So there could be a deadlock here when run by another command if the wrong config settings are applied. Differential Revision: https://phab.mercurial-scm.org/D10708

File last commit:

r43327:c5c502bd default
r47949:5b351317 stable
Show More
try_server.py
99 lines | 2.4 KiB | text/x-python | PythonLexer
# try_server.py - Interact with Try server
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
# no-check-code because Python 3 native.
import base64
import json
import os
import subprocess
import tempfile
from .aws import AWSConnection
# Name of the AWS Lambda function that trigger_try() invokes to submit a run.
LAMBDA_FUNCTION = "ci-try-server-upload"
def trigger_try(c: AWSConnection, rev="."):
    """Trigger a new Try run.

    Bundles ``rev`` via ``generate_bundle()``, prints the resolved revision
    metadata, synchronously invokes the ``LAMBDA_FUNCTION`` Lambda function
    with the bundle and metadata as a JSON payload, and prints every message
    of the decoded response prefixed with ``remote: ``.

    Args:
        c: connection whose ``session.client("lambda")`` provides the client
            used for the invocation.
        rev: revision handed to ``generate_bundle()``; defaults to ``"."``.

    Raises:
        RuntimeError: if the invocation response carries a ``FunctionError``
            marker, i.e. the remote function reported a failure.
    """
    lambda_client = c.session.client("lambda")

    cset, bundle = generate_bundle(rev=rev)

    payload = {
        # Raw bundle bytes cannot be embedded in JSON; send them as base64.
        "bundle": base64.b64encode(bundle).decode("utf-8"),
        "node": cset["node"],
        "branch": cset["branch"],
        "user": cset["user"],
        "message": cset["desc"],
    }

    # An empty description has no first line; show an empty summary rather
    # than failing with IndexError before anything is uploaded.
    desc_lines = cset["desc"].splitlines()
    summary = desc_lines[0] if desc_lines else ""

    print("resolved revision:")
    print("node: %s" % cset["node"])
    print("branch: %s" % cset["branch"])
    print("user: %s" % cset["user"])
    print("desc: %s" % summary)
    print()

    print("sending to Try...")
    res = lambda_client.invoke(
        FunctionName=LAMBDA_FUNCTION,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )

    body = json.load(res["Payload"])

    # When the remote function fails, the response is flagged with
    # "FunctionError" and the payload describes the error instead of holding
    # the list of messages. Surface that instead of iterating over it.
    function_error = res.get("FunctionError")
    if function_error:
        if isinstance(body, dict):
            detail = body.get("errorMessage", body)
        else:
            detail = body
        raise RuntimeError(
            "Try server upload failed (%s): %s" % (function_error, detail)
        )

    for message in body:
        print("remote: %s" % message)
def generate_bundle(rev="."):
    """Build the bundle and changeset metadata that the Try service needs.

    Runs ``hg bundle`` for ``rev`` (relative to ``public()``) into a
    temporary file, reads the file back, and then runs ``hg log -T json``
    for the same revision.

    Returns a 2-tuple: the first entry of the parsed ``hg log`` JSON output,
    and the raw bytes of the generated bundle.
    """
    # `hg bundle` cannot stream to stdout, so the bundle goes through a
    # temporary file that is always removed afterwards.
    bundle_path = None
    try:
        handle, bundle_path = tempfile.mkstemp(
            prefix="hg-bundle-", suffix=".hg"
        )
        # Only the path is needed; hg writes the file itself.
        os.close(handle)

        bundle_cmd = ["hg", "bundle"]
        bundle_cmd += ["--type", "gzip-v2"]
        bundle_cmd += ["--base", "public()"]
        bundle_cmd += ["--rev", rev]
        bundle_cmd.append(bundle_path)

        print("generating bundle...")
        subprocess.run(bundle_cmd, check=True)

        with open(bundle_path, "rb") as bundle_file:
            raw_bundle = bundle_file.read()
    finally:
        # bundle_path stays None only if mkstemp() itself failed.
        if bundle_path is not None:
            os.unlink(bundle_path)

    # The metadata is uploaded as JSON anyway, so letting hg emit JSON
    # (normalized to UTF-8) loses nothing compared to a binary-safe format.
    log_cmd = ["hg", "log", "-r", rev, "-T", "json"]
    log_proc = subprocess.run(log_cmd, check=True, capture_output=True)

    changesets = json.loads(log_proc.stdout)
    return changesets[0], raw_bundle