##// END OF EJS Templates
obsutil: sort metadata before comparing in geteffectflag()...
obsutil: sort metadata before comparing in geteffectflag() This is probably less important now that we dropped Python 2. We do still support Python 3.6 though, and the dictionaries aren't ordered there either (that was a big change that came with 3.7). Still, maybe it's a good idea to sort metadata explicitly.

File last commit:

r49801:642e31cb default
r52395:a5d8f261 stable
Show More
hgclient.py
162 lines | 3.9 KiB | text/x-python | PythonLexer
# A minimal client for Mercurial's command server
import io
import os
import re
import signal
import socket
import struct
import subprocess
import sys
import time
if sys.version_info[0] >= 3:
stdout = sys.stdout.buffer
stderr = sys.stderr.buffer
stringio = io.BytesIO
def bprint(*args):
# remove b'' as well for ease of test migration
pargs = [re.sub(br'''\bb(['"])''', br'\1', b'%s' % a) for a in args]
stdout.write(b' '.join(pargs) + b'\n')
else:
import cStringIO
stdout = sys.stdout
stderr = sys.stderr
stringio = cStringIO.StringIO
bprint = print
def connectpipe(path=None, extraargs=()):
cmdline = [b'hg', b'serve', b'--cmdserver', b'pipe']
if path:
cmdline += [b'-R', path]
cmdline.extend(extraargs)
def tonative(cmdline):
if os.name != 'nt':
return cmdline
return [arg.decode("utf-8") for arg in cmdline]
server = subprocess.Popen(
tonative(cmdline), stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
return server
class unixconnection:
def __init__(self, sockpath):
self.sock = sock = socket.socket(socket.AF_UNIX)
sock.connect(sockpath)
self.stdin = sock.makefile('wb')
self.stdout = sock.makefile('rb')
def wait(self):
self.stdin.close()
self.stdout.close()
self.sock.close()
class unixserver:
def __init__(self, sockpath, logpath=None, repopath=None):
self.sockpath = sockpath
cmdline = [b'hg', b'serve', b'--cmdserver', b'unix', b'-a', sockpath]
if repopath:
cmdline += [b'-R', repopath]
if logpath:
stdout = open(logpath, 'a')
stderr = subprocess.STDOUT
else:
stdout = stderr = None
self.server = subprocess.Popen(cmdline, stdout=stdout, stderr=stderr)
# wait for listen()
while self.server.poll() is None:
if os.path.exists(sockpath):
break
time.sleep(0.1)
def connect(self):
return unixconnection(self.sockpath)
def shutdown(self):
os.kill(self.server.pid, signal.SIGTERM)
self.server.wait()
def writeblock(server, data):
server.stdin.write(struct.pack(b'>I', len(data)))
server.stdin.write(data)
server.stdin.flush()
def readchannel(server):
data = server.stdout.read(5)
if not data:
raise EOFError
channel, length = struct.unpack('>cI', data)
if channel in b'IL':
return channel, length
else:
return channel, server.stdout.read(length)
def sep(text):
return text.replace(b'\\', b'/')
def runcommand(
server, args, output=stdout, error=stderr, input=None, outfilter=lambda x: x
):
bprint(b'*** runcommand', b' '.join(args))
stdout.flush()
server.stdin.write(b'runcommand\n')
writeblock(server, b'\0'.join(args))
if not input:
input = stringio()
while True:
ch, data = readchannel(server)
if ch == b'o':
output.write(outfilter(data))
output.flush()
elif ch == b'e':
error.write(data)
error.flush()
elif ch == b'I':
writeblock(server, input.read(data))
elif ch == b'L':
writeblock(server, input.readline(data))
elif ch == b'm':
bprint(b"message: %r" % data)
elif ch == b'r':
(ret,) = struct.unpack('>i', data)
if ret != 0:
bprint(b' [%d]' % ret)
return ret
else:
bprint(b"unexpected channel %c: %r" % (ch, data))
if ch.isupper():
return
def check(func, connect=connectpipe):
stdout.flush()
server = connect()
try:
return func(server)
finally:
server.stdin.close()
server.wait()
def checkwith(connect=connectpipe, **kwargs):
def wrap(func):
return check(func, lambda: connect(**kwargs))
return wrap