##// END OF EJS Templates
setdiscovery: use a revset for finding DAG heads in a subset...
Gregory Szorc -
r39205:14099275 default
parent child Browse files
Show More
@@ -1,290 +1,292 b''
1 1 # setdiscovery.py - improved discovery of common nodeset for mercurial
2 2 #
3 3 # Copyright 2010 Benoit Boissinot <bboissin@gmail.com>
4 4 # and Peter Arrenbrecht <peter@arrenbrecht.ch>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8 """
9 9 Algorithm works in the following way. You have two repositories: local and
10 10 remote. They both contain a DAG of changelists.
11 11
12 12 The goal of the discovery protocol is to find one set of node *common*,
13 13 the set of nodes shared by local and remote.
14 14
15 15 One of the issues with the original protocol was latency: it could
16 16 potentially require lots of roundtrips to discover that the local repo was a
17 17 subset of remote (which is a very common case, you usually have few changes
18 18 compared to upstream, while upstream probably had lots of development).
19 19
20 20 The new protocol only requires one interface for the remote repo: `known()`,
21 21 which given a set of changelists tells you if they are present in the DAG.
22 22
23 23 The algorithm then works as follows:
24 24
25 25 - We will be using three sets, `common`, `missing`, `unknown`. Originally
26 26 all nodes are in `unknown`.
27 27 - Take a sample from `unknown`, call `remote.known(sample)`
28 28 - For each node that remote knows, move it and all its ancestors to `common`
29 29 - For each node that remote doesn't know, move it and all its descendants
30 30 to `missing`
31 31 - Iterate until `unknown` is empty
32 32
33 33 There are a couple optimizations, first is instead of starting with a random
34 34 sample of missing, start by sending all heads, in the case where the local
35 35 repo is a subset, you computed the answer in one round trip.
36 36
37 37 Then you can do something similar to the bisecting strategy used when
38 38 finding faulty changesets. Instead of random samples, you can try picking
39 39 nodes that will maximize the number of nodes that will be
40 40 classified with it (since all ancestors or descendants will be marked as well).
41 41 """
42 42
43 43 from __future__ import absolute_import
44 44
45 45 import collections
46 46 import random
47 47
48 48 from .i18n import _
49 49 from .node import (
50 50 nullid,
51 51 nullrev,
52 52 )
53 53 from . import (
54 54 dagutil,
55 55 error,
56 56 util,
57 57 )
58 58
59 59 def _updatesample(dag, revs, sample, quicksamplesize=0):
60 60 """update an existing sample to match the expected size
61 61
62 62 The sample is updated with revs exponentially distant from each head of the
63 63 <revs> set. (H~1, H~2, H~4, H~8, etc).
64 64
65 65 If a target size is specified, the sampling will stop once this size is
66 66 reached. Otherwise sampling will happen until roots of the <revs> set are
67 67 reached.
68 68
69 69 :dag: a dag object from dagutil
70 70 :revs: set of revs we want to discover (if None, assume the whole dag)
71 71 :sample: a sample to update
72 72 :quicksamplesize: optional target size of the sample"""
73 73 # if revs is empty we scan the entire graph
74 74 if revs:
75 75 heads = dag.headsetofconnecteds(revs)
76 76 else:
77 77 heads = dag.heads()
78 78 dist = {}
79 79 visit = collections.deque(heads)
80 80 seen = set()
81 81 factor = 1
82 82 while visit:
83 83 curr = visit.popleft()
84 84 if curr in seen:
85 85 continue
86 86 d = dist.setdefault(curr, 1)
87 87 if d > factor:
88 88 factor *= 2
89 89 if d == factor:
90 90 sample.add(curr)
91 91 if quicksamplesize and (len(sample) >= quicksamplesize):
92 92 return
93 93 seen.add(curr)
94 94 for p in dag.parents(curr):
95 95 if not revs or p in revs:
96 96 dist.setdefault(p, d + 1)
97 97 visit.append(p)
98 98
def _takequicksample(repo, dag, revs, size):
    """takes a quick sample of size <size>

    It is meant for initial sampling and focuses on querying heads and close
    ancestors of heads.

    :repo: a local repository object (used for revset evaluation)
    :dag: a dag object
    :revs: set of revs to discover
    :size: the maximum size of the sample"""
    sample = set(repo.revs('heads(%ld)', revs))

    if len(sample) < size:
        # not enough heads: pad with exponentially-spaced ancestors
        _updatesample(dag, None, sample, quicksamplesize=size)
        return sample
    return _limitsample(sample, size)
113 114
def _takefullsample(repo, dag, revs, size):
    """return a sample of up to <size> revs drawn from both DAG directions

    :repo: a local repository object (used for revset evaluation)
    :dag: a dag object
    :revs: set of revs to discover
    :size: the maximum size of the sample"""
    sample = set(repo.revs('heads(%ld)', revs))

    # grow the sample from the heads side
    _updatesample(dag, revs, sample)
    # and again from the roots side, walking the inverted graph
    _updatesample(dag.inverse(), revs, sample)
    assert sample
    sample = _limitsample(sample, size)
    shortfall = size - len(sample)
    if shortfall > 0:
        # top up with undecided revs picked at random
        sample.update(random.sample(list(revs - sample), shortfall))
    return sample
126 128
127 129 def _limitsample(sample, desiredlen):
128 130 """return a random subset of sample of at most desiredlen item"""
129 131 if len(sample) > desiredlen:
130 132 sample = set(random.sample(sample, desiredlen))
131 133 return sample
132 134
def findcommonheads(ui, local, remote,
                    initialsamplesize=100,
                    fullsamplesize=200,
                    abortwhenunrelated=True,
                    ancestorsof=None):
    '''Return a tuple (common, anyincoming, remoteheads) used to identify
    missing nodes from or in remote.

    ``common`` is a set of common head nodes, ``anyincoming`` is a boolean
    telling whether the remote has changesets we lack, and ``remoteheads``
    is the raw list of remote head hashes as reported by the server.

    :ui: a ui object for progress/debug output
    :local: the local repository
    :remote: a remote peer supporting the ``heads`` and ``known`` commands
    :initialsamplesize: size of the first (cheap) sample of own heads
    :fullsamplesize: size of samples once full discovery is running
    :abortwhenunrelated: raise instead of warning when repos share nothing
    :ancestorsof: optional list of nodes to restrict discovery to their
        ancestors (defaults to all local heads)
    '''
    start = util.timer()

    roundtrips = 0
    cl = local.changelog
    clnode = cl.node
    clrev = cl.rev

    if ancestorsof is not None:
        ownheads = [clrev(n) for n in ancestorsof]
    else:
        # nullrev can show up in headrevs() on an empty repo; filter it
        ownheads = [rev for rev in cl.headrevs() if rev != nullrev]

    dag = dagutil.revlogdag(cl, localsubset=ownheads)

    # early exit if we know all the specified remote heads already
    ui.debug("query 1; heads\n")
    roundtrips += 1
    sample = _limitsample(ownheads, initialsamplesize)
    # indices between sample and externalized version must match
    sample = list(sample)

    # issue both commands in one round trip via the command executor
    with remote.commandexecutor() as e:
        fheads = e.callcommand('heads', {})
        fknown = e.callcommand('known', {
            'nodes': [clnode(r) for r in sample],
        })

    # yesno[i] answers whether the remote knows sample[i]
    srvheadhashes, yesno = fheads.result(), fknown.result()

    if cl.tip() == nullid:
        # local repo is empty: everything remote has is incoming
        if srvheadhashes != [nullid]:
            return [nullid], True, srvheadhashes
        return [nullid], False, []

    # start actual discovery (we note this before the next "if" for
    # compatibility reasons)
    ui.status(_("searching for changes\n"))

    # map the server head hashes we recognize to local revision numbers
    srvheads = []
    for node in srvheadhashes:
        if node == nullid:
            continue

        try:
            srvheads.append(clrev(node))
        # Catches unknown and filtered nodes.
        except error.LookupError:
            continue

    if len(srvheads) == len(srvheadhashes):
        ui.debug("all remote heads known locally\n")
        return srvheadhashes, False, srvheadhashes

    if len(sample) == len(ownheads) and all(yesno):
        ui.note(_("all local heads known remotely\n"))
        ownheadhashes = [clnode(r) for r in ownheads]
        return ownheadhashes, True, srvheadhashes

    # full blown discovery

    # own nodes I know we both know
    # treat remote heads (and maybe own heads) as a first implicit sample
    # response
    common = cl.incrementalmissingrevs(srvheads)
    commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
    common.addbases(commoninsample)
    # own nodes where I don't know if remote knows them
    undecided = set(common.missingancestors(ownheads))
    # own nodes I know remote lacks
    missing = set()

    full = False
    progress = ui.makeprogress(_('searching'), unit=_('queries'))
    while undecided:

        if sample:
            # revs the remote told us (last round) it does not have
            missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]

            # anything descending from a missing rev is missing too
            if missing:
                missing.update(local.revs('descendants(%ld) - descendants(%ld)',
                                          missinginsample, missing))
            else:
                missing.update(local.revs('descendants(%ld)', missinginsample))

            undecided.difference_update(missing)

        if not undecided:
            break

        if full or common.hasbases():
            if full:
                ui.note(_("sampling from both directions\n"))
            else:
                ui.debug("taking initial sample\n")
            samplefunc = _takefullsample
            targetsize = fullsamplesize
        else:
            # use even cheaper initial sample
            ui.debug("taking quick initial sample\n")
            samplefunc = _takequicksample
            targetsize = initialsamplesize
        if len(undecided) < targetsize:
            # everything undecided fits into one query; skip sampling
            sample = list(undecided)
        else:
            sample = samplefunc(local, dag, undecided, targetsize)

        roundtrips += 1
        progress.update(roundtrips)
        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
                 % (roundtrips, len(undecided), len(sample)))
        # indices between sample and externalized version must match
        sample = list(sample)

        with remote.commandexecutor() as e:
            yesno = e.callcommand('known', {
                'nodes': [clnode(r) for r in sample],
            }).result()

        full = True

        if sample:
            commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
            common.addbases(commoninsample)
            common.removeancestorsfrom(undecided)

    # heads(common) == heads(common.bases) since common represents common.bases
    # and all its ancestors
    result = dag.headsetofconnecteds(common.bases)
    # common.bases can include nullrev, but our contract requires us to not
    # return any heads in that case, so discard that
    result.discard(nullrev)
    elapsed = util.timer() - start
    progress.complete()
    ui.debug("%d total queries in %.4fs\n" % (roundtrips, elapsed))
    msg = ('found %d common and %d unknown server heads,'
           ' %d roundtrips in %.4fs\n')
    # NOTE: rebinds `missing` purely for the log line below
    missing = set(result) - set(srvheads)
    ui.log('discovery', msg, len(result), len(missing), roundtrips,
           elapsed)

    if not result and srvheadhashes != [nullid]:
        # no common heads and the remote is non-empty: unrelated repos
        if abortwhenunrelated:
            raise error.Abort(_("repository is unrelated"))
        else:
            ui.warn(_("warning: repository is unrelated\n"))
            return ({nullid}, True, srvheadhashes,)

    anyincoming = (srvheadhashes != [nullid])
    result = {clnode(r) for r in result}
    return result, anyincoming, srvheadhashes
General Comments 0
You need to be logged in to leave comments. Login now