setdiscovery: use revset for resolving DAG heads in a subset...
Gregory Szorc
r39212:fec01c69 default
@@ -1,298 +1,293 @@
1 1 # setdiscovery.py - improved discovery of common nodeset for mercurial
2 2 #
3 3 # Copyright 2010 Benoit Boissinot <bboissin@gmail.com>
4 4 # and Peter Arrenbrecht <peter@arrenbrecht.ch>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8 """
9 9 The algorithm works in the following way. You have two repositories:
10 10 local and remote. They both contain a DAG of changesets.
11 11
12 12 The goal of the discovery protocol is to find one set of nodes, *common*,
13 13 the set of nodes shared by local and remote.
14 14
15 15 One of the issues with the original protocol was latency: it could
16 16 potentially require lots of roundtrips to discover that the local repo was a
17 17 subset of remote (which is a very common case; you usually have few changes
18 18 compared to upstream, while upstream probably has lots of development).
19 19
20 20 The new protocol only requires one interface for the remote repo: `known()`,
21 21 which, given a set of changesets, tells you whether they are present in its DAG.
22 22
23 23 The algorithm then works as follows:
24 24
25 25 - We will be using three sets, `common`, `missing`, `unknown`. Originally
26 26 all nodes are in `unknown`.
27 27 - Take a sample from `unknown`, call `remote.known(sample)`
28 28 - For each node that remote knows, move it and all its ancestors to `common`
29 29 - For each node that remote doesn't know, move it and all its descendants
30 30 to `missing`
31 31 - Iterate until `unknown` is empty
32 32
33 33 There are a couple of optimizations. First, instead of starting with a random
34 34 sample of missing, start by sending all heads; in the case where the local
35 35 repo is a subset, you compute the answer in one round trip.
36 36
37 37 Then you can do something similar to the bisecting strategy used when
38 38 finding faulty changesets. Instead of random samples, you can try picking
39 39 nodes that will maximize the number of nodes that will be
40 40 classified with them (since all their ancestors or descendants will be marked as well).
41 41 """
42 42
43 43 from __future__ import absolute_import
44 44
45 45 import collections
46 46 import random
47 47
48 48 from .i18n import _
49 49 from .node import (
50 50 nullid,
51 51 nullrev,
52 52 )
53 53 from . import (
54 dagutil,
55 54 error,
56 55 util,
57 56 )
58 57
59 58 def _updatesample(revs, heads, sample, parentfn, quicksamplesize=0):
60 59 """update an existing sample to match the expected size
61 60
62 61 The sample is updated with revs exponentially distant from each head of the
63 62 <revs> set. (H~1, H~2, H~4, H~8, etc).
64 63
65 64 If a target size is specified, the sampling will stop once this size is
66 65 reached. Otherwise sampling will happen until roots of the <revs> set are
67 66 reached.
68 67
69 68 :revs: set of revs we want to discover (if None, assume the whole dag)
70 69 :heads: set of DAG head revs
71 70 :sample: a sample to update
72 71 :parentfn: a callable to resolve parents for a revision
73 72 :quicksamplesize: optional target size of the sample"""
74 73 dist = {}
75 74 visit = collections.deque(heads)
76 75 seen = set()
77 76 factor = 1
78 77 while visit:
79 78 curr = visit.popleft()
80 79 if curr in seen:
81 80 continue
82 81 d = dist.setdefault(curr, 1)
83 82 if d > factor:
84 83 factor *= 2
85 84 if d == factor:
86 85 sample.add(curr)
87 86 if quicksamplesize and (len(sample) >= quicksamplesize):
88 87 return
89 88 seen.add(curr)
90 89
91 90 for p in parentfn(curr):
92 91 if p != nullrev and (not revs or p in revs):
93 92 dist.setdefault(p, d + 1)
94 93 visit.append(p)
95 94
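To see the exponential-distance behaviour that _updatesample implements, here is a self-contained toy run of the same walk, minus the Mercurial types, on a linear chain of 20 revisions whose only head is rev 19 (the exponential_sample name and the parentfn lambda are illustrative only):

import collections

def exponential_sample(heads, parentfn):
    # Same walk as _updatesample above: keep nodes whose BFS distance
    # from a head is a power of two (1, 2, 4, 8, ...).
    dist, sample, seen = {}, set(), set()
    visit = collections.deque(heads)
    factor = 1
    while visit:
        curr = visit.popleft()
        if curr in seen:
            continue
        d = dist.setdefault(curr, 1)
        if d > factor:
            factor *= 2
        if d == factor:
            sample.add(curr)
        seen.add(curr)
        for p in parentfn(curr):
            dist.setdefault(p, d + 1)
            visit.append(p)
    return sample

print(sorted(exponential_sample([19], lambda r: [r - 1] if r > 0 else [])))
# -> [4, 12, 16, 18, 19], i.e. the head plus H~1, H~3, H~7, H~15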
96 95 def _takequicksample(repo, headrevs, revs, size):
97 96 """takes a quick sample of size <size>
98 97
99 98 It is meant for initial sampling and focuses on querying heads and close
100 99 ancestors of heads.
101 100
102 101 :repo: the local repository object
103 102 :headrevs: set of head revisions in local DAG to consider
104 103 :revs: set of revs to discover
105 104 :size: the maximum size of the sample"""
106 105 sample = set(repo.revs('heads(%ld)', revs))
107 106
108 107 if len(sample) >= size:
109 108 return _limitsample(sample, size)
110 109
111 110 _updatesample(None, headrevs, sample, repo.changelog.parentrevs,
112 111 quicksamplesize=size)
113 112 return sample
114 113
115 114 def _takefullsample(repo, headrevs, revs, size):
116 115 sample = set(repo.revs('heads(%ld)', revs))
117 116
118 117 # update from heads
119 118 revsheads = set(repo.revs('heads(%ld)', revs))
120 119 _updatesample(revs, revsheads, sample, repo.changelog.parentrevs)
121 120 # update from roots
122 121 revsroots = set(repo.revs('roots(%ld)', revs))
123 122
124 123 # TODO this is quadratic
125 124 parentfn = lambda rev: repo.changelog.children(repo.changelog.node(rev))
126 125
127 126 _updatesample(revs, revsroots, sample, parentfn)
128 127 assert sample
129 128 sample = _limitsample(sample, size)
130 129 if len(sample) < size:
131 130 more = size - len(sample)
132 131 sample.update(random.sample(list(revs - sample), more))
133 132 return sample
134 133
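The TODO in _takefullsample above notes that calling repo.changelog.children() once per rev is quadratic. The standard linear-time alternative is to build a children map for the subset in one pass over parent pointers; a sketch, with childrenmap and parentrevs_of as stand-ins for a helper and repo.changelog.parentrevs:

def childrenmap(revs, parentrevs_of):
    # One pass over parent pointers: each rev registers itself as a
    # child of any parent that is also inside the subset.
    children = {rev: [] for rev in revs}
    for rev in revs:
        for p in parentrevs_of(rev):
            if p in children:
                children[p].append(rev)
    return children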
135 134 def _limitsample(sample, desiredlen):
136 135 """return a random subset of sample of at most desiredlen item"""
137 136 if len(sample) > desiredlen:
138 137 sample = set(random.sample(sample, desiredlen))
139 138 return sample
140 139
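A small portability note on _limitsample: it passes a set directly to random.sample(), which was fine for the Python versions Mercurial targeted here, but CPython deprecated set populations in 3.9 and removed them in 3.11. A variant written against modern Python would materialize the population first, for example:

import random

def limitsample(sample, desiredlen):
    # sorted() gives a deterministic population order before sampling,
    # which also keeps results reproducible under a seeded RNG.
    if len(sample) > desiredlen:
        sample = set(random.sample(sorted(sample), desiredlen))
    return sample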
141 140 def findcommonheads(ui, local, remote,
142 141 initialsamplesize=100,
143 142 fullsamplesize=200,
144 143 abortwhenunrelated=True,
145 144 ancestorsof=None):
146 145 '''Return a tuple (common, anyincoming, remoteheads) used to identify
147 146 missing nodes from or in remote.
148 147 '''
149 148 start = util.timer()
150 149
151 150 roundtrips = 0
152 151 cl = local.changelog
153 152 clnode = cl.node
154 153 clrev = cl.rev
155 154
156 155 if ancestorsof is not None:
157 156 ownheads = [clrev(n) for n in ancestorsof]
158 157 else:
159 158 ownheads = [rev for rev in cl.headrevs() if rev != nullrev]
160 159
161 dag = dagutil.revlogdag(cl)
162
163 160 # early exit if we know all the specified remote heads already
164 161 ui.debug("query 1; heads\n")
165 162 roundtrips += 1
166 163 sample = _limitsample(ownheads, initialsamplesize)
167 164 # indices between sample and externalized version must match
168 165 sample = list(sample)
169 166
170 167 with remote.commandexecutor() as e:
171 168 fheads = e.callcommand('heads', {})
172 169 fknown = e.callcommand('known', {
173 170 'nodes': [clnode(r) for r in sample],
174 171 })
175 172
176 173 srvheadhashes, yesno = fheads.result(), fknown.result()
177 174
178 175 if cl.tip() == nullid:
179 176 if srvheadhashes != [nullid]:
180 177 return [nullid], True, srvheadhashes
181 178 return [nullid], False, []
182 179
183 180 # start actual discovery (we note this before the next "if" for
184 181 # compatibility reasons)
185 182 ui.status(_("searching for changes\n"))
186 183
187 184 srvheads = []
188 185 for node in srvheadhashes:
189 186 if node == nullid:
190 187 continue
191 188
192 189 try:
193 190 srvheads.append(clrev(node))
194 191 # Catches unknown and filtered nodes.
195 192 except error.LookupError:
196 193 continue
197 194
198 195 if len(srvheads) == len(srvheadhashes):
199 196 ui.debug("all remote heads known locally\n")
200 197 return srvheadhashes, False, srvheadhashes
201 198
202 199 if len(sample) == len(ownheads) and all(yesno):
203 200 ui.note(_("all local heads known remotely\n"))
204 201 ownheadhashes = [clnode(r) for r in ownheads]
205 202 return ownheadhashes, True, srvheadhashes
206 203
207 204 # full blown discovery
208 205
209 206 # own nodes I know we both know
210 207 # treat remote heads (and maybe own heads) as a first implicit sample
211 208 # response
212 209 common = cl.incrementalmissingrevs(srvheads)
213 210 commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
214 211 common.addbases(commoninsample)
215 212 # own nodes where I don't know if remote knows them
216 213 undecided = set(common.missingancestors(ownheads))
217 214 # own nodes I know remote lacks
218 215 missing = set()
219 216
220 217 full = False
221 218 progress = ui.makeprogress(_('searching'), unit=_('queries'))
222 219 while undecided:
223 220
224 221 if sample:
225 222 missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
226 223
227 224 if missing:
228 225 missing.update(local.revs('descendants(%ld) - descendants(%ld)',
229 226 missinginsample, missing))
230 227 else:
231 228 missing.update(local.revs('descendants(%ld)', missinginsample))
232 229
233 230 undecided.difference_update(missing)
234 231
235 232 if not undecided:
236 233 break
237 234
238 235 if full or common.hasbases():
239 236 if full:
240 237 ui.note(_("sampling from both directions\n"))
241 238 else:
242 239 ui.debug("taking initial sample\n")
243 240 samplefunc = _takefullsample
244 241 targetsize = fullsamplesize
245 242 else:
246 243 # use even cheaper initial sample
247 244 ui.debug("taking quick initial sample\n")
248 245 samplefunc = _takequicksample
249 246 targetsize = initialsamplesize
250 247 if len(undecided) < targetsize:
251 248 sample = list(undecided)
252 249 else:
253 250 sample = samplefunc(local, ownheads, undecided, targetsize)
254 251
255 252 roundtrips += 1
256 253 progress.update(roundtrips)
257 254 ui.debug("query %i; still undecided: %i, sample size is: %i\n"
258 255 % (roundtrips, len(undecided), len(sample)))
259 256 # indices between sample and externalized version must match
260 257 sample = list(sample)
261 258
262 259 with remote.commandexecutor() as e:
263 260 yesno = e.callcommand('known', {
264 261 'nodes': [clnode(r) for r in sample],
265 262 }).result()
266 263
267 264 full = True
268 265
269 266 if sample:
270 267 commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
271 268 common.addbases(commoninsample)
272 269 common.removeancestorsfrom(undecided)
273 270
274 271 # heads(common) == heads(common.bases) since common represents common.bases
275 272 # and all its ancestors
276 result = dag.headsetofconnecteds(common.bases)
277 # common.bases can include nullrev, but our contract requires us to not
278 # return any heads in that case, so discard that
279 result.discard(nullrev)
273 # The presence of nullrev will confuse heads(). So filter it out.
274 result = set(local.revs('heads(%ld)', common.bases - {nullrev}))
280 275 elapsed = util.timer() - start
281 276 progress.complete()
282 277 ui.debug("%d total queries in %.4fs\n" % (roundtrips, elapsed))
283 278 msg = ('found %d common and %d unknown server heads,'
284 279 ' %d roundtrips in %.4fs\n')
285 280 missing = set(result) - set(srvheads)
286 281 ui.log('discovery', msg, len(result), len(missing), roundtrips,
287 282 elapsed)
288 283
289 284 if not result and srvheadhashes != [nullid]:
290 285 if abortwhenunrelated:
291 286 raise error.Abort(_("repository is unrelated"))
292 287 else:
293 288 ui.warn(_("warning: repository is unrelated\n"))
294 289 return ({nullid}, True, srvheadhashes,)
295 290
296 291 anyincoming = (srvheadhashes != [nullid])
297 292 result = {clnode(r) for r in result}
298 293 return result, anyincoming, srvheadhashes
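The substantive change in this diff is near the bottom: the dagutil.revlogdag.headsetofconnecteds() call is replaced by the heads(%ld) revset over common.bases, with nullrev filtered out first since it would confuse heads(). As a standalone illustration (not Mercurial's revset implementation), "heads of a subset" can be computed from parent pointers alone; subsetheads and parentrevs_of are hypothetical names:

def subsetheads(revs, parentrevs_of):
    # A rev is a head of the subset iff no other rev in the subset
    # names it as a parent.
    heads = set(revs)
    for rev in revs:
        for p in parentrevs_of(rev):
            heads.discard(p)
    return heads

Using the revset lets the changelog do this work without the dagutil wrapper, which is what allows the dagutil import to be dropped at the top of the file.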