setdiscovery: don't use dagutil to compute heads...
Gregory Szorc
r39201:860e83cd default
@@ -1,284 +1,285 @@
# setdiscovery.py - improved discovery of common nodeset for mercurial
#
# Copyright 2010 Benoit Boissinot <bboissin@gmail.com>
# and Peter Arrenbrecht <peter@arrenbrecht.ch>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""
The algorithm works in the following way. You have two repositories: local
and remote. They both contain a DAG of changelists.

The goal of the discovery protocol is to find one set of nodes, *common*,
the set of nodes shared by local and remote.

One of the issues with the original protocol was latency: it could
potentially require lots of roundtrips to discover that the local repo was a
subset of remote (which is a very common case; you usually have few changes
compared to upstream, while upstream probably had lots of development).

The new protocol only requires one interface for the remote repo: `known()`,
which, given a set of changelists, tells you if they are present in the DAG.

The algorithm then works as follows:

- We will be using three sets, `common`, `missing`, `unknown`. Originally
  all nodes are in `unknown`.
- Take a sample from `unknown`, call `remote.known(sample)`
- For each node that remote knows, move it and all its ancestors to `common`
- For each node that remote doesn't know, move it and all its descendants
  to `missing`
- Iterate until `unknown` is empty

There are a couple of optimizations. First, instead of starting with a random
sample of missing, start by sending all heads; in the case where the local
repo is a subset, you have computed the answer in one round trip.

Then you can do something similar to the bisecting strategy used when
finding faulty changesets. Instead of random samples, you can try picking
nodes that will maximize the number of nodes that will be classified with
them (since all ancestors or descendants will be marked as well).
"""

from __future__ import absolute_import

import collections
import random

from .i18n import _
from .node import (
    nullid,
    nullrev,
)
from . import (
    dagutil,
    error,
    util,
)

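# Illustration (hypothetical code, not part of this module): the
# classification loop from the module docstring, on a toy DAG expressed as
# a {node: [parents]} dict, with a plain set standing in for remote.known().

def _toyancestors(parents, node):
    # inclusive ancestors of `node`, by iterative DFS over the parent map
    stack, seen = [node], set()
    while stack:
        n = stack.pop()
        if n not in seen:
            seen.add(n)
            stack.extend(parents[n])
    return seen

def _toydescendants(parents, node):
    # invert the parent map, then reuse the ancestor walk
    children = {n: [] for n in parents}
    for n, ps in parents.items():
        for p in ps:
            children[p].append(n)
    return _toyancestors(children, node)

def toydiscovery(parents, remotenodes):
    # `unknown` shrinks every round; `common` and `missing` only grow
    unknown = set(parents)
    common, missing = set(), set()
    while unknown:
        node = next(iter(unknown))   # the real code picks samples far smarter
        if node in remotenodes:      # stands in for one remote.known() call
            common |= _toyancestors(parents, node)
        else:
            missing |= _toydescendants(parents, node)
        unknown -= common | missing
    return common, missing

# On the chain a <- b <- c with only "a" present remotely:
#   toydiscovery({'a': [], 'b': ['a'], 'c': ['b']}, {'a'})
# returns ({'a'}, {'b', 'c'}).
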
def _updatesample(dag, nodes, sample, quicksamplesize=0):
    """update an existing sample to match the expected size

    The sample is updated with nodes exponentially distant from each head of the
    <nodes> set. (H~1, H~2, H~4, H~8, etc).

    If a target size is specified, the sampling will stop once this size is
    reached. Otherwise sampling will happen until roots of the <nodes> set are
    reached.

    :dag: a dag object from dagutil
    :nodes: set of nodes we want to discover (if None, assume the whole dag)
    :sample: a sample to update
    :quicksamplesize: optional target size of the sample"""
    # if nodes is empty we scan the entire graph
    if nodes:
        heads = dag.headsetofconnecteds(nodes)
    else:
        heads = dag.heads()
    dist = {}
    visit = collections.deque(heads)
    seen = set()
    factor = 1
    while visit:
        curr = visit.popleft()
        if curr in seen:
            continue
        d = dist.setdefault(curr, 1)
        if d > factor:
            factor *= 2
        if d == factor:
            sample.add(curr)
            if quicksamplesize and (len(sample) >= quicksamplesize):
                return
        seen.add(curr)
        for p in dag.parents(curr):
            if not nodes or p in nodes:
                dist.setdefault(p, d + 1)
                visit.append(p)

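# Illustration (hypothetical code, not part of this module): the exponential
# spacing _updatesample produces. Walking back from a head, nodes at distance
# 1, 2, 4, 8, ... are kept, so a linear chain of n changesets contributes
# only O(log n) sample points.

def toyexponentialoffsets(length):
    """Return the 0-based offsets from the head that the walk keeps."""
    offsets, factor = [], 1
    for dist in range(1, length + 1):  # dist starts at 1, mirroring the BFS
        if dist > factor:
            factor *= 2
        if dist == factor:
            offsets.append(dist - 1)
    return offsets

# toyexponentialoffsets(100) returns [0, 1, 3, 7, 15, 31, 63].
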
def _takequicksample(dag, nodes, size):
    """takes a quick sample of size <size>

    It is meant for initial sampling and focuses on querying heads and close
    ancestors of heads.

    :dag: a dag object
    :nodes: set of nodes to discover
    :size: the maximum size of the sample"""
    sample = dag.headsetofconnecteds(nodes)
    if len(sample) >= size:
        return _limitsample(sample, size)
    _updatesample(dag, None, sample, quicksamplesize=size)
    return sample

def _takefullsample(dag, nodes, size):
    sample = dag.headsetofconnecteds(nodes)
    # update from heads
    _updatesample(dag, nodes, sample)
    # update from roots
    _updatesample(dag.inverse(), nodes, sample)
    assert sample
    sample = _limitsample(sample, size)
    if len(sample) < size:
        more = size - len(sample)
        sample.update(random.sample(list(nodes - sample), more))
    return sample

def _limitsample(sample, desiredlen):
    """return a random subset of sample of at most desiredlen items"""
    if len(sample) > desiredlen:
        sample = set(random.sample(sample, desiredlen))
    return sample

def findcommonheads(ui, local, remote,
                    initialsamplesize=100,
                    fullsamplesize=200,
                    abortwhenunrelated=True,
                    ancestorsof=None):
    '''Return a tuple (common, anyincoming, remoteheads) used to identify
    missing nodes from or in remote.
    '''
    start = util.timer()

    roundtrips = 0
    cl = local.changelog
    clnode = cl.node
    clrev = cl.rev
-    localsubset = None

    if ancestorsof is not None:
-        localsubset = [clrev(n) for n in ancestorsof]
-    dag = dagutil.revlogdag(cl, localsubset=localsubset)
+        ownheads = [clrev(n) for n in ancestorsof]
+    else:
+        ownheads = [rev for rev in cl.headrevs() if rev != nullrev]

+    dag = dagutil.revlogdag(cl, localsubset=ownheads)
+
    # early exit if we know all the specified remote heads already
    ui.debug("query 1; heads\n")
    roundtrips += 1
-    ownheads = dag.heads()
    sample = _limitsample(ownheads, initialsamplesize)
    # indices between sample and externalized version must match
    sample = list(sample)

    with remote.commandexecutor() as e:
        fheads = e.callcommand('heads', {})
        fknown = e.callcommand('known', {
            'nodes': [clnode(r) for r in sample],
        })

    srvheadhashes, yesno = fheads.result(), fknown.result()

    if cl.tip() == nullid:
        if srvheadhashes != [nullid]:
            return [nullid], True, srvheadhashes
        return [nullid], False, []

    # start actual discovery (we note this before the next "if" for
    # compatibility reasons)
    ui.status(_("searching for changes\n"))

    srvheads = []
    for node in srvheadhashes:
        if node == nullid:
            continue

        try:
            srvheads.append(clrev(node))
        # Catches unknown and filtered nodes.
        except error.LookupError:
            continue

    if len(srvheads) == len(srvheadhashes):
        ui.debug("all remote heads known locally\n")
        return srvheadhashes, False, srvheadhashes

    if len(sample) == len(ownheads) and all(yesno):
        ui.note(_("all local heads known remotely\n"))
        ownheadhashes = [clnode(r) for r in ownheads]
        return ownheadhashes, True, srvheadhashes

    # full blown discovery

    # own nodes I know we both know
    # treat remote heads (and maybe own heads) as a first implicit sample
    # response
    common = cl.incrementalmissingrevs(srvheads)
    commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
    common.addbases(commoninsample)
    # own nodes where I don't know if remote knows them
    undecided = set(common.missingancestors(ownheads))
    # own nodes I know remote lacks
    missing = set()

    full = False
    progress = ui.makeprogress(_('searching'), unit=_('queries'))
    while undecided:

        if sample:
            missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
            missing.update(dag.descendantset(missinginsample, missing))

        undecided.difference_update(missing)

        if not undecided:
            break

        if full or common.hasbases():
            if full:
                ui.note(_("sampling from both directions\n"))
            else:
                ui.debug("taking initial sample\n")
            samplefunc = _takefullsample
            targetsize = fullsamplesize
        else:
            # use even cheaper initial sample
            ui.debug("taking quick initial sample\n")
            samplefunc = _takequicksample
            targetsize = initialsamplesize
        if len(undecided) < targetsize:
            sample = list(undecided)
        else:
            sample = samplefunc(dag, undecided, targetsize)

        roundtrips += 1
        progress.update(roundtrips)
        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
                 % (roundtrips, len(undecided), len(sample)))
        # indices between sample and externalized version must match
        sample = list(sample)

        with remote.commandexecutor() as e:
            yesno = e.callcommand('known', {
                'nodes': [clnode(r) for r in sample],
            }).result()

        full = True

        if sample:
            commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
            common.addbases(commoninsample)
            common.removeancestorsfrom(undecided)

    # heads(common) == heads(common.bases) since common represents common.bases
    # and all its ancestors
    result = dag.headsetofconnecteds(common.bases)
    # common.bases can include nullrev, but our contract requires us to not
    # return any heads in that case, so discard that
    result.discard(nullrev)
    elapsed = util.timer() - start
    progress.complete()
    ui.debug("%d total queries in %.4fs\n" % (roundtrips, elapsed))
    msg = ('found %d common and %d unknown server heads,'
           ' %d roundtrips in %.4fs\n')
    missing = set(result) - set(srvheads)
    ui.log('discovery', msg, len(result), len(missing), roundtrips,
           elapsed)

    if not result and srvheadhashes != [nullid]:
        if abortwhenunrelated:
            raise error.Abort(_("repository is unrelated"))
        else:
            ui.warn(_("warning: repository is unrelated\n"))
        return ({nullid}, True, srvheadhashes,)

    anyincoming = (srvheadhashes != [nullid])
    result = {clnode(r) for r in result}
    return result, anyincoming, srvheadhashes
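
For orientation, a sketch of how a caller might consume the three-tuple documented above; the calling convention is assumed from the signature and docstring of findcommonheads(), not taken from this patch, and `repo`/`remote` are placeholder names:

    # hypothetical caller; `repo` is the local repository, `remote` a peer
    common, anyincoming, remoteheads = findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=True)
    # `common`: node hashes known on both sides; `remoteheads`: the raw
    # remote head hashes; `anyincoming`: whether the remote may have
    # changesets the local repo lacks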