exchangev2: honor server advertised manifestdata recommended batch size...
Gregory Szorc
r40209:b797150a default
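This change teaches the client to honor the batch size the server advertises for the manifestdata command, rather than always batching 10,000 manifests per request. A minimal sketch of the descriptor fragment the new code consults (the advertised value here is hypothetical; the real descriptor is built from the server's wire protocol version 2 capabilities response):

    apidescriptor = {
        b'commands': {
            b'manifestdata': {
                b'recommendedbatchsize': 100000,
            },
        },
    }
    commandmeta = apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)

A server that advertises no recommendation keeps the old behavior, since .get() falls back to 10000.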
@@ -1,417 +1,416 @@
# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import (
    nullid,
    short,
)
from . import (
    bookmarks,
    error,
    mdiff,
    phases,
    pycompat,
    setdiscovery,
)

def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote
    tr = pullop.trmanager.transaction()

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force)

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres['added']:
        phases.registernew(repo, tr, phases.draft, csetres['added'])

    # And adjust the phase of all changesets accordingly.
    for phase in phases.phasenames:
        if phase == b'secret' or not csetres['nodesbyphase'][phase]:
            continue

        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
                               csetres['nodesbyphase'][phase])

    # Write bookmark updates.
    bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
                               remote.url(), pullop.gettransaction,
                               explicit=pullop.explicitbookmarks)

    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, manres['added'])
    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])

def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
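            # Every requested head is already known locally, so report
            # the requested heads as both common and remote heads and
            # signal that nothing needs to be fetched.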
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        nodemap = repo.unfiltered().changelog.nodemap

        common |= {head for head in remoteheads if head in nodemap}

        if set(remoteheads).issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads

def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'changesetdata', {
            b'noderange': [sorted(common), sorted(remoteheads)],
            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
        }).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)

def _processchangesetdata(repo, tr, objs):
    repo.hook('prechangegroup', throw=True,
              **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(_('changesets'),
                                    unit=_('chunks'),
                                    total=meta.get(b'totalitems'))

    manifestnodes = {}

    def linkrev(node):
        repo.ui.debug('add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def onchangeset(cl, node):
        progress.increment()

        revision = cl.changelogrevision(node)

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = revision.manifest

    nodesbyphase = {phase: set() for phase in phases.phasenames}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
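    #
    # The tuple fields are (node, p1node, p2node, linknode, deltabasenode,
    # delta, flags), matching the values yielded below.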
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might be metadata-only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
            )

    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
                        addrevisioncb=onchangeset)

    progress.complete()

    return {
        'added': added,
        'nodesbyphase': nodesbyphase,
        'bookmarks': remotebookmarks,
        'manifestnodes': manifestnodes,
    }

def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(manifestnodes.iteritems()):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
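                # The trivial diff header above wraps the full revision
                # text as a delta against the null revision, so full
                # revisions and real deltas flow through the same
                # addgroup() path.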
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # This value is passed to the lookup function given to
                # addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0
            )

            progress.increment()

    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                    total=len(fetchnodes))

-    # Fetch manifests 10,000 per command.
-    # TODO have server advertise preferences?
+    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
+    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
     # TODO make size configurable on client?
-    batchsize = 10000

    # We send commands one at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = [node for node in fetchnodes[i:i + batchsize]]
        if not batch:
            continue

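        # b'haveparents': True signals that we will have the parent
        # revisions of the requested nodes, which permits the server to
        # emit deltas against them (a reading of the wire protocol v2
        # field semantics, not something this code asserts).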
        with remote.commandexecutor() as e:
            objs = e.callcommand(b'manifestdata', {
                b'tree': b'',
                b'nodes': batch,
                b'fields': {b'parents', b'revision'},
                b'haveparents': True,
            }).result()

            # Chomp off header object.
            next(objs)

            added.extend(rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr)))

    progress.complete()

    return {
        'added': added,
        'linkrevs': linkrevs,
    }

def _derivefilesfrommanifests(repo, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _('scanning manifests'), total=len(manifestnodes))

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the
            # storage delta into consideration. What we really want is
            # something that takes the delta between the manifest's parents.
            # And ideally we would ignore file nodes that are known locally.
            # For now, ignore both these limitations. This will result in
            # incremental fetches requesting data we already have. So this
            # is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                fnodes[path].setdefault(fnode, manifestnode)
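                # The setdefault() above keeps the first manifest node
                # seen for each file node, matching the "first manifest
                # node" contract in the docstring.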

            progress.increment()

    return fnodes

def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.itervalues()))

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = [x for x in sorted(fnodes.items())]

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = [x for x in fnodeslist[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append((path, e.callcommand(b'filedata', {
                    b'path': path,
                    b'nodes': sorted(nodes),
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                })))

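                # linkrevs maps manifest node to changelog revision.
                # Compose it with the file node to manifest node mapping
                # to resolve the linkrev for each file node.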
                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in nodes.iteritems()}

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr))