##// END OF EJS Templates
discovery: moved sampling functions inside discovery object...
Georges Racinet -
r42045:e5ece0f4 default
parent child Browse files
Show More
@@ -1,356 +1,357 b''
1 1 # setdiscovery.py - improved discovery of common nodeset for mercurial
2 2 #
3 3 # Copyright 2010 Benoit Boissinot <bboissin@gmail.com>
4 4 # and Peter Arrenbrecht <peter@arrenbrecht.ch>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8 """
9 9 The algorithm works in the following way. You have two repositories: local
10 10 and remote. They both contain a DAG of changelists.
11 11
12 12 The goal of the discovery protocol is to find one set of node *common*,
13 13 the set of nodes shared by local and remote.
14 14
15 15 One of the issues with the original protocol was latency: it could
16 16 potentially require lots of roundtrips to discover that the local repo was a
17 17 subset of remote (which is a very common case; you usually have few changes
18 18 compared to upstream, while upstream probably had lots of development).
19 19
20 20 The new protocol only requires one interface for the remote repo: `known()`,
21 21 which given a set of changelists tells you if they are present in the DAG.
22 22
23 23 The algorithm then works as follows:
24 24
25 25 - We will be using three sets, `common`, `missing`, `unknown`. Originally
26 26 all nodes are in `unknown`.
27 27 - Take a sample from `unknown`, call `remote.known(sample)`
28 28 - For each node that remote knows, move it and all its ancestors to `common`
29 29 - For each node that remote doesn't know, move it and all its descendants
30 30 to `missing`
31 31 - Iterate until `unknown` is empty
32 32
33 33 There are a couple of optimizations. First, instead of starting with a random
34 34 sample of missing, start by sending all heads; in the case where the local
35 35 repo is a subset, you compute the answer in one round trip.
36 36
37 37 Then you can do something similar to the bisecting strategy used when
38 38 finding faulty changesets. Instead of random samples, you can try picking
39 39 nodes that will maximize the number of nodes that will be
40 40 classified with it (since all ancestors or descendants will be marked as well).
41 41 """
42 42
43 43 from __future__ import absolute_import
44 44
45 45 import collections
46 46 import random
47 47
48 48 from .i18n import _
49 49 from .node import (
50 50 nullid,
51 51 nullrev,
52 52 )
53 53 from . import (
54 54 error,
55 55 util,
56 56 )
57 57
def _updatesample(revs, heads, sample, parentfn, quicksamplesize=0):
    """update an existing sample to match the expected size

    The sample is updated with revs exponentially distant from each head of the
    <revs> set. (H~1, H~2, H~4, H~8, etc).

    If a target size is specified, the sampling will stop once this size is
    reached. Otherwise sampling will happen until roots of the <revs> set are
    reached.

    :revs: set of revs we want to discover (if None, assume the whole dag)
    :heads: set of DAG head revs
    :sample: a sample to update
    :parentfn: a callable to resolve parents for a revision
    :quicksamplesize: optional target size of the sample"""
    # breadth-first walk from the heads, recording each rev's distance;
    # a rev is added to the sample when its distance hits a power of two.
    distances = {}
    pending = collections.deque(heads)
    visited = set()
    threshold = 1
    while pending:
        rev = pending.popleft()
        if rev in visited:
            continue
        visited.add(rev)
        depth = distances.setdefault(rev, 1)
        # double the threshold whenever the walk passes it, so sampled
        # revs are exponentially spaced along each ancestry line
        if depth > threshold:
            threshold *= 2
        if depth == threshold:
            sample.add(rev)
            if quicksamplesize and len(sample) >= quicksamplesize:
                return
        for parent in parentfn(rev):
            # stay inside <revs> (when given) and skip the null revision
            if parent != nullrev and (not revs or parent in revs):
                distances.setdefault(parent, depth + 1)
                pending.append(parent)
94 94
def _takequicksample(repo, headrevs, revs, size):
    """takes a quick sample of size <size>

    It is meant for initial sampling and focuses on querying heads and close
    ancestors of heads.

    :repo: a local repository object
    :headrevs: set of head revisions in local DAG to consider
    :revs: set of revs to discover
    :size: the maximum size of the sample"""
    # everything fits in one query: no need to sample at all
    if len(revs) <= size:
        return list(revs)
    sample = set(repo.revs('heads(%ld)', revs))

    # the heads alone already saturate the budget
    if len(sample) >= size:
        return _limitsample(sample, size)

    # top up with exponentially-spaced ancestors of the local heads
    _updatesample(None, headrevs, sample, repo.changelog.parentrevs,
                  quicksamplesize=size)
    return sample
115
def _takefullsample(repo, headrevs, revs, size):
    """return a sample of up to <size> revs from the undecided set <revs>

    Combines heads of <revs>, exponentially-spaced ancestors of those heads,
    exponentially-spaced descendants of the roots of <revs>, and random
    padding when the result is still short of <size>."""
    if len(revs) <= size:
        return list(revs)
    sample = set(repo.revs('heads(%ld)', revs))

    # walk up from the heads of the undecided set
    undecidedheads = set(repo.revs('heads(%ld)', revs))
    _updatesample(revs, undecidedheads, sample, repo.changelog.parentrevs)

    # walk down from the roots of the undecided set
    undecidedroots = set(repo.revs('roots(%ld)', revs))

    # _updatesample() essentially does interaction over revisions to look up
    # their children. This lookup is expensive and doing it in a loop is
    # quadratic. We precompute the children for all relevant revisions and
    # make the lookup in _updatesample() a simple dict lookup.
    #
    # Because this function can be called multiple times during discovery, we
    # may still perform redundant work and there is room to optimize this by
    # keeping a persistent cache of children across invocations.
    childrenmap = {}
    parentrevs = repo.changelog.parentrevs
    for rev in repo.changelog.revs(start=min(undecidedroots)):
        # seed an entry for every rev so lookups never hit a missing key
        childrenmap.setdefault(rev, [])
        for parent in parentrevs(rev):
            if parent != nullrev:
                childrenmap.setdefault(parent, []).append(rev)

    _updatesample(revs, undecidedroots, sample, childrenmap.__getitem__)
    assert sample
    sample = _limitsample(sample, size)
    if len(sample) < size:
        shortfall = size - len(sample)
        sample.update(random.sample(list(revs - sample), shortfall))
    return sample
157
158 95 def _limitsample(sample, desiredlen):
159 96 """return a random subset of sample of at most desiredlen item"""
160 97 if len(sample) > desiredlen:
161 98 sample = set(random.sample(sample, desiredlen))
162 99 return sample
163 100
class partialdiscovery(object):
    """an object representing ongoing discovery

    Fed with data from the remote repository, this object keeps track of the
    current set of changesets in various states:

    - common: revs also known remotely
    - undecided: revs we don't have information on yet
    - missing: revs missing remotely
    (all tracked revisions are known locally)
    """

    def __init__(self, repo, targetheads):
        self._repo = repo
        self._targetheads = targetheads
        # incremental "missing ancestors" computation; its bases are the
        # known-common revs and grow as discovery progresses
        self._common = repo.changelog.incrementalmissingrevs()
        self._undecided = None
        self.missing = set()

    def addcommons(self, commons):
        """register nodes known as common"""
        self._common.addbases(commons)
        if self._undecided is not None:
            # anything reachable from a common base is decided now
            self._common.removeancestorsfrom(self._undecided)

    def addmissings(self, missings):
        """register some nodes as missing"""
        # missing revs and all their undecided descendants are missing too
        newmissing = self._repo.revs('%ld::%ld', missings, self.undecided)
        if newmissing:
            self.missing.update(newmissing)
            self.undecided.difference_update(newmissing)

    def addinfo(self, sample):
        """consume an iterable of (rev, known) tuples"""
        common = set()
        missing = set()
        for rev, known in sample:
            if known:
                common.add(rev)
            else:
                missing.add(rev)
        # register commons first: that narrows `undecided` before the
        # descendant computation in addmissings()
        if common:
            self.addcommons(common)
        if missing:
            self.addmissings(missing)

    def hasinfo(self):
        """return True if we have any clue about the remote state"""
        return self._common.hasbases()

    def iscomplete(self):
        """True if all the necessary data have been gathered"""
        return self._undecided is not None and not self._undecided

    @property
    def undecided(self):
        # lazily computed: revs not reachable from the common bases
        if self._undecided is not None:
            return self._undecided
        self._undecided = set(self._common.missingancestors(self._targetheads))
        return self._undecided

    def commonheads(self):
        """the heads of the known common set"""
        # heads(common) == heads(common.bases) since common represents
        # common.bases and all its ancestors
        return self._common.basesheads()

    def takequicksample(self, headrevs, size):
        """takes a quick sample of size <size>

        It is meant for initial sampling and focuses on querying heads and
        close ancestors of heads.

        :headrevs: set of head revisions in local DAG to consider
        :size: the maximum size of the sample"""
        revs = self.undecided
        if len(revs) <= size:
            return list(revs)
        sample = set(self._repo.revs('heads(%ld)', revs))

        if len(sample) >= size:
            return _limitsample(sample, size)

        _updatesample(None, headrevs, sample, self._repo.changelog.parentrevs,
                      quicksamplesize=size)
        return sample

    def takefullsample(self, headrevs, size):
        """return a sample of up to <size> undecided revs to query remotely

        Combines heads of the undecided set, exponentially-spaced ancestors
        of those heads and descendants of its roots, padded with random revs
        when still short of <size>.

        :headrevs: unused; kept for signature parity with takequicksample
        :size: the maximum size of the sample"""
        revs = self.undecided
        if len(revs) <= size:
            return list(revs)
        repo = self._repo
        sample = set(repo.revs('heads(%ld)', revs))

        # update from heads
        revsheads = set(repo.revs('heads(%ld)', revs))
        _updatesample(revs, revsheads, sample, repo.changelog.parentrevs)

        # update from roots
        revsroots = set(repo.revs('roots(%ld)', revs))

        # _updatesample() essentially does interaction over revisions to look
        # up their children. This lookup is expensive and doing it in a loop is
        # quadratic. We precompute the children for all relevant revisions and
        # make the lookup in _updatesample() a simple dict lookup.
        #
        # Because this function can be called multiple times during discovery,
        # we may still perform redundant work and there is room to optimize
        # this by keeping a persistent cache of children across invocations.
        children = {}

        parentrevs = repo.changelog.parentrevs
        for rev in repo.changelog.revs(start=min(revsroots)):
            # Always ensure revision has an entry so we don't need to worry
            # about missing keys.
            children.setdefault(rev, [])

            for prev in parentrevs(rev):
                if prev == nullrev:
                    continue

                children.setdefault(prev, []).append(rev)

        _updatesample(revs, revsroots, sample, children.__getitem__)
        assert sample
        sample = _limitsample(sample, size)
        if len(sample) < size:
            more = size - len(sample)
            sample.update(random.sample(list(revs - sample), more))
        return sample
231
def findcommonheads(ui, local, remote,
                    initialsamplesize=100,
                    fullsamplesize=200,
                    abortwhenunrelated=True,
                    ancestorsof=None):
    '''Return a tuple (common, anyincoming, remoteheads) used to identify
    missing nodes from or in remote.

    :ui: a ui object, used for progress reporting and debug output
    :local: the local repository
    :remote: a remote peer supporting the `heads` and `known` commands
    :initialsamplesize: size of the first, cheap sample
    :fullsamplesize: size of subsequent bidirectional samples
    :abortwhenunrelated: raise error.Abort when the repos share nothing
    :ancestorsof: optional nodes restricting the discovered set
    '''
    start = util.timer()

    roundtrips = 0
    cl = local.changelog
    clnode = cl.node
    clrev = cl.rev

    if ancestorsof is not None:
        ownheads = [clrev(n) for n in ancestorsof]
    else:
        ownheads = [rev for rev in cl.headrevs() if rev != nullrev]

    # early exit if we know all the specified remote heads already
    ui.debug("query 1; heads\n")
    roundtrips += 1
    sample = _limitsample(ownheads, initialsamplesize)
    # indices between sample and externalized version must match
    sample = list(sample)

    with remote.commandexecutor() as e:
        fheads = e.callcommand('heads', {})
        fknown = e.callcommand('known', {
            'nodes': [clnode(r) for r in sample],
        })

    srvheadhashes, yesno = fheads.result(), fknown.result()

    if cl.tip() == nullid:
        # local repo is empty; everything remote is incoming
        if srvheadhashes != [nullid]:
            return [nullid], True, srvheadhashes
        return [nullid], False, []

    # start actual discovery (we note this before the next "if" for
    # compatibility reasons)
    ui.status(_("searching for changes\n"))

    knownsrvheads = []  # revnos of remote heads that are known locally
    for node in srvheadhashes:
        if node == nullid:
            continue

        try:
            knownsrvheads.append(clrev(node))
        # Catches unknown and filtered nodes.
        except error.LookupError:
            continue

    if len(knownsrvheads) == len(srvheadhashes):
        ui.debug("all remote heads known locally\n")
        return srvheadhashes, False, srvheadhashes

    if len(sample) == len(ownheads) and all(yesno):
        ui.note(_("all local heads known remotely\n"))
        ownheadhashes = [clnode(r) for r in ownheads]
        return ownheadhashes, True, srvheadhashes

    # full blown discovery

    disco = partialdiscovery(local, ownheads)
    # treat remote heads (and maybe own heads) as a first implicit sample
    # response
    disco.addcommons(knownsrvheads)
    disco.addinfo(zip(sample, yesno))

    full = False
    progress = ui.makeprogress(_('searching'), unit=_('queries'))
    while not disco.iscomplete():

        if full or disco.hasinfo():
            if full:
                ui.note(_("sampling from both directions\n"))
            else:
                ui.debug("taking initial sample\n")
            samplefunc = disco.takefullsample
            targetsize = fullsamplesize
        else:
            # use even cheaper initial sample
            ui.debug("taking quick initial sample\n")
            samplefunc = disco.takequicksample
            targetsize = initialsamplesize
        sample = samplefunc(ownheads, targetsize)

        roundtrips += 1
        progress.update(roundtrips)
        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
                 % (roundtrips, len(disco.undecided), len(sample)))
        # indices between sample and externalized version must match
        sample = list(sample)

        with remote.commandexecutor() as e:
            yesno = e.callcommand('known', {
                'nodes': [clnode(r) for r in sample],
            }).result()

        full = True

        disco.addinfo(zip(sample, yesno))

    result = disco.commonheads()
    elapsed = util.timer() - start
    progress.complete()
    ui.debug("%d total queries in %.4fs\n" % (roundtrips, elapsed))
    msg = ('found %d common and %d unknown server heads,'
           ' %d roundtrips in %.4fs\n')
    missing = set(result) - set(knownsrvheads)
    ui.log('discovery', msg, len(result), len(missing), roundtrips,
           elapsed)

    if not result and srvheadhashes != [nullid]:
        if abortwhenunrelated:
            raise error.Abort(_("repository is unrelated"))
        else:
            ui.warn(_("warning: repository is unrelated\n"))
            return ({nullid}, True, srvheadhashes,)

    anyincoming = (srvheadhashes != [nullid])
    result = {clnode(r) for r in result}
    return result, anyincoming, srvheadhashes
General Comments 0
You need to be logged in to leave comments. Login now