merge: mark file gets as not thread safe (issue5933)

In default installs, this has the effect of disabling the thread-based worker on Windows when manifesting files in the working directory.

My measurements have shown that with revlog-based repositories, Mercurial spends a lot of CPU time in revlog code resolving file data. This incurs a lot of context switching across threads and slows down `hg update` operations when going from an empty working directory to the tip of the repo.

On mozilla-unified (246,351 files) on an i7-6700K (4+4 CPUs):

  before: 487s wall
  after:  360s wall (equivalent to worker.enabled=false)
  cpus=2: 379s wall

Even with only 2 threads, the thread pool is still slower.

The introduction of the thread-based worker (02b36e860e0b) states that it resulted in a "~50%" speedup for `hg sparse --enable-profile` and `hg sparse --disable-profile`. This disagrees with my measurement above. I theorize a few reasons for this:

1) Removal of files from the working directory is I/O-bound, not CPU-bound, and should benefit from a thread pool (unless I/O is insanely fast and the GIL release is near instantaneous). So tests like `hg sparse --enable-profile` may exercise deletion throughput and aren't good benchmarks for worker tasks that are CPU heavy.

2) The patch was authored by someone at Facebook. The results were likely measured against a repository using remotefilelog. I believe that revision retrieval during working directory updates with remotefilelog will often use a remote store, and thus be I/O-bound rather than CPU-bound. This probably resulted in an overstated performance gain.

Since there appears to be a need to enable the thread-based worker with some stores, I've made the flagging of file gets as thread safe configurable. I've made it experimental because I don't want to formalize a boolean flag for this option and because this attribute is best captured against the store implementation. But we don't have a proper store API for this yet. I'd rather cross this bridge later.

It is possible there are revlog-based repositories that do benefit from a thread-based worker. I didn't do very comprehensive testing. If there are, we may want to devise a better algorithm for deciding whether to use the thread-based worker, possibly including config options to limit the number of threads. But until I see evidence that justifies the complexity, simplicity wins.

Differential Revision: https://phab.mercurial-scm.org/D3963
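The decision described in this commit can be modeled in a few lines. The sketch below is a hypothetical illustration, not Mercurial's `worker.py`: it assumes a purely thread-based worker (as on Windows) and uses the made-up names `run_file_gets` and `fake_getfile`. A task that is not flagged thread safe runs serially, which matches the `worker.enabled=false` behavior from the measurements above; only a task flagged thread safe is handed to a thread pool.

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor


def run_file_gets(getfile, paths, threadsafe, numcpus):
    """Resolve data for each path, using threads only when allowed.

    Hypothetical model of a thread-based worker: file gets are no longer
    flagged thread safe by default, so they run serially on one thread.
    """
    if not threadsafe or numcpus < 2:
        return [getfile(p) for p in paths]
    with ThreadPoolExecutor(max_workers=numcpus) as pool:
        # map() preserves input order, so results line up with paths.
        return list(pool.map(getfile, paths))


def fake_getfile(path):
    # Hypothetical stand-in for CPU-heavy revlog work: repeated hashing.
    data = path.encode()
    for _ in range(1000):
        data = hashlib.sha1(data).digest()
    return data


paths = ['f%d' % i for i in range(8)]
serial = run_file_gets(fake_getfile, paths, threadsafe=False, numcpus=4)
threaded = run_file_gets(fake_getfile, paths, threadsafe=True, numcpus=4)
```

Both strategies produce identical results. What differs is cost. For pure-Python, CPU-bound work the threads mostly take turns holding the GIL, so the pool adds context switches without adding throughput. That is the argument the commit makes for defaulting to the serial path.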

File last commit:

r28836:3f45488d default
r38755:be498426 default
hgclient.py
# A minimal client for Mercurial's command server

from __future__ import absolute_import, print_function
import os
import signal
import socket
import struct
import subprocess
import sys
import time

try:
    import cStringIO as io
    stringio = io.StringIO
except ImportError:
    import io
    stringio = io.StringIO

def connectpipe(path=None):
    # Spawn a command server that talks over the child's stdin/stdout pipes.
    cmdline = ['hg', 'serve', '--cmdserver', 'pipe']
    if path:
        cmdline += ['-R', path]

    server = subprocess.Popen(cmdline, stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE)

    return server

class unixconnection(object):
    # Wrap a Unix-domain socket in file objects so it exposes the same
    # stdin/stdout/wait() interface as a Popen-based pipe server.
    def __init__(self, sockpath):
        self.sock = sock = socket.socket(socket.AF_UNIX)
        sock.connect(sockpath)
        self.stdin = sock.makefile('wb')
        self.stdout = sock.makefile('rb')

    def wait(self):
        self.stdin.close()
        self.stdout.close()
        self.sock.close()

class unixserver(object):
    def __init__(self, sockpath, logpath=None, repopath=None):
        self.sockpath = sockpath
        cmdline = ['hg', 'serve', '--cmdserver', 'unix', '-a', sockpath]
        if repopath:
            cmdline += ['-R', repopath]
        if logpath:
            stdout = open(logpath, 'a')
            stderr = subprocess.STDOUT
        else:
            stdout = stderr = None
        self.server = subprocess.Popen(cmdline, stdout=stdout, stderr=stderr)
        # wait for listen()
        while self.server.poll() is None:
            if os.path.exists(sockpath):
                break
            time.sleep(0.1)

    def connect(self):
        return unixconnection(self.sockpath)

    def shutdown(self):
        os.kill(self.server.pid, signal.SIGTERM)
        self.server.wait()

def writeblock(server, data):
    # A block is a 4-byte big-endian unsigned length followed by the payload.
    server.stdin.write(struct.pack('>I', len(data)))
    server.stdin.write(data)
    server.stdin.flush()
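
def readchannel(server):
    # Each message starts with a 5-byte header: a 1-byte channel identifier
    # and a 4-byte big-endian length.
    data = server.stdout.read(5)
    if not data:
        raise EOFError
    channel, length = struct.unpack('>cI', data)
    if channel in 'IL':
        # Input requests carry no payload; length is the most the server
        # will accept in the client's reply.
        return channel, length
    else:
        return channel, server.stdout.read(length)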

def sep(text):
    return text.replace('\\', '/')

def runcommand(server, args, output=sys.stdout, error=sys.stderr, input=None,
               outfilter=lambda x: x):
    print('*** runcommand', ' '.join(args))
    sys.stdout.flush()
    server.stdin.write('runcommand\n')
    # The arguments travel as one block, separated by NUL bytes.
    writeblock(server, '\0'.join(args))

    if not input:
        input = stringio()

    while True:
        ch, data = readchannel(server)
        if ch == 'o':
            # Command output.
            output.write(outfilter(data))
            output.flush()
        elif ch == 'e':
            # Command error output.
            error.write(data)
            error.flush()
        elif ch == 'I':
            # The server asks for up to `data` bytes of input.
            writeblock(server, input.read(data))
        elif ch == 'L':
            # The server asks for one line of at most `data` bytes.
            writeblock(server, input.readline(data))
        elif ch == 'r':
            # Result channel: the command's exit code as a signed int.
            ret, = struct.unpack('>i', data)
            if ret != 0:
                print(' [%d]' % ret)
            return ret
        else:
            print("unexpected channel %c: %r" % (ch, data))
            # Uppercase channels are mandatory; give up if we can't handle one.
            if ch.isupper():
                return

def check(func, connect=connectpipe):
    sys.stdout.flush()
    server = connect()
    try:
        return func(server)
    finally:
        server.stdin.close()
        server.wait()
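The client above targets Python 2, where `str` is already a byte string. The following is a Python 3 sketch of the same wire framing, mirroring `writeblock` and `readchannel` but using `bytes` throughout. It replaces the live server with in-memory buffers so the round trip can run without `hg`. The `frame` helper and the recorded reply for `hg root` are hypothetical and serve only to fake the server side.

```python
import io
import struct


def writeblock(stream, data):
    """Length-prefixed block: 4-byte big-endian unsigned length + payload."""
    stream.write(struct.pack('>I', len(data)))
    stream.write(data)


def readchannel(stream):
    """Return (channel, payload), or (channel, max size) for 'I'/'L'."""
    header = stream.read(5)
    if not header:
        raise EOFError
    channel, length = struct.unpack('>cI', header)
    if channel in (b'I', b'L'):
        return channel, length
    return channel, stream.read(length)


def frame(channel, payload):
    """Encode a server-to-client message (hypothetical, to fake a server)."""
    return struct.pack('>cI', channel, len(payload)) + payload


# Hypothetical recorded server reply to "runcommand" with args ['root'].
reply = io.BytesIO(frame(b'o', b'/repo\n') + frame(b'r', struct.pack('>i', 0)))
request = io.BytesIO()

request.write(b'runcommand\n')
writeblock(request, b'\0'.join([b'root']))

output = []
while True:
    ch, data = readchannel(reply)
    if ch == b'o':
        output.append(data)
    elif ch == b'r':
        ret, = struct.unpack('>i', data)
        break

print(request.getvalue())  # b'runcommand\n\x00\x00\x00\x04root'
print(b''.join(output), ret)  # b'/repo\n' 0
```

Keeping the framing functions stream-based, as the original does with `server.stdin`/`server.stdout`, is what lets the same code drive a pipe, a Unix socket wrapped by `makefile()`, or a `BytesIO` buffer in tests.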