discovery: always use batching now that all peers support batching...
Augie Fackler
r25914:4d77e896 default
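
This change removes the fallback paths for peers that cannot batch, so discovery now always issues its heads() and known() queries through a single batched request. As a rough illustration of the pattern the code relies on, here is a hedged sketch of a future-based batch; the future/demobatch/demopeer classes are illustrative stand-ins, not Mercurial's real wireproto types.

class future(object):
    # placeholder that receives its value when the batch is submitted
    def __init__(self):
        self.value = None

class demobatch(object):
    # queue remote calls, then send them all in one roundtrip on submit()
    def __init__(self, peer):
        self.peer = peer
        self.calls = []
    def heads(self):
        f = future()
        self.calls.append(('heads', (), f))
        return f
    def known(self, nodes):
        f = future()
        self.calls.append(('known', (nodes,), f))
        return f
    def submit(self):
        # one combined request instead of one roundtrip per call
        for name, args, f in self.calls:
            f.value = getattr(self.peer, name)(*args)

class demopeer(object):
    def heads(self):
        return ['deadbeef']
    def known(self, nodes):
        return [n == 'deadbeef' for n in nodes]
    def batch(self):
        return demobatch(self)

remote = demopeer()
batch = remote.batch()
headsref = batch.heads()
yesnoref = batch.known(['deadbeef', 'cafebabe'])
batch.submit()                            # a single roundtrip answers both
print(headsref.value, yesnoref.value)     # ['deadbeef'] [True, False]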
@@ -1,251 +1,241 @@
1 1 # setdiscovery.py - improved discovery of common nodeset for mercurial
2 2 #
3 3 # Copyright 2010 Benoit Boissinot <bboissin@gmail.com>
4 4 # and Peter Arrenbrecht <peter@arrenbrecht.ch>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8 """
9 9 The algorithm works in the following way. You have two repositories: local
10 10 and remote. They both contain a DAG of changelists.
11 11
12 12 The goal of the discovery protocol is to find *common*, the set of nodes
13 13 shared by local and remote.
14 14
15 15 One of the issues with the original protocol was latency: it could
16 16 potentially require lots of roundtrips to discover that the local repo was a
17 17 subset of remote (which is a very common case; you usually have few changes
18 18 compared to upstream, while upstream probably has lots of development).
19 19
20 20 The new protocol only requires one interface for the remote repo: `known()`,
21 21 which, given a set of changelists, tells you if they are present in the DAG.
22 22
23 23 The algorithm then works as follows (a toy illustration follows this docstring):
24 24
25 25 - We will be using three sets, `common`, `missing`, `unknown`. Initially
26 26 all nodes are in `unknown`.
27 27 - Take a sample from `unknown`, call `remote.known(sample)`
28 28 - For each node that remote knows, move it and all its ancestors to `common`
29 29 - For each node that remote doesn't know, move it and all its descendants
30 30 to `missing`
31 31 - Iterate until `unknown` is empty
32 32
33 33 There are a couple of optimizations. First, instead of starting with a
34 34 random sample of missing, start by sending all heads; in the case where the
35 35 local repo is a subset, you compute the answer in one round trip.
36 36
37 37 Then you can do something similar to the bisecting strategy used when
38 38 finding faulty changesets. Instead of random samples, you can try picking
39 39 nodes that will maximize the number of nodes that will be classified with
40 40 them (since all their ancestors or descendants will be marked as well).
41 41 """
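
A toy illustration of the classification rule described in the list above, using precomputed ancestor/descendant sets in place of real DAG traversal; the four-node chain and the remote_has set are made up for this sketch.

# toy DAG: a <- b <- c <- d ; ancestor/descendant sets written out by hand
ancestors = {'a': {'a'}, 'b': {'a', 'b'}, 'c': {'a', 'b', 'c'},
             'd': {'a', 'b', 'c', 'd'}}
descendants = {'a': {'a', 'b', 'c', 'd'}, 'b': {'b', 'c', 'd'},
               'c': {'c', 'd'}, 'd': {'d'}}

remote_has = {'a', 'b'}               # what remote.known() would answer
common, missing = set(), set()
for node in ('b', 'c'):               # one sample, answered in one roundtrip
    if node in remote_has:
        common |= ancestors[node]     # remote also has every ancestor
    else:
        missing |= descendants[node]  # remote also lacks every descendant
unknown = set(ancestors) - common - missing
print(sorted(common), sorted(missing), sorted(unknown))
# ['a', 'b'] ['c', 'd'] [] -- fully classified after a single query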
42 42
43 43 import collections
44 44 from node import nullid, nullrev
45 45 from i18n import _
46 46 import random
47 47 import util, dagutil
48 48
49 49 def _updatesample(dag, nodes, sample, quicksamplesize=0):
50 50 """update an existing sample to match the expected size
51 51
52 52 The sample is updated with nodes exponentially distant from each head of
53 53 the <nodes> set (H~1, H~2, H~4, H~8, etc.).
54 54
55 55 If a target size is specified, the sampling will stop once this size is
56 56 reached. Otherwise sampling will happen until roots of the <nodes> set are
57 57 reached.
58 58
59 59 :dag: a dag object from dagutil
60 60 :nodes: set of nodes we want to discover (if None, assume the whole dag)
61 61 :sample: a sample to update
62 62 :quicksamplesize: optional target size of the sample"""
63 63 # if nodes is empty we scan the entire graph
64 64 if nodes:
65 65 heads = dag.headsetofconnecteds(nodes)
66 66 else:
67 67 heads = dag.heads()
68 68 dist = {}
69 69 visit = collections.deque(heads)
70 70 seen = set()
71 71 factor = 1
72 72 while visit:
73 73 curr = visit.popleft()
74 74 if curr in seen:
75 75 continue
76 76 d = dist.setdefault(curr, 1)
77 77 if d > factor:
78 78 factor *= 2
79 79 if d == factor:
80 80 sample.add(curr)
81 81 if quicksamplesize and (len(sample) >= quicksamplesize):
82 82 return
83 83 seen.add(curr)
84 84 for p in dag.parents(curr):
85 85 if not nodes or p in nodes:
86 86 dist.setdefault(p, d + 1)
87 87 visit.append(p)
88 88
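
To make the exponential spacing concrete, here is a sketch that runs _updatesample (assumed to be in scope, as above) over a stand-in dag for a 20-node linear chain; the chaindag class is illustrative and provides only the two methods this walk uses.

class chaindag(object):
    # stand-in dag for the chain 0 <- 1 <- ... <- 19, where 19 is the head
    def heads(self):
        return set([19])
    def parents(self, rev):
        return [rev - 1] if rev > 0 else []

sample = set()
_updatesample(chaindag(), None, sample)   # no size cap: walk to the root
print(sorted(sample, reverse=True))
# [19, 18, 16, 12, 4] -- 1-based distances 1, 2, 4, 8, 16 from the head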
89 89 def _takequicksample(dag, nodes, size):
90 90 """takes a quick sample of size <size>
91 91
92 92 It is meant for initial sampling and focuses on querying heads and close
93 93 ancestors of heads.
94 94
95 95 :dag: a dag object
96 96 :nodes: set of nodes to discover
97 97 :size: the maximum size of the sample"""
98 98 sample = dag.headsetofconnecteds(nodes)
99 99 if size <= len(sample):
100 100 return _limitsample(sample, size)
101 101 _updatesample(dag, None, sample, quicksamplesize=size)
102 102 return sample
103 103
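
The quicksamplesize cap is visible with a similar stand-in: the walk stops as soon as enough nodes are collected. The quickdag class below is again illustrative, assuming _takequicksample is in scope; for a chain, the head of any connected subset is simply its maximum.

class quickdag(object):
    # stand-in for the chain 0 <- 1 <- ... <- 19, where 19 is the head
    def heads(self):
        return set([19])
    def parents(self, rev):
        return [rev - 1] if rev > 0 else []
    def headsetofconnecteds(self, nodes):
        return set([max(nodes)])

print(sorted(_takequicksample(quickdag(), set(range(20)), 4), reverse=True))
# [19, 18, 16, 12] -- the walk stops once 4 nodes are gathered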
104 104 def _takefullsample(dag, nodes, size):
105 105 sample = dag.headsetofconnecteds(nodes)
106 106 # update from heads
107 107 _updatesample(dag, nodes, sample)
108 108 # update from roots
109 109 _updatesample(dag.inverse(), nodes, sample)
110 110 assert sample
111 111 sample = _limitsample(sample, size)
112 112 if len(sample) < size:
113 113 more = size - len(sample)
114 114 sample.update(random.sample(list(nodes - sample), more))
115 115 return sample
116 116
117 117 def _limitsample(sample, desiredlen):
118 118 """return a random subset of sample of at most desiredlen items"""
119 119 if len(sample) > desiredlen:
120 120 sample = set(random.sample(sample, desiredlen))
121 121 return sample
122 122
123 123 def findcommonheads(ui, local, remote,
124 124 initialsamplesize=100,
125 125 fullsamplesize=200,
126 126 abortwhenunrelated=True):
127 127 '''Return a tuple (common, anyincoming, remoteheads) used to identify
128 128 missing nodes from or in remote.
129 129 '''
130 130 roundtrips = 0
131 131 cl = local.changelog
132 132 dag = dagutil.revlogdag(cl)
133 133
134 134 # early exit if we know all the specified remote heads already
135 135 ui.debug("query 1; heads\n")
136 136 roundtrips += 1
137 137 ownheads = dag.heads()
138 138 sample = _limitsample(ownheads, initialsamplesize)
139 139 # indices between sample and externalized version must match
140 140 sample = list(sample)
141 if remote.local():
142 # stopgap until we have a proper localpeer that supports batch()
143 srvheadhashes = remote.heads()
144 yesno = remote.known(dag.externalizeall(sample))
145 elif remote.capable('batch'):
146 141 batch = remote.batch()
147 142 srvheadhashesref = batch.heads()
148 143 yesnoref = batch.known(dag.externalizeall(sample))
149 144 batch.submit()
150 145 srvheadhashes = srvheadhashesref.value
151 146 yesno = yesnoref.value
152 else:
153 # compatibility with pre-batch, but post-known remotes during 1.9
154 # development
155 srvheadhashes = remote.heads()
156 sample = []
157 147
158 148 if cl.tip() == nullid:
159 149 if srvheadhashes != [nullid]:
160 150 return [nullid], True, srvheadhashes
161 151 return [nullid], False, []
162 152
163 153 # start actual discovery (we note this before the next "if" for
164 154 # compatibility reasons)
165 155 ui.status(_("searching for changes\n"))
166 156
167 157 srvheads = dag.internalizeall(srvheadhashes, filterunknown=True)
168 158 if len(srvheads) == len(srvheadhashes):
169 159 ui.debug("all remote heads known locally\n")
170 160 return (srvheadhashes, False, srvheadhashes,)
171 161
172 162 if sample and len(ownheads) <= initialsamplesize and all(yesno):
173 163 ui.note(_("all local heads known remotely\n"))
174 164 ownheadhashes = dag.externalizeall(ownheads)
175 165 return (ownheadhashes, True, srvheadhashes,)
176 166
177 167 # full blown discovery
178 168
179 169 # own nodes I know we both know
180 170 # treat remote heads (and maybe own heads) as a first implicit sample
181 171 # response
182 172 common = cl.incrementalmissingrevs(srvheads)
183 173 commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
184 174 common.addbases(commoninsample)
185 175 # own nodes where I don't know if remote knows them
186 176 undecided = set(common.missingancestors(ownheads))
187 177 # own nodes I know remote lacks
188 178 missing = set()
189 179
190 180 full = False
191 181 while undecided:
192 182
193 183 if sample:
194 184 missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
195 185 missing.update(dag.descendantset(missinginsample, missing))
196 186
197 187 undecided.difference_update(missing)
198 188
199 189 if not undecided:
200 190 break
201 191
202 192 if full or common.hasbases():
203 193 if full:
204 194 ui.note(_("sampling from both directions\n"))
205 195 else:
206 196 ui.debug("taking initial sample\n")
207 197 samplefunc = _takefullsample
208 198 targetsize = fullsamplesize
209 199 else:
210 200 # use even cheaper initial sample
211 201 ui.debug("taking quick initial sample\n")
212 202 samplefunc = _takequicksample
213 203 targetsize = initialsamplesize
214 204 if len(undecided) < targetsize:
215 205 sample = list(undecided)
216 206 else:
217 207 sample = samplefunc(dag, undecided, targetsize)
218 208 sample = _limitsample(sample, targetsize)
219 209
220 210 roundtrips += 1
221 211 ui.progress(_('searching'), roundtrips, unit=_('queries'))
222 212 ui.debug("query %i; still undecided: %i, sample size is: %i\n"
223 213 % (roundtrips, len(undecided), len(sample)))
224 214 # indices between sample and externalized version must match
225 215 sample = list(sample)
226 216 yesno = remote.known(dag.externalizeall(sample))
227 217 full = True
228 218
229 219 if sample:
230 220 commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
231 221 common.addbases(commoninsample)
232 222 common.removeancestorsfrom(undecided)
233 223
234 224 # heads(common) == heads(common.bases) since common represents common.bases
235 225 # and all its ancestors
236 226 result = dag.headsetofconnecteds(common.bases)
237 227 # common.bases can include nullrev, but our contract requires us to not
238 228 # return any heads in that case, so discard that
239 229 result.discard(nullrev)
240 230 ui.progress(_('searching'), None)
241 231 ui.debug("%d total queries\n" % roundtrips)
242 232
243 233 if not result and srvheadhashes != [nullid]:
244 234 if abortwhenunrelated:
245 235 raise util.Abort(_("repository is unrelated"))
246 236 else:
247 237 ui.warn(_("warning: repository is unrelated\n"))
248 238 return (set([nullid]), True, srvheadhashes,)
249 239
250 240 anyincoming = (srvheadhashes != [nullid])
251 241 return dag.externalizeall(result), anyincoming, srvheadhashes
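
Putting the pieces together, a toy driver in the same spirit as findcommonheads: sample a few undecided nodes per round, classify them via ancestor/descendant closures, and count the queries. The graph, the simulate helper, and the sample size are all illustrative, not the real implementation.

import random

def simulate(parents, children, remote_nodes, samplesize=2):
    # toy discovery loop: returns (common, missing, roundtrips)
    def closure(start, edges):
        seen, stack = set(), [start]
        while stack:
            n = stack.pop()
            if n not in seen:
                seen.add(n)
                stack.extend(edges.get(n, ()))
        return seen

    undecided = set(parents)
    common, missing, roundtrips = set(), set(), 0
    while undecided:
        sample = random.sample(sorted(undecided),
                               min(samplesize, len(undecided)))
        yesno = [n in remote_nodes for n in sample]  # one remote.known() call
        roundtrips += 1
        for n, known in zip(sample, yesno):
            if known:
                common |= closure(n, parents)    # ancestors become common
            else:
                missing |= closure(n, children)  # descendants become missing
        undecided -= common | missing
    return common, missing, roundtrips

# local chain a <- b <- c <- d; pretend the remote only has a and b
parents = {'a': [], 'b': ['a'], 'c': ['b'], 'd': ['c']}
children = {'a': ['b'], 'b': ['c'], 'c': ['d']}
common, missing, trips = simulate(parents, children, remote_nodes={'a', 'b'})
print(sorted(common), sorted(missing), trips)
# e.g. ['a', 'b'] ['c', 'd'] 1 -- the query count varies with the sampling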