Show More
@@ -706,6 +706,9 b' coreconfigitem(' | |||
|
706 | 706 | b'experimental', b'worker.wdir-get-thread-safe', default=False, |
|
707 | 707 | ) |
|
708 | 708 | coreconfigitem( |
|
709 | b'experimental', b'worker.repository-upgrade', default=False, | |
|
710 | ) | |
|
711 | coreconfigitem( | |
|
709 | 712 | b'experimental', b'xdiff', default=False, |
|
710 | 713 | ) |
|
711 | 714 | coreconfigitem( |
@@ -8,6 +8,7 b'' | |||
|
8 | 8 | from __future__ import absolute_import |
|
9 | 9 | |
|
10 | 10 | import collections |
|
11 | import multiprocessing | |
|
11 | 12 | import os |
|
12 | 13 | |
|
13 | 14 | from .i18n import _ |
@@ -1007,6 +1008,102 b' def _getsidedata(srcrepo, rev):' | |||
|
1007 | 1008 | |
|
1008 | 1009 | |
|
1009 | 1010 | def getsidedataadder(srcrepo, destrepo): |
|
1011 | use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade') | |
|
1012 | if pycompat.iswindows or not use_w: | |
|
1013 | return _get_simple_sidedata_adder(srcrepo, destrepo) | |
|
1014 | else: | |
|
1015 | return _get_worker_sidedata_adder(srcrepo, destrepo) | |
|
1016 | ||
|
1017 | ||
|
1018 | def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens): | |
|
1019 | """The function used by worker precomputing sidedata | |
|
1020 | ||
|
1021 | It read an input queue containing revision numbers | |
|
1022 | It write in an output queue containing (rev, <sidedata-map>) | |
|
1023 | ||
|
1024 | The `None` input value is used as a stop signal. | |
|
1025 | ||
|
1026 | The `tokens` semaphore is user to avoid having too many unprocessed | |
|
1027 | entries. The workers needs to acquire one token before fetching a task. | |
|
1028 | They will be released by the consumer of the produced data. | |
|
1029 | """ | |
|
1030 | tokens.acquire() | |
|
1031 | rev = revs_queue.get() | |
|
1032 | while rev is not None: | |
|
1033 | data = _getsidedata(srcrepo, rev) | |
|
1034 | sidedata_queue.put((rev, data)) | |
|
1035 | tokens.acquire() | |
|
1036 | rev = revs_queue.get() | |
|
1037 | # processing of `None` is completed, release the token. | |
|
1038 | tokens.release() | |
|
1039 | ||
|
1040 | ||
|
1041 | BUFF_PER_WORKER = 50 | |
|
1042 | ||
|
1043 | ||
|
1044 | def _get_worker_sidedata_adder(srcrepo, destrepo): | |
|
1045 | """The parallel version of the sidedata computation | |
|
1046 | ||
|
1047 | This code spawn a pool of worker that precompute a buffer of sidedata | |
|
1048 | before we actually need them""" | |
|
1049 | # avoid circular import copies -> scmutil -> worker -> copies | |
|
1050 | from . import worker | |
|
1051 | ||
|
1052 | nbworkers = worker._numworkers(srcrepo.ui) | |
|
1053 | ||
|
1054 | tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER) | |
|
1055 | revsq = multiprocessing.Queue() | |
|
1056 | sidedataq = multiprocessing.Queue() | |
|
1057 | ||
|
1058 | assert srcrepo.filtername is None | |
|
1059 | # queue all tasks beforehand, revision numbers are small and it make | |
|
1060 | # synchronisation simpler | |
|
1061 | # | |
|
1062 | # Since the computation for each node can be quite expensive, the overhead | |
|
1063 | # of using a single queue is not revelant. In practice, most computation | |
|
1064 | # are fast but some are very expensive and dominate all the other smaller | |
|
1065 | # cost. | |
|
1066 | for r in srcrepo.changelog.revs(): | |
|
1067 | revsq.put(r) | |
|
1068 | # queue the "no more tasks" markers | |
|
1069 | for i in range(nbworkers): | |
|
1070 | revsq.put(None) | |
|
1071 | ||
|
1072 | allworkers = [] | |
|
1073 | for i in range(nbworkers): | |
|
1074 | args = (srcrepo, revsq, sidedataq, tokens) | |
|
1075 | w = multiprocessing.Process(target=_sidedata_worker, args=args) | |
|
1076 | allworkers.append(w) | |
|
1077 | w.start() | |
|
1078 | ||
|
1079 | # dictionnary to store results for revision higher than we one we are | |
|
1080 | # looking for. For example, if we need the sidedatamap for 42, and 43 is | |
|
1081 | # received, when shelve 43 for later use. | |
|
1082 | staging = {} | |
|
1083 | ||
|
1084 | def sidedata_companion(revlog, rev): | |
|
1085 | sidedata = {} | |
|
1086 | if util.safehasattr(revlog, b'filteredrevs'): # this is a changelog | |
|
1087 | # Is the data previously shelved ? | |
|
1088 | sidedata = staging.pop(rev, None) | |
|
1089 | if sidedata is None: | |
|
1090 | # look at the queued result until we find the one we are lookig | |
|
1091 | # for (shelve the other ones) | |
|
1092 | r, sidedata = sidedataq.get() | |
|
1093 | while r != rev: | |
|
1094 | staging[r] = sidedata | |
|
1095 | r, sidedata = sidedataq.get() | |
|
1096 | tokens.release() | |
|
1097 | return False, (), sidedata | |
|
1098 | ||
|
1099 | return sidedata_companion | |
|
1100 | ||
|
1101 | ||
|
1102 | def _get_simple_sidedata_adder(srcrepo, destrepo): | |
|
1103 | """The simple version of the sidedata computation | |
|
1104 | ||
|
1105 | It just compute it in the same thread on request""" | |
|
1106 | ||
|
1010 | 1107 | def sidedatacompanion(revlog, rev): |
|
1011 | 1108 | sidedata = {} |
|
1012 | 1109 | if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog |
General Comments 0
You need to be logged in to leave comments.
Login now