discovery: explicitly use `undecided` for the children mapping...
marmoute - r42052:a05f0bbe default
@@ -1,370 +1,371 @@
# setdiscovery.py - improved discovery of common nodeset for mercurial
#
# Copyright 2010 Benoit Boissinot <bboissin@gmail.com>
# and Peter Arrenbrecht <peter@arrenbrecht.ch>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""
The algorithm works in the following way. You have two repositories: local
and remote. They both contain a DAG of changelists.

The goal of the discovery protocol is to find one set of nodes, *common*,
the set of nodes shared by local and remote.

One of the issues with the original protocol was latency: it could
potentially require lots of roundtrips to discover that the local repo was a
subset of remote (which is a very common case; you usually have few changes
compared to upstream, while upstream probably had lots of development).

The new protocol only requires one interface for the remote repo: `known()`,
which, given a set of changelists, tells you if they are present in the DAG.

The algorithm then works as follows:

- We will be using three sets, `common`, `missing`, `unknown`. Originally
  all nodes are in `unknown`.
- Take a sample from `unknown`, call `remote.known(sample)`
- For each node that remote knows, move it and all its ancestors to `common`
- For each node that remote doesn't know, move it and all its descendants
  to `missing`
- Iterate until `unknown` is empty

There are a couple of optimizations. First, instead of starting with a
random sample of missing, start by sending all heads; in the case where the
local repo is a subset, you compute the answer in one round trip.

Then you can do something similar to the bisecting strategy used when
finding faulty changesets. Instead of random samples, you can try picking
nodes that will maximize the number of nodes that will be
classified with them (since all ancestors or descendants will be marked as
well). (A toy sketch of this loop appears right after the imports below.)
"""

from __future__ import absolute_import

import collections
import random

from .i18n import _
from .node import (
    nullid,
    nullrev,
)
from . import (
    error,
    util,
)

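# A minimal sketch of the loop described in the module docstring, kept purely
# for illustration; it is not used by this module. It assumes toy callables
# `remoteknown`, `ancestors`, `descendants` and `pick` (the real code batches
# `known()` calls over the wire and tracks ancestry incrementally).
def _discoverysketch(unknown, remoteknown, ancestors, descendants, pick):
    """classify every node in `unknown` as common or missing (toy version)"""
    common, missing = set(), set()
    while unknown:
        sample = pick(unknown)
        for node, known in zip(sample, remoteknown(sample)):
            if known:
                # the remote has this node, hence all of its ancestors too
                common.update(ancestors(node) | {node})
            else:
                # the remote lacks this node, hence all of its descendants
                missing.update(descendants(node) | {node})
        unknown -= common | missing
    return common, missing
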
def _updatesample(revs, heads, sample, parentfn, quicksamplesize=0):
    """update an existing sample to match the expected size

    The sample is updated with revs exponentially distant from each head of the
    <revs> set. (H~1, H~2, H~4, H~8, etc).

    If a target size is specified, the sampling will stop once this size is
    reached. Otherwise sampling will happen until roots of the <revs> set are
    reached.

    :revs: set of revs we want to discover (if None, assume the whole dag)
    :heads: set of DAG head revs
    :sample: a sample to update
    :parentfn: a callable to resolve parents for a revision
    :quicksamplesize: optional target size of the sample"""
    dist = {}
    visit = collections.deque(heads)
    seen = set()
    factor = 1
    while visit:
        curr = visit.popleft()
        if curr in seen:
            continue
        d = dist.setdefault(curr, 1)
        if d > factor:
            factor *= 2
        if d == factor:
            sample.add(curr)
            if quicksamplesize and (len(sample) >= quicksamplesize):
                return
        seen.add(curr)

        for p in parentfn(curr):
            if p != nullrev and (not revs or p in revs):
                dist.setdefault(p, d + 1)
                visit.append(p)

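# Worked example (an illustrative sketch, not called anywhere): on a linear
# chain where the parent of rev r is r - 1, sampling from head 15 adds the
# revs whose distance from the head (counting the head itself as 1) first
# reaches each power of two, i.e. revs 15, 14, 12, 8 and 0.
def _updatesampleexample():
    def linearparents(r):
        return [r - 1] if r > 0 else [nullrev]
    sample = set()
    _updatesample(None, [15], sample, linearparents)
    assert sample == {15, 14, 12, 8, 0}
    return sample
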
def _limitsample(sample, desiredlen):
    """return a random subset of sample of at most desiredlen items"""
    if len(sample) > desiredlen:
        sample = set(random.sample(sample, desiredlen))
    return sample

class partialdiscovery(object):
    """an object representing ongoing discovery

    Fed with data from the remote repository, this object keeps track of the
    current set of changesets in various states:

    - common: revs also known remotely
    - undecided: revs we don't have information on yet
    - missing: revs missing remotely
    (all tracked revisions are known locally)
    """

    def __init__(self, repo, targetheads):
        self._repo = repo
        self._targetheads = targetheads
        self._common = repo.changelog.incrementalmissingrevs()
        self._undecided = None
        self.missing = set()
        self._childrenmap = None

    def addcommons(self, commons):
        """register nodes known as common"""
        self._common.addbases(commons)
        if self._undecided is not None:
            self._common.removeancestorsfrom(self._undecided)

    def addmissings(self, missings):
        """register some nodes as missing"""
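        # '%ld::%ld' below is a DAG range: revs descending from a
        # newly-reported missing rev that are also ancestors of an undecided
        # rev. If a rev is missing remotely, such descendants are missing too.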
        newmissing = self._repo.revs('%ld::%ld', missings, self.undecided)
        if newmissing:
            self.missing.update(newmissing)
            self.undecided.difference_update(newmissing)

    def addinfo(self, sample):
        """consume an iterable of (rev, known) tuples"""
        common = set()
        missing = set()
        for rev, known in sample:
            if known:
                common.add(rev)
            else:
                missing.add(rev)
        if common:
            self.addcommons(common)
        if missing:
            self.addmissings(missing)
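        # hypothetical usage: disco.addinfo([(12, True), (27, False)]) moves
        # rev 12 and its ancestors to common, and rev 27 plus its undecided
        # descendants to missing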

    def hasinfo(self):
        """return True if we have any clue about the remote state"""
        return self._common.hasbases()

    def iscomplete(self):
        """True if all the necessary data have been gathered"""
        return self._undecided is not None and not self._undecided

    @property
    def undecided(self):
        if self._undecided is not None:
            return self._undecided
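        # materialized lazily: undecided starts as all ancestors of the
        # target heads that are not already known to be common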
        self._undecided = set(self._common.missingancestors(self._targetheads))
        return self._undecided

    def commonheads(self):
        """the heads of the known common set"""
        # heads(common) == heads(common.bases) since common represents
        # common.bases and all its ancestors
        return self._common.basesheads()

    def _parentsgetter(self):
        getrev = self._repo.changelog.index.__getitem__
        def getparents(r):
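            # positions 5 and 6 of a changelog index entry hold the p1 and
            # p2 revisions, so this slice yields both parents at once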
            return getrev(r)[5:7]
        return getparents

-    def _childrengetter(self, revs):
+    def _childrengetter(self):

        if self._childrenmap is not None:
            return self._childrenmap.__getitem__

        # _updatesample() essentially does iteration over revisions to look
        # up their children. This lookup is expensive and doing it in a loop is
        # quadratic. We precompute the children for all relevant revisions and
        # make the lookup in _updatesample() a simple dict lookup.
        self._childrenmap = children = {}

        parentrevs = self._parentsgetter()
+        revs = self.undecided

        for rev in sorted(revs):
            # Always ensure revision has an entry so we don't need to worry
            # about missing keys.
            children[rev] = []
            for prev in parentrevs(rev):
                if prev == nullrev:
                    continue
                c = children.get(prev)
                if c is not None:
                    c.append(rev)
        return children.__getitem__

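    # Illustration of the mapping built above (assuming revs 0..3 are all
    # undecided): for a chain 0 <- 1 <- 2 plus a merge 3 with parents 1 and
    # 2, _childrengetter() precomputes {0: [1], 1: [2, 3], 2: [3], 3: []}.
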
    def takequicksample(self, headrevs, size):
        """takes a quick sample of size <size>

        It is meant for initial sampling and focuses on querying heads and close
        ancestors of heads.

        :headrevs: set of head revisions in local DAG to consider
        :size: the maximum size of the sample"""
        revs = self.undecided
        if len(revs) <= size:
            return list(revs)
        sample = set(self._repo.revs('heads(%ld)', revs))

        if len(sample) >= size:
            return _limitsample(sample, size)

        _updatesample(None, headrevs, sample, self._parentsgetter(),
                      quicksamplesize=size)
        return sample

    def takefullsample(self, headrevs, size):
        revs = self.undecided
        if len(revs) <= size:
            return list(revs)
        repo = self._repo
        sample = set(repo.revs('heads(%ld)', revs))
        parentrevs = self._parentsgetter()

        # update from heads
        revsheads = sample.copy()
        _updatesample(revs, revsheads, sample, parentrevs)

        # update from roots
        revsroots = set(repo.revs('roots(%ld)', revs))

-        childrenrevs = self._childrengetter(revs)
+        childrenrevs = self._childrengetter()

        _updatesample(revs, revsroots, sample, childrenrevs)
        assert sample
        sample = _limitsample(sample, size)
        if len(sample) < size:
            more = size - len(sample)
            sample.update(random.sample(list(revs - sample), more))
        return sample

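# Note on the two samplers above: takequicksample() only walks down from the
# heads (cheap, used before any remote information arrives), while
# takefullsample() also walks up from the roots through the precomputed
# children mapping, favoring revs that classify many others either way.
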
def findcommonheads(ui, local, remote,
                    initialsamplesize=100,
                    fullsamplesize=200,
                    abortwhenunrelated=True,
                    ancestorsof=None):
    '''Return a tuple (common, anyincoming, remoteheads) used to identify
    missing nodes from or in remote.
    '''
    start = util.timer()

    roundtrips = 0
    cl = local.changelog
    clnode = cl.node
    clrev = cl.rev

    if ancestorsof is not None:
        ownheads = [clrev(n) for n in ancestorsof]
    else:
        ownheads = [rev for rev in cl.headrevs() if rev != nullrev]

    # early exit if we know all the specified remote heads already
    ui.debug("query 1; heads\n")
    roundtrips += 1
    sample = _limitsample(ownheads, initialsamplesize)
    # indices between sample and externalized version must match
    sample = list(sample)

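    # both commands go through one executor so the peer may batch them into
    # a single round trip; 'known' returns one boolean per node in `sample`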
    with remote.commandexecutor() as e:
        fheads = e.callcommand('heads', {})
        fknown = e.callcommand('known', {
            'nodes': [clnode(r) for r in sample],
        })

    srvheadhashes, yesno = fheads.result(), fknown.result()

    if cl.tip() == nullid:
        if srvheadhashes != [nullid]:
            return [nullid], True, srvheadhashes
        return [nullid], False, []

    # start actual discovery (we note this before the next "if" for
    # compatibility reasons)
    ui.status(_("searching for changes\n"))

    knownsrvheads = []  # revnos of remote heads that are known locally
    for node in srvheadhashes:
        if node == nullid:
            continue

        try:
            knownsrvheads.append(clrev(node))
        # Catches unknown and filtered nodes.
        except error.LookupError:
            continue

    if len(knownsrvheads) == len(srvheadhashes):
        ui.debug("all remote heads known locally\n")
        return srvheadhashes, False, srvheadhashes

    if len(sample) == len(ownheads) and all(yesno):
        ui.note(_("all local heads known remotely\n"))
        ownheadhashes = [clnode(r) for r in ownheads]
        return ownheadhashes, True, srvheadhashes

    # full blown discovery

    disco = partialdiscovery(local, ownheads)
    # treat remote heads (and maybe own heads) as a first implicit sample
    # response
    disco.addcommons(knownsrvheads)
    disco.addinfo(zip(sample, yesno))

    full = False
    progress = ui.makeprogress(_('searching'), unit=_('queries'))
    while not disco.iscomplete():

        if full or disco.hasinfo():
            if full:
                ui.note(_("sampling from both directions\n"))
            else:
                ui.debug("taking initial sample\n")
            samplefunc = disco.takefullsample
            targetsize = fullsamplesize
        else:
            # use even cheaper initial sample
            ui.debug("taking quick initial sample\n")
            samplefunc = disco.takequicksample
            targetsize = initialsamplesize
        sample = samplefunc(ownheads, targetsize)

        roundtrips += 1
        progress.update(roundtrips)
        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
                 % (roundtrips, len(disco.undecided), len(sample)))
        # indices between sample and externalized version must match
        sample = list(sample)

        with remote.commandexecutor() as e:
            yesno = e.callcommand('known', {
                'nodes': [clnode(r) for r in sample],
            }).result()

        full = True

        disco.addinfo(zip(sample, yesno))

    result = disco.commonheads()
    elapsed = util.timer() - start
    progress.complete()
    ui.debug("%d total queries in %.4fs\n" % (roundtrips, elapsed))
    msg = ('found %d common and %d unknown server heads,'
           ' %d roundtrips in %.4fs\n')
    missing = set(result) - set(knownsrvheads)
    ui.log('discovery', msg, len(result), len(missing), roundtrips,
           elapsed)

    if not result and srvheadhashes != [nullid]:
        if abortwhenunrelated:
            raise error.Abort(_("repository is unrelated"))
        else:
            ui.warn(_("warning: repository is unrelated\n"))
        return ({nullid}, True, srvheadhashes,)

    anyincoming = (srvheadhashes != [nullid])
    result = {clnode(r) for r in result}
    return result, anyincoming, srvheadhashes
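
# Hypothetical call site (repo setup elided): given a local repository object
# and a wire peer, something along the lines of
#
#   common, anyinc, srvheads = findcommonheads(repo.ui, repo, peer)
#
# yields the common head nodes, whether the remote has changesets we lack,
# and the remote's advertised heads.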