upgraderepo: add a config option for parallel computation...
Author: marmoute
Changeset: r44258:acbb55b8 (default branch)
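A minimal sketch of how the new knob might be enabled once this changeset is applied. The section and option name come from the config hunk below; driving the upgrade through `hg debugupgraderepo --run` is an assumption about the typical invocation, not something stated in this changeset:

    [experimental]
    worker.repository-upgrade = yes

    $ hg --config experimental.worker.repository-upgrade=yes debugupgraderepo --run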
@@ -706,6 +706,9 @@ coreconfigitem(
     b'experimental', b'worker.wdir-get-thread-safe', default=False,
 )
 coreconfigitem(
+    b'experimental', b'worker.repository-upgrade', default=False,
+)
+coreconfigitem(
     b'experimental', b'xdiff', default=False,
 )
 coreconfigitem(
@@ -8,6 +8,7 @@
 from __future__ import absolute_import

 import collections
+import multiprocessing
 import os

 from .i18n import _
@@ -1007,6 +1008,102 @@ def _getsidedata(srcrepo, rev):


 def getsidedataadder(srcrepo, destrepo):
+    use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
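+    # on Windows, or when the experimental knob is off, fall back to the
+    # serial implementation that computes sidedata on request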
+    if pycompat.iswindows or not use_w:
+        return _get_simple_sidedata_adder(srcrepo, destrepo)
+    else:
+        return _get_worker_sidedata_adder(srcrepo, destrepo)
+
+
+def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
+    """The function used by a worker to precompute sidedata
+
+    It reads revision numbers from an input queue and writes
+    (rev, <sidedata-map>) pairs to an output queue.
+
+    The `None` input value is used as a stop signal.
+
+    The `tokens` semaphore is used to avoid piling up too many unprocessed
+    entries. A worker needs to acquire one token before fetching a task.
+    Tokens are released by the consumer of the produced data.
+    """
+    tokens.acquire()
+    rev = revs_queue.get()
+    while rev is not None:
+        data = _getsidedata(srcrepo, rev)
+        sidedata_queue.put((rev, data))
+        tokens.acquire()
+        rev = revs_queue.get()
+    # processing of `None` is completed, release the token
+    tokens.release()
+
+
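+# Bound on unconsumed results: workers acquire one token per fetched revision
+# and the consumer releases one token per result it drains, so at most
+# nbworkers * BUFF_PER_WORKER precomputed sidedata entries can be pending.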
+BUFF_PER_WORKER = 50
+
+
+def _get_worker_sidedata_adder(srcrepo, destrepo):
+    """The parallel version of the sidedata computation
+
+    This code spawns a pool of workers that precompute a buffer of sidedata
+    before we actually need them"""
+    # avoid circular import copies -> scmutil -> worker -> copies
+    from . import worker
+
+    nbworkers = worker._numworkers(srcrepo.ui)
+
+    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
+    revsq = multiprocessing.Queue()
+    sidedataq = multiprocessing.Queue()
+
+    assert srcrepo.filtername is None
+    # queue all tasks beforehand; revision numbers are small and it makes
+    # synchronisation simpler
+    #
+    # Since the computation for each node can be quite expensive, the overhead
+    # of using a single queue is not relevant. In practice, most computations
+    # are fast but some are very expensive and dominate all the other, smaller
+    # costs.
+    for r in srcrepo.changelog.revs():
+        revsq.put(r)
+    # queue the "no more tasks" markers
+    for i in range(nbworkers):
+        revsq.put(None)
+
+    allworkers = []
+    for i in range(nbworkers):
+        args = (srcrepo, revsq, sidedataq, tokens)
+        w = multiprocessing.Process(target=_sidedata_worker, args=args)
+        allworkers.append(w)
+        w.start()
+
+    # dictionary to store results for revisions higher than the one we are
+    # looking for. For example, if we need the sidedata map for 42 and 43 is
+    # received, we shelve 43 for later use.
+    staging = {}
+
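+    # the closure below consumes the precomputed results: for changelog
+    # revisions it drains sidedataq, shelving out-of-order revisions in
+    # `staging`, and releases one token per entry it uses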
+    def sidedata_companion(revlog, rev):
+        sidedata = {}
+        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
+            # Is the data previously shelved ?
+            sidedata = staging.pop(rev, None)
+            if sidedata is None:
+                # look at the queued results until we find the one we are
+                # looking for (shelve the other ones)
+                r, sidedata = sidedataq.get()
+                while r != rev:
+                    staging[r] = sidedata
+                    r, sidedata = sidedataq.get()
+            tokens.release()
+        return False, (), sidedata
+
+    return sidedata_companion
+
+
+def _get_simple_sidedata_adder(srcrepo, destrepo):
+    """The simple version of the sidedata computation
+
+    It just computes it in the same thread on request"""
+
     def sidedatacompanion(revlog, rev):
         sidedata = {}
         if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
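For reference, a minimal, self-contained sketch of the token-bounded producer/consumer pattern used above, outside of Mercurial. All names here are illustrative; `fake_compute` stands in for the expensive per-revision `_getsidedata` call, and the consumer loop mirrors what `sidedata_companion` does for changelog revisions:

    import multiprocessing

    BUFF_PER_WORKER = 50

    def fake_compute(rev):
        # stand-in for the expensive per-revision computation
        return {b'dummy-key': b'data for rev %d' % rev}

    def worker(revs_queue, out_queue, tokens):
        # acquire one token per fetched task; the consumer releases them
        tokens.acquire()
        rev = revs_queue.get()
        while rev is not None:
            out_queue.put((rev, fake_compute(rev)))
            tokens.acquire()
            rev = revs_queue.get()
        tokens.release()  # token acquired for the `None` stop marker

    def main():
        nbworkers = 2
        nbrevs = 200
        tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
        revsq = multiprocessing.Queue()
        outq = multiprocessing.Queue()
        for r in range(nbrevs):
            revsq.put(r)
        for _ in range(nbworkers):
            revsq.put(None)  # one stop marker per worker
        procs = []
        for _ in range(nbworkers):
            p = multiprocessing.Process(target=worker, args=(revsq, outq, tokens))
            procs.append(p)
            p.start()
        # consume results in revision order, shelving out-of-order ones
        staging = {}
        for rev in range(nbrevs):
            data = staging.pop(rev, None)
            if data is None:
                r, data = outq.get()
                while r != rev:
                    staging[r] = data
                    r, data = outq.get()
            tokens.release()
            assert data == fake_compute(rev)
        for p in procs:
            p.join()

    if __name__ == '__main__':
        main()

The bounded semaphore is what keeps memory in check: workers stall once nbworkers * BUFF_PER_WORKER results are sitting unconsumed, and resume as the consumer drains the output queue.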