##// END OF EJS Templates
repair: migrate revlogs during upgrade...
repair: migrate revlogs during upgrade Our next step for in-place upgrade is to migrate store data. Revlogs are the biggest source of data within the store and a store is useless without them, so we implement their migration first. Our strategy for migrating revlogs is to walk the store and call `revlog.clone()` on each revlog. There are some minor complications. Because revlogs have different storage options (e.g. changelog has generaldelta and delta chains disabled), we need to obtain the correct class of revlog so inserted data is encoded properly for its type. Various attempts at implementing progress indicators that didn't lead to frustration from false "it's almost done" indicators were made. I initially used a single progress bar based on number of revlogs. However, this quickly churned through all filelogs, got to 99% then effectively froze at 99.99% when it got to the manifest. So I converted the progress bar to total revision count. This was a little bit better. But the manifest was still significantly slower than filelogs and it took forever to process the last few percent. I then tried both revision/chunk bytes and raw bytes as the denominator. This had the opposite effect: because so much data is in manifests, it would churn through filelogs without showing much progress. When it got to manifests, it would fill in 90+% of the progress bar. I finally gave up having a unified progress bar and instead implemented 3 progress bars: 1 for filelog revisions, 1 for manifest revisions, and 1 for changelog revisions. I added extra messages indicating the total number of revisions of each so users know there are more progress bars coming. I also added extra messages before and after each stage to give extra details about what is happening. Strictly speaking, this isn't necessary. But the numbers are impressive. For example, when converting a non-generaldelta mozilla-central repository, the messages you see are: migrating 2475593 total revisions (1833043 in filelogs, 321156 in manifests, 321394 in changelog) migrating 1.67 GB in store; 2508 GB tracked data migrating 267868 filelogs containing 1833043 revisions (1.09 GB in store; 57.3 GB tracked data) finished migrating 1833043 filelog revisions across 267868 filelogs; change in size: -415776 bytes migrating 1 manifests containing 321156 revisions (518 MB in store; 2451 GB tracked data) That "2508 GB" figure really blew me away. I had no clue that the raw tracked data in mozilla-central was that large. Granted, 2451 GB is in the manifest and "only" 57.3 GB is in filelogs. But still. It's worth noting that gratuitous loading of source revlogs in order to display numbers and progress bars does serve a purpose: it ensures we can open all source revlogs. We don't want to spend several minutes copying revlogs only to encounter a permissions error or similar later. As part of this commit, we also add swapping of the store directory to the upgrade function. After revlogs are converted, we move the old store into the backup directory then move the temporary repo's store into the old store's location. On well-behaved systems, this should be 2 atomic operations and the window of inconsistency show be very narrow. There are still a few improvements to be made to store copying and upgrading. But this commit gets the bulk of the work out of the way.

File last commit:

r30639:d524c885 default
r30779:38aa1ca9 default
Show More
worker.py
224 lines | 7.2 KiB | text/x-python | PythonLexer
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
import errno
import os
import signal
import sys
from .i18n import _
from . import (
encoding,
error,
pycompat,
scmutil,
util,
)
def countcpus():
'''try to count the number of CPUs on the system'''
# posix
try:
n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
if n > 0:
return n
except (AttributeError, ValueError):
pass
# windows
try:
n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
if n > 0:
return n
except (KeyError, ValueError):
pass
return 1
def _numworkers(ui):
s = ui.config('worker', 'numcpus')
if s:
try:
n = int(s)
if n >= 1:
return n
except ValueError:
raise error.Abort(_('number of cpus must be an integer'))
return min(max(countcpus(), 4), 32)
if pycompat.osname == 'posix':
_startupcost = 0.01
else:
_startupcost = 1e30
def worthwhile(ui, costperop, nops):
'''try to determine whether the benefit of multiple processes can
outweigh the cost of starting them'''
linear = costperop * nops
workers = _numworkers(ui)
benefit = linear - (_startupcost * workers + linear / workers)
return benefit >= 0.15
def worker(ui, costperarg, func, staticargs, args):
'''run a function, possibly in parallel in multiple worker
processes.
returns a progress iterator
costperarg - cost of a single task
func - function to run
staticargs - arguments to pass to every invocation of the function
args - arguments to split into chunks, to pass to individual
workers
'''
if worthwhile(ui, costperarg, len(args)):
return _platformworker(ui, func, staticargs, args)
return func(*staticargs + (args,))
def _posixworker(ui, func, staticargs, args):
rfd, wfd = os.pipe()
workers = _numworkers(ui)
oldhandler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
pids, problem = set(), [0]
def killworkers():
# unregister SIGCHLD handler as all children will be killed. This
# function shouldn't be interrupted by another SIGCHLD; otherwise pids
# could be updated while iterating, which would cause inconsistency.
signal.signal(signal.SIGCHLD, oldchldhandler)
# if one worker bails, there's no good reason to wait for the rest
for p in pids:
try:
os.kill(p, signal.SIGTERM)
except OSError as err:
if err.errno != errno.ESRCH:
raise
def waitforworkers(blocking=True):
for pid in pids.copy():
p = st = 0
while True:
try:
p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
break
except OSError as e:
if e.errno == errno.EINTR:
continue
elif e.errno == errno.ECHILD:
# child would already be reaped, but pids yet been
# updated (maybe interrupted just after waitpid)
pids.discard(pid)
break
else:
raise
if p:
pids.discard(p)
st = _exitstatus(st)
if st and not problem[0]:
problem[0] = st
def sigchldhandler(signum, frame):
waitforworkers(blocking=False)
if problem[0]:
killworkers()
oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
for pargs in partition(args, workers):
pid = os.fork()
if pid == 0:
signal.signal(signal.SIGINT, oldhandler)
signal.signal(signal.SIGCHLD, oldchldhandler)
def workerfunc():
os.close(rfd)
for i, item in func(*(staticargs + (pargs,))):
os.write(wfd, '%d %s\n' % (i, item))
# make sure we use os._exit in all code paths. otherwise the worker
# may do some clean-ups which could cause surprises like deadlock.
# see sshpeer.cleanup for example.
try:
scmutil.callcatch(ui, workerfunc)
except KeyboardInterrupt:
os._exit(255)
except: # never return, therefore no re-raises
try:
ui.traceback()
finally:
os._exit(255)
else:
os._exit(0)
pids.add(pid)
os.close(wfd)
fp = os.fdopen(rfd, 'rb', 0)
def cleanup():
signal.signal(signal.SIGINT, oldhandler)
waitforworkers()
signal.signal(signal.SIGCHLD, oldchldhandler)
status = problem[0]
if status:
if status < 0:
os.kill(os.getpid(), -status)
sys.exit(status)
try:
for line in util.iterfile(fp):
l = line.split(' ', 1)
yield int(l[0]), l[1][:-1]
except: # re-raises
killworkers()
cleanup()
raise
cleanup()
def _posixexitstatus(code):
'''convert a posix exit status into the same form returned by
os.spawnv
returns None if the process was stopped instead of exiting'''
if os.WIFEXITED(code):
return os.WEXITSTATUS(code)
elif os.WIFSIGNALED(code):
return -os.WTERMSIG(code)
if pycompat.osname != 'nt':
_platformworker = _posixworker
_exitstatus = _posixexitstatus
def partition(lst, nslices):
'''partition a list into N slices of roughly equal size
The current strategy takes every Nth element from the input. If
we ever write workers that need to preserve grouping in input
we should consider allowing callers to specify a partition strategy.
mpm is not a fan of this partitioning strategy when files are involved.
In his words:
Single-threaded Mercurial makes a point of creating and visiting
files in a fixed order (alphabetical). When creating files in order,
a typical filesystem is likely to allocate them on nearby regions on
disk. Thus, when revisiting in the same order, locality is maximized
and various forms of OS and disk-level caching and read-ahead get a
chance to work.
This effect can be quite significant on spinning disks. I discovered it
circa Mercurial v0.4 when revlogs were named by hashes of filenames.
Tarring a repo and copying it to another disk effectively randomized
the revlog ordering on disk by sorting the revlogs by hash and suddenly
performance of my kernel checkout benchmark dropped by ~10x because the
"working set" of sectors visited no longer fit in the drive's cache and
the workload switched from streaming to random I/O.
What we should really be doing is have workers read filenames from a
ordered queue. This preserves locality and also keeps any worker from
getting more than one file out of balance.
'''
for i in range(nslices):
yield lst[i::nslices]