exchangev2: honor server-advertised manifestdata recommended batch size...
Gregory Szorc
r40209:b797150a default
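Before this change, the client fetched manifests in a hard-coded batch size of 10,000 nodes per manifestdata command. With this change, the server may advertise a recommended batch size in its API descriptor, and the client honors it, falling back to 10,000 when no recommendation is advertised. A minimal standalone sketch of the pattern (the batched helper, the descriptor literal, and the size of 2 are illustrative, not part of Mercurial; the descriptor keys follow the diff below):

    def batched(nodes, commandmeta, fallback=10000):
        # b'recommendedbatchsize' is optional command metadata; fall back
        # to a client-side default when the server does not advertise one.
        batchsize = commandmeta.get(b'recommendedbatchsize', fallback)
        for i in range(0, len(nodes), batchsize):
            yield nodes[i:i + batchsize]

    # Hypothetical descriptor shape, keyed the way the new code reads it:
    apidescriptor = {b'commands': {b'manifestdata': {b'recommendedbatchsize': 2}}}
    nodes = [b'n1', b'n2', b'n3', b'n4', b'n5']
    print(list(batched(nodes, apidescriptor[b'commands'][b'manifestdata'])))
    # [[b'n1', b'n2'], [b'n3', b'n4'], [b'n5']]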
@@ -1,417 +1,416 @@
# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import (
    nullid,
    short,
)
from . import (
    bookmarks,
    error,
    mdiff,
    phases,
    pycompat,
    setdiscovery,
)

def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote
    tr = pullop.trmanager.transaction()

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force)

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres['added']:
        phases.registernew(repo, tr, phases.draft, csetres['added'])

    # And adjust the phase of all changesets accordingly.
    for phase in phases.phasenames:
        if phase == b'secret' or not csetres['nodesbyphase'][phase]:
            continue

        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
                               csetres['nodesbyphase'][phase])

    # Write bookmark updates.
    bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
                               remote.url(), pullop.gettransaction,
                               explicit=pullop.explicitbookmarks)

    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, manres['added'])
    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])

def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

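    # If the caller requested specific heads and they are all already known
    # locally, there is nothing to discover: the requested heads double as
    # both the common set and the remote head set.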
    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        nodemap = repo.unfiltered().changelog.nodemap

        common |= {head for head in remoteheads if head in nodemap}

        if set(remoteheads).issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads

def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
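    # noderange is a pair of (roots, heads): the server sends changesets
    # that are reachable from the requested heads but not from the common
    # set.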
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'changesetdata', {
            b'noderange': [sorted(common), sorted(remoteheads)],
            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
        }).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)

def _processchangesetdata(repo, tr, objs):
    repo.hook('prechangegroup', throw=True,
              **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

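    # Defer changelog index updates until the transaction is finalized so
    # the new revisions become visible atomically.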
    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(_('changesets'),
                                    unit=_('chunks'),
                                    total=meta.get(b'totalitems'))

    manifestnodes = {}

    def linkrev(node):
        repo.ui.debug('add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def onchangeset(cl, node):
        progress.increment()

        revision = cl.changelogrevision(node)

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = revision.manifest

    nodesbyphase = {phase: set() for phase in phases.phasenames}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might be metadata-only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
            )

    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
                        addrevisioncb=onchangeset)

    progress.complete()

    return {
        'added': added,
        'nodesbyphase': nodesbyphase,
        'bookmarks': remotebookmarks,
        'manifestnodes': manifestnodes,
    }

def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(manifestnodes.iteritems()):
        if node in seen:
            continue

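        # rev() raising LookupError means the manifest is not stored
        # locally, so it belongs in the set to fetch.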
        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

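            # The server may send either a delta against a base node or a
            # full revision; normalize both into the delta form addgroup()
            # consumes.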
            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # This value is handed to the lookup function given to
                # addgroup(). We already have a map of manifest node to
                # changelog revision number, so we pass in the manifest
                # node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0
            )

            progress.increment()

    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                    total=len(fetchnodes))

-    # Fetch manifests 10,000 per command.
-    # TODO have server advertise preferences?
+    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
+    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?
-    batchsize = 10000

    # We send commands one at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = [node for node in fetchnodes[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(b'manifestdata', {
                b'tree': b'',
                b'nodes': batch,
                b'fields': {b'parents', b'revision'},
                b'haveparents': True,
            }).result()

            # Chomp off header object.
            next(objs)

            added.extend(rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr)))

    progress.complete()

    return {
        'added': added,
        'linkrevs': linkrevs,
    }

def _derivefilesfrommanifests(repo, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _('scanning manifests'), total=len(manifestnodes))

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the storage
            # delta into consideration. What we really want is something that
            # takes the delta between the manifest's parents. And ideally we
            # would ignore file nodes that are known locally. For now, ignore
            # both these limitations. This will result in incremental fetches
            # requesting data we already have. So this is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes

def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

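            # As with manifests, the server may send a delta or a full
            # revision; normalize both into the delta form addgroup()
            # consumes.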
            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.itervalues()))

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = [x for x in sorted(fnodes.items())]

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = [x for x in fnodeslist[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append((path, e.callcommand(b'filedata', {
                    b'path': path,
                    b'nodes': sorted(nodes),
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                })))

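                # Map each file node to the changelog revision of the
                # manifest that first referenced it; addgroup() uses this
                # to assign linkrevs.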
                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in nodes.iteritems()}

            for path, f in fs:
                objs = f.result()

                # Chomp off header object.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr))