exchangev2: add progress bar around manifest scanning...
Gregory Szorc
r40071:7a347d36 default
@@ -1,411 +1,417 @@
1 1 # exchangev2.py - repository exchange for wire protocol version 2
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import weakref
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 nullid,
16 16 short,
17 17 )
18 18 from . import (
19 19 bookmarks,
20 20 error,
21 21 mdiff,
22 22 phases,
23 23 pycompat,
24 24 setdiscovery,
25 25 )
26 26
27 27 def pull(pullop):
28 28 """Pull using wire protocol version 2."""
29 29 repo = pullop.repo
30 30 remote = pullop.remote
31 31 tr = pullop.trmanager.transaction()
32 32
33 33 # Figure out what needs to be fetched.
34 34 common, fetch, remoteheads = _pullchangesetdiscovery(
35 35 repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
36 36
37 37 # And fetch the data.
38 38 pullheads = pullop.heads or remoteheads
39 39 csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
40 40
41 41 # New revisions are written to the changelog. But all other updates
42 42 # are deferred. Do those now.
43 43
44 44 # Ensure all new changesets are draft by default. If the repo is
45 45 # publishing, the phase will be adjusted by the loop below.
46 46 if csetres['added']:
47 47 phases.registernew(repo, tr, phases.draft, csetres['added'])
48 48
49 49 # And adjust the phase of all changesets accordingly.
50 50 for phase in phases.phasenames:
51 51 if phase == b'secret' or not csetres['nodesbyphase'][phase]:
52 52 continue
53 53
54 54 phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
55 55 csetres['nodesbyphase'][phase])
56 56
57 57 # Write bookmark updates.
58 58 bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
59 59 remote.url(), pullop.gettransaction,
60 60 explicit=pullop.explicitbookmarks)
61 61
62 62 manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
63 63
64 64 # Find all file nodes referenced by added manifests and fetch those
65 65 # revisions.
66 66 fnodes = _derivefilesfrommanifests(repo, manres['added'])
67 67 _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
68 68
69 69 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
70 70 """Determine which changesets need to be pulled."""
71 71
72 72 if heads:
73 73 knownnode = repo.changelog.hasnode
74 74 if all(knownnode(head) for head in heads):
75 75 return heads, False, heads
76 76
77 77 # TODO wire protocol version 2 is capable of more efficient discovery
78 78 # than setdiscovery. Consider implementing something better.
79 79 common, fetch, remoteheads = setdiscovery.findcommonheads(
80 80 repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)
81 81
82 82 common = set(common)
83 83 remoteheads = set(remoteheads)
84 84
85 85 # If a remote head is filtered locally, put it back in the common set.
86 86 # See the comment in exchange._pulldiscoverychangegroup() for more.
87 87
88 88 if fetch and remoteheads:
89 89 nodemap = repo.unfiltered().changelog.nodemap
90 90
91 91 common |= {head for head in remoteheads if head in nodemap}
92 92
93 93 if set(remoteheads).issubset(common):
94 94 fetch = []
95 95
96 96 common.discard(nullid)
97 97
98 98 return common, fetch, remoteheads
99 99
100 100 def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
101 101 # TODO consider adding a step here where we obtain the DAG shape first
102 102 # (or ask the server to slice changesets into chunks for us) so that
103 103 # we can perform multiple fetches in batches. This will facilitate
104 104 # resuming interrupted clones, higher server-side cache hit rates due
105 105 # to smaller segments, etc.
106 106 with remote.commandexecutor() as e:
107 107 objs = e.callcommand(b'changesetdata', {
108 108 b'noderange': [sorted(common), sorted(remoteheads)],
109 109 b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
110 110 }).result()
111 111
112 112 # The context manager waits on all response data when exiting. So
113 113 # we need to remain in the context manager in order to stream data.
114 114 return _processchangesetdata(repo, tr, objs)
115 115
116 116 def _processchangesetdata(repo, tr, objs):
117 117 repo.hook('prechangegroup', throw=True,
118 118 **pycompat.strkwargs(tr.hookargs))
119 119
120 120 urepo = repo.unfiltered()
121 121 cl = urepo.changelog
122 122
123 123 cl.delayupdate(tr)
124 124
125 125 # The first emitted object is a header describing the data that
126 126 # follows.
127 127 meta = next(objs)
128 128
129 129 progress = repo.ui.makeprogress(_('changesets'),
130 130 unit=_('chunks'),
131 131 total=meta.get(b'totalitems'))
132 132
133 133 manifestnodes = {}
134 134
135 135 def linkrev(node):
136 136 repo.ui.debug('add changeset %s\n' % short(node))
137 137 # Linkrev for changelog is always self.
138 138 return len(cl)
139 139
140 140 def onchangeset(cl, node):
141 141 progress.increment()
142 142
143 143 revision = cl.changelogrevision(node)
144 144
145 145 # We need to preserve the mapping of changelog revision to node
146 146 # so we can set the linkrev accordingly when manifests are added.
147 147 manifestnodes[cl.rev(node)] = revision.manifest
148 148
149 149 nodesbyphase = {phase: set() for phase in phases.phasenames}
150 150 remotebookmarks = {}
151 151
152 152 # addgroup() expects a 7-tuple describing revisions. This normalizes
153 153 # the wire data to that format.
154 154 #
155 155 # This loop also aggregates non-revision metadata, such as phase
156 156 # data.
157 157 def iterrevisions():
158 158 for cset in objs:
159 159 node = cset[b'node']
160 160
161 161 if b'phase' in cset:
162 162 nodesbyphase[cset[b'phase']].add(node)
163 163
164 164 for mark in cset.get(b'bookmarks', []):
165 165 remotebookmarks[mark] = node
166 166
167 167 # TODO add mechanism for extensions to examine records so they
168 168 # can siphon off custom data fields.
169 169
170 170 extrafields = {}
171 171
172 172 for field, size in cset.get(b'fieldsfollowing', []):
173 173 extrafields[field] = next(objs)
174 174
175 175 # Some entries might be metadata-only updates.
176 176 if b'revision' not in extrafields:
177 177 continue
178 178
179 179 data = extrafields[b'revision']
180 180
181 181 yield (
182 182 node,
183 183 cset[b'parents'][0],
184 184 cset[b'parents'][1],
185 185 # Linknode is always itself for changesets.
186 186 cset[b'node'],
187 187 # We always send full revisions. So delta base is not set.
188 188 nullid,
189 189 mdiff.trivialdiffheader(len(data)) + data,
190 190 # Flags not yet supported.
191 191 0,
192 192 )
193 193
194 194 added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
195 195 addrevisioncb=onchangeset)
196 196
197 197 progress.complete()
198 198
199 199 return {
200 200 'added': added,
201 201 'nodesbyphase': nodesbyphase,
202 202 'bookmarks': remotebookmarks,
203 203 'manifestnodes': manifestnodes,
204 204 }
205 205
206 206 def _fetchmanifests(repo, tr, remote, manifestnodes):
207 207 rootmanifest = repo.manifestlog.getstorage(b'')
208 208
209 209 # Some manifests can be shared between changesets. Filter out revisions
210 210 # we already know about.
211 211 fetchnodes = []
212 212 linkrevs = {}
213 213 seen = set()
214 214
215 215 for clrev, node in sorted(manifestnodes.iteritems()):
216 216 if node in seen:
217 217 continue
218 218
219 219 try:
220 220 rootmanifest.rev(node)
221 221 except error.LookupError:
222 222 fetchnodes.append(node)
223 223 linkrevs[node] = clrev
224 224
225 225 seen.add(node)
226 226
227 227 # TODO handle tree manifests
228 228
229 229 # addgroup() expects a 7-tuple describing revisions. This normalizes
230 230 # the wire data to that format.
231 231 def iterrevisions(objs, progress):
232 232 for manifest in objs:
233 233 node = manifest[b'node']
234 234
235 235 extrafields = {}
236 236
237 237 for field, size in manifest.get(b'fieldsfollowing', []):
238 238 extrafields[field] = next(objs)
239 239
240 240 if b'delta' in extrafields:
241 241 basenode = manifest[b'deltabasenode']
242 242 delta = extrafields[b'delta']
243 243 elif b'revision' in extrafields:
244 244 basenode = nullid
245 245 revision = extrafields[b'revision']
246 246 delta = mdiff.trivialdiffheader(len(revision)) + revision
247 247 else:
248 248 continue
249 249
250 250 yield (
251 251 node,
252 252 manifest[b'parents'][0],
253 253 manifest[b'parents'][1],
254 254 # The value passed in is passed to the lookup function passed
255 255 # to addgroup(). We already have a map of manifest node to
256 256 # changelog revision number. So we just pass in the
257 257 # manifest node here and use linkrevs.__getitem__ as the
258 258 # resolution function.
259 259 node,
260 260 basenode,
261 261 delta,
262 262 # Flags not yet supported.
263 263 0
264 264 )
265 265
266 266 progress.increment()
267 267
268 268 progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
269 269 total=len(fetchnodes))
270 270
271 271 # Fetch manifests 10,000 per command.
272 272 # TODO have server advertise preferences?
273 273 # TODO make size configurable on client?
274 274 batchsize = 10000
275 275
276 276 # We send commands 1 at a time to the remote. This is not the most
277 277 # efficient because we incur a round trip at the end of each batch.
278 278 # However, the existing frame-based reactor keeps consuming server
279 279 # data in the background. And this results in response data buffering
280 280 # in memory. This can consume gigabytes of memory.
281 281 # TODO send multiple commands in a request once background buffering
282 282 # issues are resolved.
283 283
284 284 added = []
285 285
286 286 for i in pycompat.xrange(0, len(fetchnodes), batchsize):
287 287 batch = fetchnodes[i:i + batchsize]
288 288 if not batch:
289 289 continue
290 290
291 291 with remote.commandexecutor() as e:
292 292 objs = e.callcommand(b'manifestdata', {
293 293 b'tree': b'',
294 294 b'nodes': batch,
295 295 b'fields': {b'parents', b'revision'},
296 296 b'haveparents': True,
297 297 }).result()
298 298
299 299 # Chomp off header object.
300 300 next(objs)
301 301
302 302 added.extend(rootmanifest.addgroup(
303 303 iterrevisions(objs, progress),
304 304 linkrevs.__getitem__,
305 305 weakref.proxy(tr)))
306 306
307 307 progress.complete()
308 308
309 309 return {
310 310 'added': added,
311 311 'linkrevs': linkrevs,
312 312 }
313 313
314 314 def _derivefilesfrommanifests(repo, manifestnodes):
315 315 """Determine what file nodes are relevant given a set of manifest nodes.
316 316
317 317 Returns a dict mapping file paths to dicts of file node to first manifest
318 318 node.
319 319 """
320 320 ml = repo.manifestlog
321 321 fnodes = collections.defaultdict(dict)
322 322
323 for manifestnode in manifestnodes:
324 m = ml.get(b'', manifestnode)
323 progress = repo.ui.makeprogress(
324 _('scanning manifests'), total=len(manifestnodes))
325
326 with progress:
327 for manifestnode in manifestnodes:
328 m = ml.get(b'', manifestnode)
325 329
326 # TODO this will pull in unwanted nodes because it takes the storage
327 # delta into consideration. What we really want is something that takes
328 # the delta between the manifest's parents. And ideally we would
329 # ignore file nodes that are known locally. For now, ignore both
330 # these limitations. This will result in incremental fetches requesting
331 # data we already have. So this is far from ideal.
332 md = m.readfast()
330 # TODO this will pull in unwanted nodes because it takes the storage
331 # delta into consideration. What we really want is something that
332 # takes the delta between the manifest's parents. And ideally we
333 # would ignore file nodes that are known locally. For now, ignore
334 # both these limitations. This will result in incremental fetches
335 # requesting data we already have. So this is far from ideal.
336 md = m.readfast()
333 337
334 for path, fnode in md.items():
335 fnodes[path].setdefault(fnode, manifestnode)
338 for path, fnode in md.items():
339 fnodes[path].setdefault(fnode, manifestnode)
340
341 progress.increment()
336 342
337 343 return fnodes
338 344
339 345 def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
340 346 def iterrevisions(objs, progress):
341 347 for filerevision in objs:
342 348 node = filerevision[b'node']
343 349
344 350 extrafields = {}
345 351
346 352 for field, size in filerevision.get(b'fieldsfollowing', []):
347 353 extrafields[field] = next(objs)
348 354
349 355 if b'delta' in extrafields:
350 356 basenode = filerevision[b'deltabasenode']
351 357 delta = extrafields[b'delta']
352 358 elif b'revision' in extrafields:
353 359 basenode = nullid
354 360 revision = extrafields[b'revision']
355 361 delta = mdiff.trivialdiffheader(len(revision)) + revision
356 362 else:
357 363 continue
358 364
359 365 yield (
360 366 node,
361 367 filerevision[b'parents'][0],
362 368 filerevision[b'parents'][1],
363 369 node,
364 370 basenode,
365 371 delta,
366 372 # Flags not yet supported.
367 373 0,
368 374 )
369 375
370 376 progress.increment()
371 377
372 378 progress = repo.ui.makeprogress(
373 379 _('files'), unit=_('chunks'),
374 380 total=sum(len(v) for v in fnodes.itervalues()))
375 381
376 382 # TODO make batch size configurable
377 383 batchsize = 10000
378 384 fnodeslist = sorted(fnodes.items())
379 385
380 386 for i in pycompat.xrange(0, len(fnodeslist), batchsize):
381 387 batch = fnodeslist[i:i + batchsize]
382 388 if not batch:
383 389 continue
384 390
385 391 with remote.commandexecutor() as e:
386 392 fs = []
387 393 locallinkrevs = {}
388 394
389 395 for path, nodes in batch:
390 396 fs.append((path, e.callcommand(b'filedata', {
391 397 b'path': path,
392 398 b'nodes': sorted(nodes),
393 399 b'fields': {b'parents', b'revision'},
394 400 b'haveparents': True,
395 401 })))
396 402
397 403 locallinkrevs[path] = {
398 404 node: linkrevs[manifestnode]
399 405 for node, manifestnode in nodes.iteritems()}
400 406
401 407 for path, f in fs:
402 408 objs = f.result()
403 409
404 410 # Chomp off header objects.
405 411 next(objs)
406 412
407 413 store = repo.file(path)
408 414 store.addgroup(
409 415 iterrevisions(objs, progress),
410 416 locallinkrevs[path].__getitem__,
411 417 weakref.proxy(tr))
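
The point of the change is visible in the _derivefilesfrommanifests() hunk above: the manifest scan now runs under "with progress:", so the bar gets a total up front, is bumped once per manifest, and is completed even if the scan raises. Below is a minimal, runnable sketch of the progress contract that pattern relies on; the progressbar class is an illustrative stand-in, not Mercurial's scmutil implementation.

class progressbar(object):
    def __init__(self, topic, total=None):
        self.topic = topic
        self.total = total
        self.pos = 0

    def increment(self):
        self.pos += 1
        print('%s: %d/%s' % (self.topic, self.pos, self.total))

    def complete(self):
        print('%s: done' % self.topic)

    def __enter__(self):
        return self

    def __exit__(self, exctype, excval, exctb):
        # Runs on both normal and error exit, which is what
        # "with progress:" buys over calling complete() by hand.
        self.complete()
        return False

with progressbar('scanning manifests', total=3) as progress:
    for manifestnode in (b'aa', b'bb', b'cc'):
        progress.increment()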
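
A related subtlety appears in _fetchchangesets(): the comment notes that the command executor waits on all response data when exiting, so the streamed objects must be consumed inside the "with remote.commandexecutor()" block. The toy model below reproduces that constraint with invented names; only the shape matches the real executor.

class toyexecutor(object):
    def __init__(self):
        self.open = True

    def results(self):
        # Generator standing in for the streamed response objects.
        for i in range(3):
            if not self.open:
                raise ValueError('stream already drained')
            yield i

    def __enter__(self):
        return self

    def __exit__(self, exctype, excval, exctb):
        # Exiting drains the stream, as the executor in the diff does.
        self.open = False
        return False

with toyexecutor() as e:
    print(list(e.results()))   # fine: consumed inside the context manager

with toyexecutor() as e:
    objs = e.results()

try:
    list(objs)                 # too late: the stream was drained on exit
except ValueError as err:
    print(err)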
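
The changeset and file loops both turn a full revision into a delta with mdiff.trivialdiffheader(len(data)) + data. The worked example below shows why that is a valid delta against the empty base (nullid): the 12-byte header encodes a single hunk replacing bytes [0, 0) with the full text. The header layout matches mdiff's one-line implementation; the patching code is a simplified illustration.

import struct

def trivialdiffheader(length):
    # Single bdiff-style hunk: replace base[0:0] with `length` bytes.
    return struct.pack('>lll', 0, 0, length)

data = b'full revision text'
delta = trivialdiffheader(len(data)) + data

# Apply the one-hunk delta to an empty base.
start, end, length = struct.unpack('>lll', delta[:12])
base = b''
patched = base[:start] + delta[12:12 + length] + base[end:]
assert patched == data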
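
Finally, the fnodes aggregation that the new progress bar wraps deduplicates file nodes across manifests: setdefault() keeps the first manifest that referenced each file node, so a file revision shared by many manifests is recorded, and later fetched, only once. A self-contained illustration with made-up manifest data:

import collections

fnodes = collections.defaultdict(dict)

manifests = [
    (b'm1', {b'a.txt': b'f1', b'b.txt': b'f2'}),
    (b'm2', {b'a.txt': b'f1', b'b.txt': b'f3'}),  # a.txt unchanged
]

for manifestnode, md in manifests:
    for path, fnode in md.items():
        # Keep the first manifest seen for each (path, fnode) pair.
        fnodes[path].setdefault(fnode, manifestnode)

assert fnodes[b'a.txt'] == {b'f1': b'm1'}
assert fnodes[b'b.txt'] == {b'f2': b'm1', b'f3': b'm2'}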