##// END OF EJS Templates
tests: use `--no-cache-dir` with `pip`...
tests: use `--no-cache-dir` with `pip` After 1a09563a615c, there's one more wheel that gets cached in the user's pip cache in the macOS CI runner. The wheel corresponds to the version being used for the tests, but it doesn't get cached until the 3rd or 4th test shard is run, so it's not an issue with installing to run the tests. This seems to eliminate that. This doesn't seem to be an issue on Windows or Linux in my setup. Windows not being affected is likely because we set `$USERPROFILE` to redirect the home directory to `$TESTTMP` when running tests, since 08fd76a553c9. (When checking with `"$PYTHON" -m pip cache dir`, it points to `$TESTTMP/pip/cache`.) We do also set `$HOME` to this same location when running posix tests, but I can't tell what's going on locally in Linux, because running `pip` directly in the *.t explodes, and `"$PYTHON" -m pip --version` prints `pip 9.0.1 from /usr/lib/python3/dist-packages`, so that's likely before caching was enabled[1]. Running `python3.8 -m pip --version` locally outside of the *.t (the same version used to invoke the test runner), prints `pip 24.2 from /home/mharbison/.local/lib/python3.8/site-packages/pip (python 3.8)`. In CI, both macOS and Linux print a modern version of `pip`, and list the cache as being under `$TESTTMP`, but then it doesn't end up there on macOS. No idea if it is a pip bug, or what. But let's be explicit and disable caching. [1] https://github.com/pypa/pip/blob/fe0925b3c00bf8956a0d33408df692ac364217d4/docs/html/topics/caching.md?plain=1#L37

File last commit:

r52596:ca7bde5d default
r53221:519a997b stable
Show More
hgclient.py
161 lines | 3.9 KiB | text/x-python | PythonLexer
# A minimal client for Mercurial's command server
import io
import os
import re
import signal
import socket
import struct
import subprocess
import sys
import time
# Select byte-oriented stdout/stderr handles, an in-memory byte buffer type
# and a bytes-friendly print helper for the running interpreter.
if sys.version_info[0] < 3:
    import cStringIO

    stdout = sys.stdout
    stderr = sys.stderr
    stringio = cStringIO.StringIO
    bprint = print
else:
    stdout = sys.stdout.buffer
    stderr = sys.stderr.buffer
    stringio = io.BytesIO

    def bprint(*args):
        """Write the bytes arguments to stdout, space separated, then a newline.

        A ``b`` that starts a word and is immediately followed by a quote
        character is dropped from each argument before it is written.
        """
        # remove b'' as well for ease of test migration
        line = b' '.join(
            re.sub(br'''\bb(['"])''', br'\1', b'%s' % arg) for arg in args
        )
        stdout.write(line + b'\n')
def connectpipe(path=None, extraargs=()):
    """Start ``hg serve --cmdserver pipe`` and return its Popen object.

    path: optional repository path, passed to hg with ``-R``.
    extraargs: additional command line arguments appended after that.

    The child is created with both stdin and stdout connected to pipes.
    """
    argv = [b'hg', b'serve', b'--cmdserver', b'pipe']
    if path:
        argv.extend([b'-R', path])
    argv.extend(extraargs)

    # On Windows ('nt') the bytes arguments are decoded to str before
    # being handed to Popen; elsewhere they are passed through untouched.
    if os.name == 'nt':
        argv = [piece.decode("utf-8") for piece in argv]

    return subprocess.Popen(
        argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE
    )
class unixconnection:
    """Client connection to a command server over a unix domain socket.

    Provides the ``stdin`` / ``stdout`` binary file objects and the
    ``wait()`` method that check() and runcommand() use on a server object.
    """

    def __init__(self, sockpath):
        """Connect to the unix socket at *sockpath*.

        Raises whatever ``socket.connect`` raises; the socket is closed
        before the exception propagates so no descriptor is leaked.
        """
        self.sock = sock = socket.socket(socket.AF_UNIX)
        try:
            sock.connect(sockpath)
        except Exception:
            sock.close()
            raise
        self.stdin = sock.makefile('wb')
        self.stdout = sock.makefile('rb')

    def wait(self):
        """Close both file objects and the underlying socket.

        Every close is attempted even if an earlier one raises (closing
        ``stdin`` flushes buffered data, which can fail on a dead peer).
        """
        try:
            self.stdin.close()
        finally:
            try:
                self.stdout.close()
            finally:
                self.sock.close()
class unixserver:
    """Run ``hg serve --cmdserver unix`` as a child process.

    Attributes:
        sockpath: address passed to hg with ``-a``; connect() uses it.
        server: the ``subprocess.Popen`` object of the hg child.
    """

    def __init__(self, sockpath, logpath=None, repopath=None):
        """Spawn the server and wait until its socket path exists.

        sockpath: socket address handed to hg via ``-a``.
        logpath: optional file; when given, the child's stdout and stderr
            are appended to it, otherwise they are inherited.
        repopath: optional repository path, passed with ``-R``.
        """
        self.sockpath = sockpath
        cmdline = [b'hg', b'serve', b'--cmdserver', b'unix', b'-a', sockpath]
        if repopath:
            cmdline += [b'-R', repopath]
        if logpath:
            # The child gets its own copy of the descriptor, so the
            # parent's handle can be closed as soon as Popen returns.
            with open(logpath, 'a') as logfile:
                self.server = subprocess.Popen(
                    cmdline, stdout=logfile, stderr=subprocess.STDOUT
                )
        else:
            self.server = subprocess.Popen(cmdline, stdout=None, stderr=None)
        # wait for listen(); the loop also ends if the child exits first
        while self.server.poll() is None:
            if os.path.exists(sockpath):
                break
            time.sleep(0.1)

    def connect(self):
        """Return a new unixconnection to this server's socket."""
        return unixconnection(self.sockpath)

    def shutdown(self):
        """Terminate the server process and wait for it to exit."""
        # terminate() sends SIGTERM on POSIX but skips the signal when the
        # child has already been reaped (e.g. by poll() in __init__), so a
        # stale pid is never signalled.
        self.server.terminate()
        self.server.wait()
def writeblock(server, data):
    """Send *data* to the server as one length-prefixed block.

    The length is written first as a big-endian unsigned 32-bit integer,
    then the payload, and finally ``server.stdin`` is flushed.
    """
    pipe = server.stdin
    header = struct.pack(b'>I', len(data))
    pipe.write(header)
    pipe.write(data)
    pipe.flush()
def readchannel(server):
    """Read one channel frame from ``server.stdout``.

    A frame starts with a 5-byte header: a one-byte channel identifier
    followed by a big-endian unsigned 32-bit length.

    Returns ``(channel, length)`` for the ``I`` and ``L`` channels, where
    the length is the value itself and no payload follows, and
    ``(channel, payload)`` for every other channel.

    Raises EOFError when the stream ends before a complete header could be
    read, including a header cut short after 1-4 bytes.
    """
    header = server.stdout.read(5)
    # A short read means the stream ended mid-header; report it as EOF
    # instead of letting struct.unpack fail on the partial data.
    if not header or len(header) < 5:
        raise EOFError
    channel, length = struct.unpack('>cI', header)
    if channel in b'IL':
        return channel, length
    return channel, server.stdout.read(length)
def sep(text):
    """Return *text* with every backslash replaced by a forward slash."""
    backslash, slash = b'\\', b'/'
    return text.replace(backslash, slash)
def runcommand(
    server, args, output=stdout, error=stderr, input=None, outfilter=lambda x: x
):
    """Send a ``runcommand`` request and service channels until it finishes.

    server: object with ``stdin`` / ``stdout`` file objects.
    args: sequence of bytes arguments, sent NUL-joined in one block.
    output: receives data from the ``o`` channel, passed through outfilter.
    error: receives data from the ``e`` channel.
    input: file-like source answering ``I`` (read) and ``L`` (readline)
        requests; an empty buffer is used when not given.
    outfilter: callable applied to every ``o`` chunk before it is written.

    Returns the integer read from the ``r`` channel. Returns None when an
    unexpected channel whose identifier is upper case is received.
    """
    bprint(b'*** runcommand', b' '.join(args))
    stdout.flush()
    server.stdin.write(b'runcommand\n')
    writeblock(server, b'\0'.join(args))

    if not input:
        input = stringio()

    while True:
        channel, payload = readchannel(server)

        # result channel: the command is done
        if channel == b'r':
            (status,) = struct.unpack('>i', payload)
            if status != 0:
                bprint(b' [%d]' % status)
            return status

        if channel == b'o':
            output.write(outfilter(payload))
            output.flush()
        elif channel == b'e':
            error.write(payload)
            error.flush()
        elif channel == b'I':
            # for I and L the payload is the requested size, not data
            writeblock(server, input.read(payload))
        elif channel == b'L':
            writeblock(server, input.readline(payload))
        elif channel == b'm':
            bprint(b"message: %r" % payload)
        else:
            bprint(b"unexpected channel %c: %r" % (channel, payload))
            # give up on unknown upper-case channels, keep going otherwise
            if channel.isupper():
                return None
def check(func, connect=connectpipe):
    """Call *func* with a freshly connected server and return its result.

    stdout is flushed first, then ``connect()`` creates the server object.
    Whether *func* returns or raises, the server's stdin is closed and
    ``wait()`` is called on it afterwards.
    """
    stdout.flush()
    server = connect()
    try:
        result = func(server)
    finally:
        server.stdin.close()
        server.wait()
    return result
def checkwith(connect=connectpipe, **kwargs):
    """Return a wrapper that runs a function through check().

    The returned ``wrap(func)`` calls check() immediately with a connector
    that invokes ``connect(**kwargs)``, and returns check()'s result.
    """

    def opener():
        return connect(**kwargs)

    def wrap(func):
        return check(func, opener)

    return wrap