index: use `index.has_node` in `exchangev2._pullchangesetdiscovery`...
Author: marmoute
Changeset: r43946:a166fadf (branch: default)
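
The change swaps a membership test against the changelog's `nodemap` for the newer `index.has_node()` API when locally-filtered remote heads are added back into the common set. A minimal sketch of the two idioms, assuming `repo` is a local repository object and `node` is a changeset node (both hypothetical here, not values from this changeset):

    cl = repo.unfiltered().changelog

    # Old idiom: test membership in the nodemap mapping.
    known_before = node in cl.nodemap

    # New idiom: ask the revlog index directly.
    known_after = cl.index.has_node(node)

    # Both report whether the node exists in the (unfiltered) changelog.
    assert known_before == known_after
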
mercurial/exchangev2.py
@@ -1,775 +1,775 @@
# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import (
    nullid,
    short,
)
from . import (
    bookmarks,
    error,
    mdiff,
    narrowspec,
    phases,
    pycompat,
    setdiscovery,
)
from .interfaces import repository

def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote

    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)

    # If this is a clone and it was requested to perform a "stream clone",
    # we obtain the raw files data from the remote then fall back to an
    # incremental pull. This is somewhat hacky and is not nearly robust enough
    # for long-term usage.
    if usingrawchangelogandmanifest:
        with repo.transaction(b'clone'):
            _fetchrawstorefiles(repo, remote)
            repo.invalidate(clearfilecache=True)

    tr = pullop.trmanager.transaction()

    # We don't use the repo's narrow matcher here because the patterns passed
    # to exchange.pull() could be different.
    narrowmatcher = narrowspec.match(
        repo.root,
        # Empty maps to nevermatcher. So always
        # set includes if missing.
        pullop.includepats or {b'path:.'},
        pullop.excludepats,
    )

    if pullop.includepats or pullop.excludepats:
        pathfilter = {}
        if pullop.includepats:
            pathfilter[b'include'] = sorted(pullop.includepats)
        if pullop.excludepats:
            pathfilter[b'exclude'] = sorted(pullop.excludepats)
    else:
        pathfilter = None

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force
    )

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres[b'added']:
        phases.registernew(repo, tr, phases.draft, csetres[b'added'])

    # And adjust the phase of all changesets accordingly.
    for phase in phases.phasenames:
        if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
            continue

        phases.advanceboundary(
            repo,
            tr,
            phases.phasenames.index(phase),
            csetres[b'nodesbyphase'][phase],
        )

    # Write bookmark updates.
    bookmarks.updatefromremote(
        repo.ui,
        repo,
        csetres[b'bookmarks'],
        remote.url(),
        pullop.gettransaction,
        explicit=pullop.explicitbookmarks,
    )

    manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])

    # We don't properly support shallow changeset and manifest yet. So we apply
    # depth limiting locally.
    if pullop.depth:
        relevantcsetnodes = set()
        clnode = repo.changelog.node

        for rev in repo.revs(
            b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
        ):
            relevantcsetnodes.add(clnode(rev))

        csetrelevantfilter = lambda n: n in relevantcsetnodes

    else:
        csetrelevantfilter = lambda n: True

    # If obtaining the raw store files, we need to scan the full repo to
    # derive all the changesets, manifests, and linkrevs.
    if usingrawchangelogandmanifest:
        csetsforfiles = []
        mnodesforfiles = []
        manifestlinkrevs = {}

        for rev in repo:
            ctx = repo[rev]
            node = ctx.node()

            if not csetrelevantfilter(node):
                continue

            mnode = ctx.manifestnode()

            csetsforfiles.append(node)
            mnodesforfiles.append(mnode)
            manifestlinkrevs[mnode] = rev

    else:
        csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
        mnodesforfiles = manres[b'added']
        manifestlinkrevs = manres[b'linkrevs']

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
    _fetchfilesfromcsets(
        repo,
        tr,
        remote,
        pathfilter,
        fnodes,
        csetsforfiles,
        manifestlinkrevs,
        shallow=bool(pullop.depth),
    )

def _checkuserawstorefiledata(pullop):
    """Check whether we should use rawstorefiledata command to retrieve data."""

    repo = pullop.repo
    remote = pullop.remote

    # Command to obtain raw store data isn't available.
    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
        return False

    # Only honor if user requested stream clone operation.
    if not pullop.streamclonerequested:
        return False

    # Only works on empty repos.
    if len(repo):
        return False

    # TODO This is super hacky. There needs to be a storage API for this. We
    # also need to check for compatibility with the remote.
    if b'revlogv1' not in repo.requirements:
        return False

    return True

def _fetchrawstorefiles(repo, remote):
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'rawstorefiledata', {b'files': [b'changelog', b'manifestlog'],}
        ).result()

        # First object is a summary of files data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(
            _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
        )
        with progress:
            progress.update(0)

            # Next are pairs of file metadata, data.
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(
                            _(b'remote file data missing key: %s') % k
                        )

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(
                        _(b'invalid location for raw file data: %s')
                        % filemeta[b'location']
                    )

                bytesremaining = filemeta[b'size']

                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(
                                _(
                                    b'received invalid number of bytes for file '
                                    b'data; expected %d, got extra'
                                )
                                % filemeta[b'size']
                            )

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(
                                _(
                                    b'did not receive indefinite length bytestring '
                                    b'for file data'
                                )
                            )

                if bytesremaining:
                    raise error.Abort(
                        _(
                            b'received invalid number of bytes for'
                            b'file data; expected %d got %d'
                        )
                        % (
                            filemeta[b'size'],
                            filemeta[b'size'] - bytesremaining,
                        )
                    )

def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
    )

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
-        nodemap = repo.unfiltered().changelog.nodemap
+        has_node = repo.unfiltered().changelog.index.has_node

-        common |= {head for head in remoteheads if head in nodemap}
+        common |= {head for head in remoteheads if has_node(head)}

        if set(remoteheads).issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads

def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'changesetdata',
            {
                b'revisions': [
                    {
                        b'type': b'changesetdagrange',
                        b'roots': sorted(common),
                        b'heads': sorted(remoteheads),
                    }
                ],
                b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
            },
        ).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)

def _processchangesetdata(repo, tr, objs):
    repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(
        _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
    )

    manifestnodes = {}

    def linkrev(node):
        repo.ui.debug(b'add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def onchangeset(cl, node):
        progress.increment()

        revision = cl.changelogrevision(node)

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = revision.manifest

    nodesbyphase = {phase: set() for phase in phases.phasenames}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might only be metadata only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
            )

    added = cl.addgroup(
        iterrevisions(), linkrev, weakref.proxy(tr), addrevisioncb=onchangeset
    )

    progress.complete()

    return {
        b'added': added,
        b'nodesbyphase': nodesbyphase,
        b'bookmarks': remotebookmarks,
        b'manifestnodes': manifestnodes,
    }

def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
    )

    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?

    # We send commands 1 at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = [node for node in fetchnodes[i : i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(
                b'manifestdata',
                {
                    b'tree': b'',
                    b'nodes': batch,
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                },
            ).result()

            # Chomp off header object.
            next(objs)

            added.extend(
                rootmanifest.addgroup(
                    iterrevisions(objs, progress),
                    linkrevs.__getitem__,
                    weakref.proxy(tr),
                )
            )

    progress.complete()

    return {
        b'added': added,
        b'linkrevs': linkrevs,
    }

def _derivefilesfrommanifests(repo, matcher, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _(b'scanning manifests'), total=len(manifestnodes)
    )

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the storage
            # delta into consideration. What we really want is something that
            # takes the delta between the manifest's parents. And ideally we
            # would ignore file nodes that are known locally. For now, ignore
            # both these limitations. This will result in incremental fetches
            # requesting data we already have. So this is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                if matcher(path):
                    fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes

def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file data from explicit file revisions."""

    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = [x for x in sorted(fnodes.items())]

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = [x for x in fnodeslist[i : i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append(
                    (
                        path,
                        e.callcommand(
                            b'filedata',
                            {
                                b'path': path,
                                b'nodes': sorted(nodes),
                                b'fields': {b'parents', b'revision'},
                                b'haveparents': True,
                            },
                        ),
                    )
                )

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in pycompat.iteritems(nodes)
                }

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr),
                )

def _fetchfilesfromcsets(
    repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
    """Fetch file data from explicit changeset revisions."""

    def iterrevisions(objs, remaining, progress):
        while remaining:
            filerevision = next(objs)

            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            if b'linknode' in filerevision:
                linknode = filerevision[b'linknode']
            else:
                linknode = node

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                linknode,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()
            remaining -= 1

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
    fields = {b'parents', b'revision'}
    clrev = repo.changelog.rev

    # There are no guarantees that we'll have ancestor revisions if
    # a) this repo has shallow file storage b) shallow data fetching is enabled.
    # Force remote to not delta against possibly unknown revisions when these
    # conditions hold.
    haveparents = not (shallowfiles or shallow)

    # Similarly, we may not have calculated linkrevs for all incoming file
    # revisions. Ask the remote to do work for us in this case.
    if not haveparents:
        fields.add(b'linknode')

    for i in pycompat.xrange(0, len(csets), batchsize):
        batch = [x for x in csets[i : i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [
                    {b'type': b'changesetexplicit', b'nodes': batch,}
                ],
                b'fields': fields,
                b'haveparents': haveparents,
            }

            if pathfilter:
                args[b'pathfilter'] = pathfilter

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments.
            for i in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in pycompat.iteritems(fnodes[path])
                }

                def getlinkrev(node):
                    if node in linkrevs:
                        return linkrevs[node]
                    else:
                        return clrev(node)

                store.addgroup(
                    iterrevisions(objs, header[b'totalitems'], progress),
                    getlinkrev,
                    weakref.proxy(tr),
                    maybemissingparents=shallow,
                )
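
For third-party code that must run against both older and newer Mercurial releases while this API transition happens, a small fallback shim is one way to cope. This is a hedged sketch, not part of the changeset; `make_has_node` is a hypothetical helper name:

    def make_has_node(repo):
        # Return a callable that reports whether a changelog node exists
        # in the unfiltered repository.
        cl = repo.unfiltered().changelog
        index = getattr(cl, 'index', None)
        if index is not None and hasattr(index, 'has_node'):
            # Newer API, as used by this changeset.
            return index.has_node
        # Older releases: fall back to membership in changelog.nodemap.
        return lambda node: node in cl.nodemap

    # Usage sketch: has_node = make_has_node(repo); has_node(some_node) -> bool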