setdiscovery: use iterbatch interface instead of batch...
Augie Fackler
r28437:c3eacee0 default
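This commit's change is confined to one hunk of `findcommonheads` below: the old peer `batch()` API handed back future objects whose `.value` could only be read after `submit()`, whereas `iterbatch()` just queues the calls and hands the answers back, in call order, from `results()`. A minimal sketch of the two call patterns, mirroring the hunk (it assumes `remote` is a connected peer and `nodes` a list of node hashes):

    # old style: batch() returns futures; read .value after submit()
    batch = remote.batch()
    headsref = batch.heads()
    knownref = batch.known(nodes)
    batch.submit()
    heads, known = headsref.value, knownref.value

    # new style: iterbatch() queues calls; results() yields the answers
    # in the order the calls were made
    batch = remote.iterbatch()
    batch.heads()
    batch.known(nodes)
    batch.submit()
    heads, known = batch.results()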
@@ -1,250 +1,249 @@
1 1 # setdiscovery.py - improved discovery of common nodeset for mercurial
2 2 #
3 3 # Copyright 2010 Benoit Boissinot <bboissin@gmail.com>
4 4 # and Peter Arrenbrecht <peter@arrenbrecht.ch>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8 """
9 9 The algorithm works in the following way. You have two repositories: local and
10 10 remote. They both contain a DAG of changesets.
11 11
12 12 The goal of the discovery protocol is to find the set of nodes *common*,
13 13 the set of nodes shared by local and remote.
14 14
15 15 One of the issues with the original protocol was latency: it could
16 16 potentially require many roundtrips to discover that the local repo was a
17 17 subset of remote (which is a very common case; you usually have few changes
18 18 compared to upstream, while upstream probably has lots of development).
19 19
20 20 The new protocol only requires one interface for the remote repo: `known()`,
21 21 which, given a set of changesets, tells you whether they are present in the DAG.
22 22
23 23 The algorithm then works as follows:
24 24
25 25 - We will be using three sets, `common`, `missing`, `unknown`. Initially
26 26 all nodes are in `unknown`.
27 27 - Take a sample from `unknown`, call `remote.known(sample)`
28 28 - For each node that remote knows, move it and all its ancestors to `common`
29 29 - For each node that remote doesn't know, move it and all its descendants
30 30 to `missing`
31 31 - Iterate until `unknown` is empty
32 32
33 33 There are a couple of optimizations. First, instead of starting with a random
34 34 sample of missing nodes, start by sending all heads; in the case where the
35 35 local repo is a subset, you compute the answer in one round trip.
36 36
37 37 Then you can do something similar to the bisecting strategy used when
38 38 finding faulty changesets. Instead of random samples, you can try picking
39 39 nodes that will maximize the number of nodes classified with them (since
40 40 all ancestors or descendants will be marked as well).
41 41 """
42 42
43 43 from __future__ import absolute_import
44 44
45 45 import collections
46 46 import random
47 47
48 48 from .i18n import _
49 49 from .node import (
50 50 nullid,
51 51 nullrev,
52 52 )
53 53 from . import (
54 54 dagutil,
55 55 error,
56 56 )
57 57
58 58 def _updatesample(dag, nodes, sample, quicksamplesize=0):
59 59 """update an existing sample to match the expected size
60 60
61 61 The sample is updated with nodes exponentially distant from each head of the
62 62 <nodes> set. (H~1, H~2, H~4, H~8, etc).
63 63
64 64 If a target size is specified, the sampling will stop once this size is
65 65 reached. Otherwise sampling will happen until roots of the <nodes> set are
66 66 reached.
67 67
68 68 :dag: a dag object from dagutil
69 69 :nodes: set of nodes we want to discover (if None, assume the whole dag)
70 70 :sample: a sample to update
71 71 :quicksamplesize: optional target size of the sample"""
72 72 # if nodes is empty we scan the entire graph
73 73 if nodes:
74 74 heads = dag.headsetofconnecteds(nodes)
75 75 else:
76 76 heads = dag.heads()
77 77 dist = {}
78 78 visit = collections.deque(heads)
79 79 seen = set()
80 80 factor = 1
81 81 while visit:
82 82 curr = visit.popleft()
83 83 if curr in seen:
84 84 continue
85 85 d = dist.setdefault(curr, 1)
86 86 if d > factor:
87 87 factor *= 2
88 88 if d == factor:
89 89 sample.add(curr)
90 90 if quicksamplesize and (len(sample) >= quicksamplesize):
91 91 return
92 92 seen.add(curr)
93 93 for p in dag.parents(curr):
94 94 if not nodes or p in nodes:
95 95 dist.setdefault(p, d + 1)
96 96 visit.append(p)
97 97
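The `factor` doubling above is what produces the exponential spacing promised in the docstring: along a chain of ancestors, only the nodes at distance 1, 2, 4, 8, ... from a head get added to the sample. A standalone check of just that arithmetic (a hypothetical helper, not part of the module):

    def sampled_distances(depth):
        picked, factor = [], 1
        for d in range(1, depth + 1):  # d mirrors dist[curr] above
            if d > factor:
                factor *= 2
            if d == factor:
                picked.append(d)
        return picked

    # sampled_distances(20) -> [1, 2, 4, 8, 16]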
98 98 def _takequicksample(dag, nodes, size):
99 99 """takes a quick sample of size <size>
100 100
101 101 It is meant for initial sampling and focuses on querying heads and close
102 102 ancestors of heads.
103 103
104 104 :dag: a dag object
105 105 :nodes: set of nodes to discover
106 106 :size: the maximum size of the sample"""
107 107 sample = dag.headsetofconnecteds(nodes)
108 108 if size <= len(sample):
109 109 return _limitsample(sample, size)
110 110 _updatesample(dag, None, sample, quicksamplesize=size)
111 111 return sample
112 112
113 113 def _takefullsample(dag, nodes, size):
114 114 sample = dag.headsetofconnecteds(nodes)
115 115 # update from heads
116 116 _updatesample(dag, nodes, sample)
117 117 # update from roots
118 118 _updatesample(dag.inverse(), nodes, sample)
119 119 assert sample
120 120 sample = _limitsample(sample, size)
121 121 if len(sample) < size:
122 122 more = size - len(sample)
123 123 sample.update(random.sample(list(nodes - sample), more))
124 124 return sample
125 125
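`_takequicksample` only walks down from the heads, which keeps the first probe cheap; `_takefullsample` walks from both heads and roots (via `dag.inverse()`) and then pads the sample with random undecided nodes. The padding step in isolation, with made-up values:

    import random

    sample = {1, 2, 3}            # what the two sampling walks produced
    nodes = set(range(10))        # the undecided set
    size = 6                      # target sample size
    more = size - len(sample)
    sample.update(random.sample(list(nodes - sample), more))
    assert len(sample) == size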
126 126 def _limitsample(sample, desiredlen):
127 127 """return a random subset of sample of at most desiredlen item"""
128 128 if len(sample) > desiredlen:
129 129 sample = set(random.sample(sample, desiredlen))
130 130 return sample
131 131
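Note that `_limitsample` only ever trims, so callers can pass it oversized and undersized sets alike:

    _limitsample(set([1, 2, 3]), 5)        # returns {1, 2, 3} unchanged
    len(_limitsample(set(range(100)), 5))  # == 5, contents chosen at random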
132 132 def findcommonheads(ui, local, remote,
133 133 initialsamplesize=100,
134 134 fullsamplesize=200,
135 135 abortwhenunrelated=True):
136 136 '''Return a tuple (common, anyincoming, remoteheads) used to identify
137 137 missing nodes from or in remote.
138 138 '''
139 139 roundtrips = 0
140 140 cl = local.changelog
141 141 dag = dagutil.revlogdag(cl)
142 142
143 143 # early exit if we know all the specified remote heads already
144 144 ui.debug("query 1; heads\n")
145 145 roundtrips += 1
146 146 ownheads = dag.heads()
147 147 sample = _limitsample(ownheads, initialsamplesize)
148 148 # indices between sample and externalized version must match
149 149 sample = list(sample)
150 - batch = remote.batch()
151 - srvheadhashesref = batch.heads()
152 - yesnoref = batch.known(dag.externalizeall(sample))
150 + batch = remote.iterbatch()
151 + batch.heads()
152 + batch.known(dag.externalizeall(sample))
153 153 batch.submit()
154 - srvheadhashes = srvheadhashesref.value
155 - yesno = yesnoref.value
154 + srvheadhashes, yesno = batch.results()
156 155
157 156 if cl.tip() == nullid:
158 157 if srvheadhashes != [nullid]:
159 158 return [nullid], True, srvheadhashes
160 159 return [nullid], False, []
161 160
162 161 # start actual discovery (we note this before the next "if" for
163 162 # compatibility reasons)
164 163 ui.status(_("searching for changes\n"))
165 164
166 165 srvheads = dag.internalizeall(srvheadhashes, filterunknown=True)
167 166 if len(srvheads) == len(srvheadhashes):
168 167 ui.debug("all remote heads known locally\n")
169 168 return (srvheadhashes, False, srvheadhashes,)
170 169
171 170 if sample and len(ownheads) <= initialsamplesize and all(yesno):
172 171 ui.note(_("all local heads known remotely\n"))
173 172 ownheadhashes = dag.externalizeall(ownheads)
174 173 return (ownheadhashes, True, srvheadhashes,)
175 174
176 175 # full blown discovery
177 176
178 177 # own nodes I know we both know
179 178 # treat remote heads (and maybe own heads) as a first implicit sample
180 179 # response
181 180 common = cl.incrementalmissingrevs(srvheads)
182 181 commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
183 182 common.addbases(commoninsample)
184 183 # own nodes where I don't know if remote knows them
185 184 undecided = set(common.missingancestors(ownheads))
186 185 # own nodes I know remote lacks
187 186 missing = set()
188 187
189 188 full = False
190 189 while undecided:
191 190
192 191 if sample:
193 192 missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
194 193 missing.update(dag.descendantset(missinginsample, missing))
195 194
196 195 undecided.difference_update(missing)
197 196
198 197 if not undecided:
199 198 break
200 199
201 200 if full or common.hasbases():
202 201 if full:
203 202 ui.note(_("sampling from both directions\n"))
204 203 else:
205 204 ui.debug("taking initial sample\n")
206 205 samplefunc = _takefullsample
207 206 targetsize = fullsamplesize
208 207 else:
209 208 # use even cheaper initial sample
210 209 ui.debug("taking quick initial sample\n")
211 210 samplefunc = _takequicksample
212 211 targetsize = initialsamplesize
213 212 if len(undecided) < targetsize:
214 213 sample = list(undecided)
215 214 else:
216 215 sample = samplefunc(dag, undecided, targetsize)
217 216 sample = _limitsample(sample, targetsize)
218 217
219 218 roundtrips += 1
220 219 ui.progress(_('searching'), roundtrips, unit=_('queries'))
221 220 ui.debug("query %i; still undecided: %i, sample size is: %i\n"
222 221 % (roundtrips, len(undecided), len(sample)))
223 222 # indices between sample and externalized version must match
224 223 sample = list(sample)
225 224 yesno = remote.known(dag.externalizeall(sample))
226 225 full = True
227 226
228 227 if sample:
229 228 commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
230 229 common.addbases(commoninsample)
231 230 common.removeancestorsfrom(undecided)
232 231
233 232 # heads(common) == heads(common.bases) since common represents common.bases
234 233 # and all its ancestors
235 234 result = dag.headsetofconnecteds(common.bases)
236 235 # common.bases can include nullrev, but our contract requires us to not
237 236 # return any heads in that case, so discard that
238 237 result.discard(nullrev)
239 238 ui.progress(_('searching'), None)
240 239 ui.debug("%d total queries\n" % roundtrips)
241 240
242 241 if not result and srvheadhashes != [nullid]:
243 242 if abortwhenunrelated:
244 243 raise error.Abort(_("repository is unrelated"))
245 244 else:
246 245 ui.warn(_("warning: repository is unrelated\n"))
247 246 return (set([nullid]), True, srvheadhashes,)
248 247
249 248 anyincoming = (srvheadhashes != [nullid])
250 249 return dag.externalizeall(result), anyincoming, srvheadhashes
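For orientation, `findcommonheads` is the entry point the higher-level pull/push discovery logic goes through. A caller-side sketch (assuming `repo` is a local repository object and `remote` a connected peer):

    from mercurial import setdiscovery

    common, anyincoming, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=False)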