stream-clone: yield cache entry in `_entries_walk` too...
marmoute -
r51409:43ed1f12 default
@@ -1,955 +1,960 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import contextlib
9 import contextlib
10 import os
10 import os
11 import struct
11 import struct
12
12
13 from .i18n import _
13 from .i18n import _
14 from .pycompat import open
14 from .pycompat import open
15 from .interfaces import repository
15 from .interfaces import repository
16 from . import (
16 from . import (
17 bookmarks,
17 bookmarks,
18 cacheutil,
18 cacheutil,
19 error,
19 error,
20 narrowspec,
20 narrowspec,
21 phases,
21 phases,
22 pycompat,
22 pycompat,
23 requirements as requirementsmod,
23 requirements as requirementsmod,
24 scmutil,
24 scmutil,
25 store,
25 store,
26 transaction,
26 transaction,
27 util,
27 util,
28 )
28 )
29 from .revlogutils import (
29 from .revlogutils import (
30 nodemap,
30 nodemap,
31 )
31 )
32
32
33
33
34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 """determine the final set of requirement for a new stream clone
35 """determine the final set of requirement for a new stream clone
36
36
37 this method combine the "default" requirements that a new repository would
37 this method combine the "default" requirements that a new repository would
38 use with the constaint we get from the stream clone content. We keep local
38 use with the constaint we get from the stream clone content. We keep local
39 configuration choice when possible.
39 configuration choice when possible.
40 """
40 """
41 requirements = set(default_requirements)
41 requirements = set(default_requirements)
42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 requirements.update(streamed_requirements)
43 requirements.update(streamed_requirements)
44 return requirements
44 return requirements
45
45
46
46
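For illustration, a minimal sketch of the set arithmetic above, with hypothetical requirement names standing in for the real `requirementsmod.STREAM_FIXED_REQUIREMENTS` values:

```python
# Hypothetical requirement names; only the set arithmetic mirrors the
# function above: stream-fixed requirements come from the stream
# content, everything else keeps the local defaults.
default = {b'revlogv1', b'sparserevlog', b'fncache', b'dotencode'}
stream_fixed = {b'revlogv1', b'sparserevlog'}  # stand-in for STREAM_FIXED_REQUIREMENTS
streamed = {b'revlogv1'}                       # dictated by the stream content

final = (default - stream_fixed) | streamed
assert final == {b'fncache', b'dotencode', b'revlogv1'}
```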
47 def streamed_requirements(repo):
47 def streamed_requirements(repo):
48 """the set of requirement the new clone will have to support
48 """the set of requirement the new clone will have to support
49
49
50 This is used for advertising the stream options and to generate the actual
50 This is used for advertising the stream options and to generate the actual
51 stream content."""
51 stream content."""
52 requiredformats = (
52 requiredformats = (
53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 )
54 )
55 return requiredformats
55 return requiredformats
56
56
57
57
58 def canperformstreamclone(pullop, bundle2=False):
58 def canperformstreamclone(pullop, bundle2=False):
59 """Whether it is possible to perform a streaming clone as part of pull.
59 """Whether it is possible to perform a streaming clone as part of pull.
60
60
61 ``bundle2`` will cause the function to consider stream clone through
61 ``bundle2`` will cause the function to consider stream clone through
62 bundle2 and only through bundle2.
62 bundle2 and only through bundle2.
63
63
64 Returns a tuple of (supported, requirements). ``supported`` is True if
64 Returns a tuple of (supported, requirements). ``supported`` is True if
65 streaming clone is supported and False otherwise. ``requirements`` is
65 streaming clone is supported and False otherwise. ``requirements`` is
66 a set of repo requirements from the remote, or ``None`` if stream clone
66 a set of repo requirements from the remote, or ``None`` if stream clone
67 isn't supported.
67 isn't supported.
68 """
68 """
69 repo = pullop.repo
69 repo = pullop.repo
70 remote = pullop.remote
70 remote = pullop.remote
71
71
72 bundle2supported = False
72 bundle2supported = False
73 if pullop.canusebundle2:
73 if pullop.canusebundle2:
74 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
74 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
75 bundle2supported = True
75 bundle2supported = True
76 # else
76 # else
77 # Server doesn't support bundle2 stream clone or doesn't support
77 # Server doesn't support bundle2 stream clone or doesn't support
78 # the versions we support. Fall back and possibly allow legacy.
78 # the versions we support. Fall back and possibly allow legacy.
79
79
80 # Ensures legacy code path uses available bundle2.
80 # Ensures legacy code path uses available bundle2.
81 if bundle2supported and not bundle2:
81 if bundle2supported and not bundle2:
82 return False, None
82 return False, None
83 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
83 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
84 elif bundle2 and not bundle2supported:
84 elif bundle2 and not bundle2supported:
85 return False, None
85 return False, None
86
86
87 # Streaming clone only works on empty repositories.
87 # Streaming clone only works on empty repositories.
88 if len(repo):
88 if len(repo):
89 return False, None
89 return False, None
90
90
91 # Streaming clone only works if all data is being requested.
91 # Streaming clone only works if all data is being requested.
92 if pullop.heads:
92 if pullop.heads:
93 return False, None
93 return False, None
94
94
95 streamrequested = pullop.streamclonerequested
95 streamrequested = pullop.streamclonerequested
96
96
97 # If we don't have a preference, let the server decide for us. This
97 # If we don't have a preference, let the server decide for us. This
98 # likely only comes into play in LANs.
98 # likely only comes into play in LANs.
99 if streamrequested is None:
99 if streamrequested is None:
100 # The server can advertise whether to prefer streaming clone.
100 # The server can advertise whether to prefer streaming clone.
101 streamrequested = remote.capable(b'stream-preferred')
101 streamrequested = remote.capable(b'stream-preferred')
102
102
103 if not streamrequested:
103 if not streamrequested:
104 return False, None
104 return False, None
105
105
106 # In order for stream clone to work, the client has to support all the
106 # In order for stream clone to work, the client has to support all the
107 # requirements advertised by the server.
107 # requirements advertised by the server.
108 #
108 #
109 # The server advertises its requirements via the "stream" and "streamreqs"
109 # The server advertises its requirements via the "stream" and "streamreqs"
110 # capability. "stream" (a value-less capability) is advertised if and only
110 # capability. "stream" (a value-less capability) is advertised if and only
111 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
111 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
112 # is advertised and contains a comma-delimited list of requirements.
112 # is advertised and contains a comma-delimited list of requirements.
113 requirements = set()
113 requirements = set()
114 if remote.capable(b'stream'):
114 if remote.capable(b'stream'):
115 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
115 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
116 else:
116 else:
117 streamreqs = remote.capable(b'streamreqs')
117 streamreqs = remote.capable(b'streamreqs')
118 # This is weird and shouldn't happen with modern servers.
118 # This is weird and shouldn't happen with modern servers.
119 if not streamreqs:
119 if not streamreqs:
120 pullop.repo.ui.warn(
120 pullop.repo.ui.warn(
121 _(
121 _(
122 b'warning: stream clone requested but server has them '
122 b'warning: stream clone requested but server has them '
123 b'disabled\n'
123 b'disabled\n'
124 )
124 )
125 )
125 )
126 return False, None
126 return False, None
127
127
128 streamreqs = set(streamreqs.split(b','))
128 streamreqs = set(streamreqs.split(b','))
129 # Server requires something we don't support. Bail.
129 # Server requires something we don't support. Bail.
130 missingreqs = streamreqs - repo.supported
130 missingreqs = streamreqs - repo.supported
131 if missingreqs:
131 if missingreqs:
132 pullop.repo.ui.warn(
132 pullop.repo.ui.warn(
133 _(
133 _(
134 b'warning: stream clone requested but client is missing '
134 b'warning: stream clone requested but client is missing '
135 b'requirements: %s\n'
135 b'requirements: %s\n'
136 )
136 )
137 % b', '.join(sorted(missingreqs))
137 % b', '.join(sorted(missingreqs))
138 )
138 )
139 pullop.repo.ui.warn(
139 pullop.repo.ui.warn(
140 _(
140 _(
141 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
141 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
142 b'for more information)\n'
142 b'for more information)\n'
143 )
143 )
144 )
144 )
145 return False, None
145 return False, None
146 requirements = streamreqs
146 requirements = streamreqs
147
147
148 return True, requirements
148 return True, requirements
149
149
150
150
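As a rough sketch of the capability negotiation described above (a hypothetical helper, using only the `capable()` peer API that this function already relies on):

```python
def advertised_stream_requirements(remote):
    # "stream" alone implies the repo only requires revlogv1; otherwise
    # "streamreqs" carries a comma-delimited requirement list.
    if remote.capable(b'stream'):
        return {b'revlogv1'}
    streamreqs = remote.capable(b'streamreqs')
    if not streamreqs:
        return None  # server has stream clones disabled
    return set(streamreqs.split(b','))
```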
151 def maybeperformlegacystreamclone(pullop):
151 def maybeperformlegacystreamclone(pullop):
152 """Possibly perform a legacy stream clone operation.
152 """Possibly perform a legacy stream clone operation.
153
153
154 Legacy stream clones are performed as part of pull but before all other
154 Legacy stream clones are performed as part of pull but before all other
155 operations.
155 operations.
156
156
157 A legacy stream clone will not be performed if a bundle2 stream clone is
157 A legacy stream clone will not be performed if a bundle2 stream clone is
158 supported.
158 supported.
159 """
159 """
160 from . import localrepo
160 from . import localrepo
161
161
162 supported, requirements = canperformstreamclone(pullop)
162 supported, requirements = canperformstreamclone(pullop)
163
163
164 if not supported:
164 if not supported:
165 return
165 return
166
166
167 repo = pullop.repo
167 repo = pullop.repo
168 remote = pullop.remote
168 remote = pullop.remote
169
169
170 # Save remote branchmap. We will use it later to speed up branchcache
170 # Save remote branchmap. We will use it later to speed up branchcache
171 # creation.
171 # creation.
172 rbranchmap = None
172 rbranchmap = None
173 if remote.capable(b'branchmap'):
173 if remote.capable(b'branchmap'):
174 with remote.commandexecutor() as e:
174 with remote.commandexecutor() as e:
175 rbranchmap = e.callcommand(b'branchmap', {}).result()
175 rbranchmap = e.callcommand(b'branchmap', {}).result()
176
176
177 repo.ui.status(_(b'streaming all changes\n'))
177 repo.ui.status(_(b'streaming all changes\n'))
178
178
179 with remote.commandexecutor() as e:
179 with remote.commandexecutor() as e:
180 fp = e.callcommand(b'stream_out', {}).result()
180 fp = e.callcommand(b'stream_out', {}).result()
181
181
182 # TODO strictly speaking, this code should all be inside the context
182 # TODO strictly speaking, this code should all be inside the context
183 # manager because the context manager is supposed to ensure all wire state
183 # manager because the context manager is supposed to ensure all wire state
184 # is flushed when exiting. But the legacy peers don't do this, so it
184 # is flushed when exiting. But the legacy peers don't do this, so it
185 # doesn't matter.
185 # doesn't matter.
186 l = fp.readline()
186 l = fp.readline()
187 try:
187 try:
188 resp = int(l)
188 resp = int(l)
189 except ValueError:
189 except ValueError:
190 raise error.ResponseError(
190 raise error.ResponseError(
191 _(b'unexpected response from remote server:'), l
191 _(b'unexpected response from remote server:'), l
192 )
192 )
193 if resp == 1:
193 if resp == 1:
194 raise error.Abort(_(b'operation forbidden by server'))
194 raise error.Abort(_(b'operation forbidden by server'))
195 elif resp == 2:
195 elif resp == 2:
196 raise error.Abort(_(b'locking the remote repository failed'))
196 raise error.Abort(_(b'locking the remote repository failed'))
197 elif resp != 0:
197 elif resp != 0:
198 raise error.Abort(_(b'the server sent an unknown error code'))
198 raise error.Abort(_(b'the server sent an unknown error code'))
199
199
200 l = fp.readline()
200 l = fp.readline()
201 try:
201 try:
202 filecount, bytecount = map(int, l.split(b' ', 1))
202 filecount, bytecount = map(int, l.split(b' ', 1))
203 except (ValueError, TypeError):
203 except (ValueError, TypeError):
204 raise error.ResponseError(
204 raise error.ResponseError(
205 _(b'unexpected response from remote server:'), l
205 _(b'unexpected response from remote server:'), l
206 )
206 )
207
207
208 with repo.lock():
208 with repo.lock():
209 consumev1(repo, fp, filecount, bytecount)
209 consumev1(repo, fp, filecount, bytecount)
210 repo.requirements = new_stream_clone_requirements(
210 repo.requirements = new_stream_clone_requirements(
211 repo.requirements,
211 repo.requirements,
212 requirements,
212 requirements,
213 )
213 )
214 repo.svfs.options = localrepo.resolvestorevfsoptions(
214 repo.svfs.options = localrepo.resolvestorevfsoptions(
215 repo.ui, repo.requirements, repo.features
215 repo.ui, repo.requirements, repo.features
216 )
216 )
217 scmutil.writereporequirements(repo)
217 scmutil.writereporequirements(repo)
218 nodemap.post_stream_cleanup(repo)
218 nodemap.post_stream_cleanup(repo)
219
219
220 if rbranchmap:
220 if rbranchmap:
221 repo._branchcaches.replace(repo, rbranchmap)
221 repo._branchcaches.replace(repo, rbranchmap)
222
222
223 repo.invalidate()
223 repo.invalidate()
224
224
225
225
226 def allowservergeneration(repo):
226 def allowservergeneration(repo):
227 """Whether streaming clones are allowed from the server."""
227 """Whether streaming clones are allowed from the server."""
228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
229 return False
229 return False
230
230
231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
232 return False
232 return False
233
233
234 # The way stream clone works makes it impossible to hide secret changesets.
234 # The way stream clone works makes it impossible to hide secret changesets.
235 # So don't allow this by default.
235 # So don't allow this by default.
236 secret = phases.hassecret(repo)
236 secret = phases.hassecret(repo)
237 if secret:
237 if secret:
238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
239
239
240 return True
240 return True
241
241
242
242
243 # This is its own function so extensions can override it.
243 # This is its own function so extensions can override it.
244 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
244 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
245 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
245 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
246
246
247
247
248 def generatev1(repo):
248 def generatev1(repo):
249 """Emit content for version 1 of a streaming clone.
249 """Emit content for version 1 of a streaming clone.
250
250
251 This returns a 3-tuple of (file count, byte size, data iterator).
251 This returns a 3-tuple of (file count, byte size, data iterator).
252
252
253 The data iterator consists of N entries, one for each file being transferred.
253 The data iterator consists of N entries, one for each file being transferred.
254 Each file entry starts as a line with the file name and integer size
254 Each file entry starts as a line with the file name and integer size
255 delimited by a null byte.
255 delimited by a null byte.
256
256
257 The raw file data follows. Following the raw file data is the next file
257 The raw file data follows. Following the raw file data is the next file
258 entry, or EOF.
258 entry, or EOF.
259
259
260 When used on the wire protocol, an additional line indicating protocol
260 When used on the wire protocol, an additional line indicating protocol
261 success will be prepended to the stream. This function is not responsible
261 success will be prepended to the stream. This function is not responsible
262 for adding it.
262 for adding it.
263
263
264 This function will obtain a repository lock to ensure a consistent view of
264 This function will obtain a repository lock to ensure a consistent view of
265 the store is captured. It therefore may raise LockError.
265 the store is captured. It therefore may raise LockError.
266 """
266 """
267 entries = []
267 entries = []
268 total_bytes = 0
268 total_bytes = 0
269 # Get consistent snapshot of repo, lock during scan.
269 # Get consistent snapshot of repo, lock during scan.
270 with repo.lock():
270 with repo.lock():
271 repo.ui.debug(b'scanning\n')
271 repo.ui.debug(b'scanning\n')
272 for entry in _walkstreamfiles(repo):
272 for entry in _walkstreamfiles(repo):
273 for f in entry.files():
273 for f in entry.files():
274 file_size = f.file_size(repo.store.vfs)
274 file_size = f.file_size(repo.store.vfs)
275 if file_size:
275 if file_size:
276 entries.append((f.unencoded_path, file_size))
276 entries.append((f.unencoded_path, file_size))
277 total_bytes += file_size
277 total_bytes += file_size
278 _test_sync_point_walk_1(repo)
278 _test_sync_point_walk_1(repo)
279 _test_sync_point_walk_2(repo)
279 _test_sync_point_walk_2(repo)
280
280
281 repo.ui.debug(
281 repo.ui.debug(
282 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
282 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
283 )
283 )
284
284
285 svfs = repo.svfs
285 svfs = repo.svfs
286 debugflag = repo.ui.debugflag
286 debugflag = repo.ui.debugflag
287
287
288 def emitrevlogdata():
288 def emitrevlogdata():
289 for name, size in entries:
289 for name, size in entries:
290 if debugflag:
290 if debugflag:
291 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
291 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
292 # partially encode name over the wire for backwards compat
292 # partially encode name over the wire for backwards compat
293 yield b'%s\0%d\n' % (store.encodedir(name), size)
293 yield b'%s\0%d\n' % (store.encodedir(name), size)
294 # auditing at this stage is both pointless (paths are already
294 # auditing at this stage is both pointless (paths are already
295 # trusted by the local repo) and expensive
295 # trusted by the local repo) and expensive
296 with svfs(name, b'rb', auditpath=False) as fp:
296 with svfs(name, b'rb', auditpath=False) as fp:
297 if size <= 65536:
297 if size <= 65536:
298 yield fp.read(size)
298 yield fp.read(size)
299 else:
299 else:
300 for chunk in util.filechunkiter(fp, limit=size):
300 for chunk in util.filechunkiter(fp, limit=size):
301 yield chunk
301 yield chunk
302
302
303 return len(entries), total_bytes, emitrevlogdata()
303 return len(entries), total_bytes, emitrevlogdata()
304
304
305
305
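To make the entry format concrete, here is a minimal reader for the v1 stream produced above; it is a simplified sketch of what `consumev1` below does, without the progress and vfs handling, and assumes `fp` is a binary stream positioned at the first entry:

```python
def iter_v1_entries(fp, filecount):
    for _ in range(filecount):
        # each entry: "<name>\0<size>\n" followed by exactly <size> raw bytes
        name, size = fp.readline().rstrip(b'\n').split(b'\0', 1)
        yield name, fp.read(int(size))
```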
306 def generatev1wireproto(repo):
306 def generatev1wireproto(repo):
307 """Emit content for version 1 of streaming clone suitable for the wire.
307 """Emit content for version 1 of streaming clone suitable for the wire.
308
308
309 This is the data output from ``generatev1()`` with 2 header lines. The
309 This is the data output from ``generatev1()`` with 2 header lines. The
310 first line indicates overall success. The 2nd contains the file count and
310 first line indicates overall success. The 2nd contains the file count and
311 byte size of payload.
311 byte size of payload.
312
312
313 The success line contains "0" for success, "1" for stream generation not
313 The success line contains "0" for success, "1" for stream generation not
314 allowed, and "2" for error locking the repository (possibly indicating
314 allowed, and "2" for error locking the repository (possibly indicating
315 a permissions error for the server process).
315 a permissions error for the server process).
316 """
316 """
317 if not allowservergeneration(repo):
317 if not allowservergeneration(repo):
318 yield b'1\n'
318 yield b'1\n'
319 return
319 return
320
320
321 try:
321 try:
322 filecount, bytecount, it = generatev1(repo)
322 filecount, bytecount, it = generatev1(repo)
323 except error.LockError:
323 except error.LockError:
324 yield b'2\n'
324 yield b'2\n'
325 return
325 return
326
326
327 # Indicates successful response.
327 # Indicates successful response.
328 yield b'0\n'
328 yield b'0\n'
329 yield b'%d %d\n' % (filecount, bytecount)
329 yield b'%d %d\n' % (filecount, bytecount)
330 for chunk in it:
330 for chunk in it:
331 yield chunk
331 yield chunk
332
332
333
333
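A hedged sketch of the client side of these two header lines, assuming `fp` is the response stream (this is the same parsing `maybeperformlegacystreamclone` performs above):

```python
status = int(fp.readline())  # 0 = success, 1 = not allowed, 2 = lock error
if status == 0:
    filecount, bytecount = map(int, fp.readline().split(b' ', 1))
```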
334 def generatebundlev1(repo, compression=b'UN'):
334 def generatebundlev1(repo, compression=b'UN'):
335 """Emit content for version 1 of a stream clone bundle.
335 """Emit content for version 1 of a stream clone bundle.
336
336
337 The first 4 bytes of the output ("HGS1") denote this as stream clone
337 The first 4 bytes of the output ("HGS1") denote this as stream clone
338 bundle version 1.
338 bundle version 1.
339
339
340 The next 2 bytes indicate the compression type. Only "UN" is currently
340 The next 2 bytes indicate the compression type. Only "UN" is currently
341 supported.
341 supported.
342
342
343 The next 16 bytes are two 64-bit big endian unsigned integers indicating
343 The next 16 bytes are two 64-bit big endian unsigned integers indicating
344 file count and byte count, respectively.
344 file count and byte count, respectively.
345
345
346 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
346 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
347 of the requirements string, including a trailing \0. The following N bytes
347 of the requirements string, including a trailing \0. The following N bytes
348 are the requirements string, which is ASCII containing a comma-delimited
348 are the requirements string, which is ASCII containing a comma-delimited
349 list of repo requirements that are needed to support the data.
349 list of repo requirements that are needed to support the data.
350
350
351 The remaining content is the output of ``generatev1()`` (which may be
351 The remaining content is the output of ``generatev1()`` (which may be
352 compressed in the future).
352 compressed in the future).
353
353
354 Returns a tuple of (requirements, data generator).
354 Returns a tuple of (requirements, data generator).
355 """
355 """
356 if compression != b'UN':
356 if compression != b'UN':
357 raise ValueError(b'we do not support the compression argument yet')
357 raise ValueError(b'we do not support the compression argument yet')
358
358
359 requirements = streamed_requirements(repo)
359 requirements = streamed_requirements(repo)
360 requires = b','.join(sorted(requirements))
360 requires = b','.join(sorted(requirements))
361
361
362 def gen():
362 def gen():
363 yield b'HGS1'
363 yield b'HGS1'
364 yield compression
364 yield compression
365
365
366 filecount, bytecount, it = generatev1(repo)
366 filecount, bytecount, it = generatev1(repo)
367 repo.ui.status(
367 repo.ui.status(
368 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
368 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
369 )
369 )
370
370
371 yield struct.pack(b'>QQ', filecount, bytecount)
371 yield struct.pack(b'>QQ', filecount, bytecount)
372 yield struct.pack(b'>H', len(requires) + 1)
372 yield struct.pack(b'>H', len(requires) + 1)
373 yield requires + b'\0'
373 yield requires + b'\0'
374
374
375 # This is where we'll add compression in the future.
375 # This is where we'll add compression in the future.
376 assert compression == b'UN'
376 assert compression == b'UN'
377
377
378 progress = repo.ui.makeprogress(
378 progress = repo.ui.makeprogress(
379 _(b'bundle'), total=bytecount, unit=_(b'bytes')
379 _(b'bundle'), total=bytecount, unit=_(b'bytes')
380 )
380 )
381 progress.update(0)
381 progress.update(0)
382
382
383 for chunk in it:
383 for chunk in it:
384 progress.increment(step=len(chunk))
384 progress.increment(step=len(chunk))
385 yield chunk
385 yield chunk
386
386
387 progress.complete()
387 progress.complete()
388
388
389 return requirements, gen()
389 return requirements, gen()
390
390
391
391
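A small sketch of the header layout described in the docstring above, with made-up counts and requirements (see `readbundle1header` below for the matching parser):

```python
import struct

requires = b'revlogv1,fncache' + b'\0'      # hypothetical requirements string
header = (
    b'HGS1'                                 # magic: stream clone bundle v1
    + b'UN'                                 # compression type (uncompressed)
    + struct.pack(b'>QQ', 3, 4096)          # file count, byte count
    + struct.pack(b'>H', len(requires))     # requirements length, incl. trailing \0
    + requires
)
```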
392 def consumev1(repo, fp, filecount, bytecount):
392 def consumev1(repo, fp, filecount, bytecount):
393 """Apply the contents from version 1 of a streaming clone file handle.
393 """Apply the contents from version 1 of a streaming clone file handle.
394
394
395 This takes the output from "stream_out" and applies it to the specified
395 This takes the output from "stream_out" and applies it to the specified
396 repository.
396 repository.
397
397
398 Like "stream_out," the status line added by the wire protocol is not
398 Like "stream_out," the status line added by the wire protocol is not
399 handled by this function.
399 handled by this function.
400 """
400 """
401 with repo.lock():
401 with repo.lock():
402 repo.ui.status(
402 repo.ui.status(
403 _(b'%d files to transfer, %s of data\n')
403 _(b'%d files to transfer, %s of data\n')
404 % (filecount, util.bytecount(bytecount))
404 % (filecount, util.bytecount(bytecount))
405 )
405 )
406 progress = repo.ui.makeprogress(
406 progress = repo.ui.makeprogress(
407 _(b'clone'), total=bytecount, unit=_(b'bytes')
407 _(b'clone'), total=bytecount, unit=_(b'bytes')
408 )
408 )
409 progress.update(0)
409 progress.update(0)
410 start = util.timer()
410 start = util.timer()
411
411
412 # TODO: get rid of (potential) inconsistency
412 # TODO: get rid of (potential) inconsistency
413 #
413 #
414 # If transaction is started and any @filecache property is
414 # If transaction is started and any @filecache property is
415 # changed at this point, it causes inconsistency between
415 # changed at this point, it causes inconsistency between
416 # in-memory cached property and streamclone-ed file on the
416 # in-memory cached property and streamclone-ed file on the
417 # disk. Nested transaction prevents transaction scope "clone"
417 # disk. Nested transaction prevents transaction scope "clone"
418 # below from writing in-memory changes out at the end of it,
418 # below from writing in-memory changes out at the end of it,
419 # even though in-memory changes are discarded at the end of it
419 # even though in-memory changes are discarded at the end of it
420 # regardless of transaction nesting.
420 # regardless of transaction nesting.
421 #
421 #
422 # But transaction nesting can't be simply prohibited, because
422 # But transaction nesting can't be simply prohibited, because
423 # nesting occurs also in ordinary case (e.g. enabling
423 # nesting occurs also in ordinary case (e.g. enabling
424 # clonebundles).
424 # clonebundles).
425
425
426 with repo.transaction(b'clone'):
426 with repo.transaction(b'clone'):
427 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
427 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
428 for i in range(filecount):
428 for i in range(filecount):
429 # XXX doesn't support '\n' or '\r' in filenames
429 # XXX doesn't support '\n' or '\r' in filenames
430 l = fp.readline()
430 l = fp.readline()
431 try:
431 try:
432 name, size = l.split(b'\0', 1)
432 name, size = l.split(b'\0', 1)
433 size = int(size)
433 size = int(size)
434 except (ValueError, TypeError):
434 except (ValueError, TypeError):
435 raise error.ResponseError(
435 raise error.ResponseError(
436 _(b'unexpected response from remote server:'), l
436 _(b'unexpected response from remote server:'), l
437 )
437 )
438 if repo.ui.debugflag:
438 if repo.ui.debugflag:
439 repo.ui.debug(
439 repo.ui.debug(
440 b'adding %s (%s)\n' % (name, util.bytecount(size))
440 b'adding %s (%s)\n' % (name, util.bytecount(size))
441 )
441 )
442 # for backwards compat, name was partially encoded
442 # for backwards compat, name was partially encoded
443 path = store.decodedir(name)
443 path = store.decodedir(name)
444 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
444 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
445 for chunk in util.filechunkiter(fp, limit=size):
445 for chunk in util.filechunkiter(fp, limit=size):
446 progress.increment(step=len(chunk))
446 progress.increment(step=len(chunk))
447 ofp.write(chunk)
447 ofp.write(chunk)
448
448
449 # force @filecache properties to be reloaded from
449 # force @filecache properties to be reloaded from
450 # streamclone-ed file at next access
450 # streamclone-ed file at next access
451 repo.invalidate(clearfilecache=True)
451 repo.invalidate(clearfilecache=True)
452
452
453 elapsed = util.timer() - start
453 elapsed = util.timer() - start
454 if elapsed <= 0:
454 if elapsed <= 0:
455 elapsed = 0.001
455 elapsed = 0.001
456 progress.complete()
456 progress.complete()
457 repo.ui.status(
457 repo.ui.status(
458 _(b'transferred %s in %.1f seconds (%s/sec)\n')
458 _(b'transferred %s in %.1f seconds (%s/sec)\n')
459 % (
459 % (
460 util.bytecount(bytecount),
460 util.bytecount(bytecount),
461 elapsed,
461 elapsed,
462 util.bytecount(bytecount / elapsed),
462 util.bytecount(bytecount / elapsed),
463 )
463 )
464 )
464 )
465
465
466
466
467 def readbundle1header(fp):
467 def readbundle1header(fp):
468 compression = fp.read(2)
468 compression = fp.read(2)
469 if compression != b'UN':
469 if compression != b'UN':
470 raise error.Abort(
470 raise error.Abort(
471 _(
471 _(
472 b'only uncompressed stream clone bundles are '
472 b'only uncompressed stream clone bundles are '
473 b'supported; got %s'
473 b'supported; got %s'
474 )
474 )
475 % compression
475 % compression
476 )
476 )
477
477
478 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
478 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
479 requireslen = struct.unpack(b'>H', fp.read(2))[0]
479 requireslen = struct.unpack(b'>H', fp.read(2))[0]
480 requires = fp.read(requireslen)
480 requires = fp.read(requireslen)
481
481
482 if not requires.endswith(b'\0'):
482 if not requires.endswith(b'\0'):
483 raise error.Abort(
483 raise error.Abort(
484 _(
484 _(
485 b'malformed stream clone bundle: '
485 b'malformed stream clone bundle: '
486 b'requirements not properly encoded'
486 b'requirements not properly encoded'
487 )
487 )
488 )
488 )
489
489
490 requirements = set(requires.rstrip(b'\0').split(b','))
490 requirements = set(requires.rstrip(b'\0').split(b','))
491
491
492 return filecount, bytecount, requirements
492 return filecount, bytecount, requirements
493
493
494
494
495 def applybundlev1(repo, fp):
495 def applybundlev1(repo, fp):
496 """Apply the content from a stream clone bundle version 1.
496 """Apply the content from a stream clone bundle version 1.
497
497
498 We assume the 4 byte header has been read and validated and the file handle
498 We assume the 4 byte header has been read and validated and the file handle
499 is at the 2 byte compression identifier.
499 is at the 2 byte compression identifier.
500 """
500 """
501 if len(repo):
501 if len(repo):
502 raise error.Abort(
502 raise error.Abort(
503 _(b'cannot apply stream clone bundle on non-empty repo')
503 _(b'cannot apply stream clone bundle on non-empty repo')
504 )
504 )
505
505
506 filecount, bytecount, requirements = readbundle1header(fp)
506 filecount, bytecount, requirements = readbundle1header(fp)
507 missingreqs = requirements - repo.supported
507 missingreqs = requirements - repo.supported
508 if missingreqs:
508 if missingreqs:
509 raise error.Abort(
509 raise error.Abort(
510 _(b'unable to apply stream clone: unsupported format: %s')
510 _(b'unable to apply stream clone: unsupported format: %s')
511 % b', '.join(sorted(missingreqs))
511 % b', '.join(sorted(missingreqs))
512 )
512 )
513
513
514 consumev1(repo, fp, filecount, bytecount)
514 consumev1(repo, fp, filecount, bytecount)
515 nodemap.post_stream_cleanup(repo)
515 nodemap.post_stream_cleanup(repo)
516
516
517
517
518 class streamcloneapplier:
518 class streamcloneapplier:
519 """Class to manage applying streaming clone bundles.
519 """Class to manage applying streaming clone bundles.
520
520
521 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
521 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
522 readers to perform bundle type-specific functionality.
522 readers to perform bundle type-specific functionality.
523 """
523 """
524
524
525 def __init__(self, fh):
525 def __init__(self, fh):
526 self._fh = fh
526 self._fh = fh
527
527
528 def apply(self, repo):
528 def apply(self, repo):
529 return applybundlev1(repo, self._fh)
529 return applybundlev1(repo, self._fh)
530
530
531
531
532 # type of file to stream
532 # type of file to stream
533 _fileappend = 0 # append only file
533 _fileappend = 0 # append only file
534 _filefull = 1 # full snapshot file
534 _filefull = 1 # full snapshot file
535
535
536 # Source of the file
536 # Source of the file
537 _srcstore = b's' # store (svfs)
537 _srcstore = b's' # store (svfs)
538 _srccache = b'c' # cache (cache)
538 _srccache = b'c' # cache (cache)
539
539
540 # This is its own function so extensions can override it.
540 # This is its own function so extensions can override it.
541 def _walkstreamfullstorefiles(repo):
541 def _walkstreamfullstorefiles(repo):
542 """list snapshot file from the store"""
542 """list snapshot file from the store"""
543 fnames = []
543 fnames = []
544 if not repo.publishing():
544 if not repo.publishing():
545 fnames.append(b'phaseroots')
545 fnames.append(b'phaseroots')
546 return fnames
546 return fnames
547
547
548
548
549 def _filterfull(entry, copy, vfsmap):
549 def _filterfull(entry, copy, vfsmap):
550 """actually copy the snapshot files"""
550 """actually copy the snapshot files"""
551 src, name, ftype, data = entry
551 src, name, ftype, data = entry
552 if ftype != _filefull:
552 if ftype != _filefull:
553 return entry
553 return entry
554 return (src, name, ftype, copy(vfsmap[src].join(name)))
554 return (src, name, ftype, copy(vfsmap[src].join(name)))
555
555
556
556
557 @contextlib.contextmanager
557 @contextlib.contextmanager
558 def maketempcopies():
558 def maketempcopies():
559 """return a function to temporary copy file"""
559 """return a function to temporary copy file"""
560
560
561 files = []
561 files = []
562 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
562 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
563 try:
563 try:
564
564
565 def copy(src):
565 def copy(src):
566 fd, dst = pycompat.mkstemp(
566 fd, dst = pycompat.mkstemp(
567 prefix=os.path.basename(src), dir=dst_dir
567 prefix=os.path.basename(src), dir=dst_dir
568 )
568 )
569 os.close(fd)
569 os.close(fd)
570 files.append(dst)
570 files.append(dst)
571 util.copyfiles(src, dst, hardlink=True)
571 util.copyfiles(src, dst, hardlink=True)
572 return dst
572 return dst
573
573
574 yield copy
574 yield copy
575 finally:
575 finally:
576 for tmp in files:
576 for tmp in files:
577 util.tryunlink(tmp)
577 util.tryunlink(tmp)
578 util.tryrmdir(dst_dir)
578 util.tryrmdir(dst_dir)
579
579
580
580
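Hypothetical usage of the helper above: volatile files are snapshotted while the lock is held, streamed afterwards, and the temporary copies plus their directory are removed on exit. `repo` and the path are assumptions for illustration:

```python
with maketempcopies() as copy:
    # hardlink/copy the live file into the temp dir while locked
    tmp = copy(repo.svfs.join(b'phaseroots'))  # hypothetical target file
    # ... read and stream `tmp` instead of the live file ...
```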
581 def _makemap(repo):
581 def _makemap(repo):
582 """make a (src -> vfs) map for the repo"""
582 """make a (src -> vfs) map for the repo"""
583 vfsmap = {
583 vfsmap = {
584 _srcstore: repo.svfs,
584 _srcstore: repo.svfs,
585 _srccache: repo.cachevfs,
585 _srccache: repo.cachevfs,
586 }
586 }
587 # we keep repo.vfs out of the map on purpose, there are too many dangers there
587 # we keep repo.vfs out of the map on purpose, there are too many dangers there
588 # (eg: .hg/hgrc)
588 # (eg: .hg/hgrc)
589 assert repo.vfs not in vfsmap.values()
589 assert repo.vfs not in vfsmap.values()
590
590
591 return vfsmap
591 return vfsmap
592
592
593
593
594 def _emit2(repo, entries, totalfilesize):
594 def _emit2(repo, entries, totalfilesize):
595 """actually emit the stream bundle"""
595 """actually emit the stream bundle"""
596 vfsmap = _makemap(repo)
596 vfsmap = _makemap(repo)
597 # we keep repo.vfs out of the map on purpose, there are too many dangers there
597 # we keep repo.vfs out of the map on purpose, there are too many dangers there
598 # (eg: .hg/hgrc),
598 # (eg: .hg/hgrc),
599 #
599 #
600 # this assert is duplicated (from _makemap) as an author might think this is
600 # this assert is duplicated (from _makemap) as an author might think this is
601 # fine, while this is really not fine.
601 # fine, while this is really not fine.
602 if repo.vfs in vfsmap.values():
602 if repo.vfs in vfsmap.values():
603 raise error.ProgrammingError(
603 raise error.ProgrammingError(
604 b'repo.vfs must not be added to vfsmap for security reasons'
604 b'repo.vfs must not be added to vfsmap for security reasons'
605 )
605 )
606
606
607 progress = repo.ui.makeprogress(
607 progress = repo.ui.makeprogress(
608 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
608 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
609 )
609 )
610 progress.update(0)
610 progress.update(0)
611 with maketempcopies() as copy, progress:
611 with maketempcopies() as copy, progress:
612 # copy is delayed until we are in the try
612 # copy is delayed until we are in the try
613 entries = [_filterfull(e, copy, vfsmap) for e in entries]
613 entries = [_filterfull(e, copy, vfsmap) for e in entries]
614 yield None # this releases the lock on the repository
614 yield None # this releases the lock on the repository
615 totalbytecount = 0
615 totalbytecount = 0
616
616
617 for src, name, ftype, data in entries:
617 for src, name, ftype, data in entries:
618 vfs = vfsmap[src]
618 vfs = vfsmap[src]
619 yield src
619 yield src
620 yield util.uvarintencode(len(name))
620 yield util.uvarintencode(len(name))
621 if ftype == _fileappend:
621 if ftype == _fileappend:
622 fp = vfs(name)
622 fp = vfs(name)
623 size = data
623 size = data
624 elif ftype == _filefull:
624 elif ftype == _filefull:
625 fp = open(data, b'rb')
625 fp = open(data, b'rb')
626 size = util.fstat(fp).st_size
626 size = util.fstat(fp).st_size
627 bytecount = 0
627 bytecount = 0
628 try:
628 try:
629 yield util.uvarintencode(size)
629 yield util.uvarintencode(size)
630 yield name
630 yield name
631 if size <= 65536:
631 if size <= 65536:
632 chunks = (fp.read(size),)
632 chunks = (fp.read(size),)
633 else:
633 else:
634 chunks = util.filechunkiter(fp, limit=size)
634 chunks = util.filechunkiter(fp, limit=size)
635 for chunk in chunks:
635 for chunk in chunks:
636 bytecount += len(chunk)
636 bytecount += len(chunk)
637 totalbytecount += len(chunk)
637 totalbytecount += len(chunk)
638 progress.update(totalbytecount)
638 progress.update(totalbytecount)
639 yield chunk
639 yield chunk
640 if bytecount != size:
640 if bytecount != size:
641 # Would most likely be caused by a race due to `hg strip` or
641 # Would most likely be caused by a race due to `hg strip` or
642 # a revlog split
642 # a revlog split
643 raise error.Abort(
643 raise error.Abort(
644 _(
644 _(
645 b'clone could only read %d bytes from %s, but '
645 b'clone could only read %d bytes from %s, but '
646 b'expected %d bytes'
646 b'expected %d bytes'
647 )
647 )
648 % (bytecount, name, size)
648 % (bytecount, name, size)
649 )
649 )
650 finally:
650 finally:
651 fp.close()
651 fp.close()
652
652
653
653
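The per-entry framing `_emit2` produces can be summarized by this self-contained sketch, using the real `util.uvarintencode` helper already used in this module (the entry values are made up):

```python
from mercurial import util

def frame_entry(src, name, data):
    # one-byte vfs key, uvarint name length, uvarint data length,
    # then the name and the raw data -- the shape consumev2 reads back
    return (
        src
        + util.uvarintencode(len(name))
        + util.uvarintencode(len(data))
        + name
        + data
    )

frame = frame_entry(b's', b'00changelog.i', b'\x00' * 64)  # b's' == _srcstore
```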
654 def _test_sync_point_walk_1(repo):
654 def _test_sync_point_walk_1(repo):
655 """a function for synchronisation during tests"""
655 """a function for synchronisation during tests"""
656
656
657
657
658 def _test_sync_point_walk_2(repo):
658 def _test_sync_point_walk_2(repo):
659 """a function for synchronisation during tests"""
659 """a function for synchronisation during tests"""
660
660
661
661
662 def _entries_walk(repo, includes, excludes, includeobsmarkers):
662 def _entries_walk(repo, includes, excludes, includeobsmarkers):
663 """emit a seris of files information useful to clone a repo
663 """emit a seris of files information useful to clone a repo
664
664
665 return (vfs-key, entry) iterator
665 return (vfs-key, entry) iterator
666
666
667 Where `entry` is a StoreEntry (used even for cache entries).
667 Where `entry` is a StoreEntry (used even for cache entries).
668 """
668 """
669 assert repo._currentlock(repo._lockref) is not None
669 assert repo._currentlock(repo._lockref) is not None
670
670
671 matcher = None
671 matcher = None
672 if includes or excludes:
672 if includes or excludes:
673 matcher = narrowspec.match(repo.root, includes, excludes)
673 matcher = narrowspec.match(repo.root, includes, excludes)
674
674
675 phase = not repo.publishing()
675 phase = not repo.publishing()
676 entries = _walkstreamfiles(
676 entries = _walkstreamfiles(
677 repo,
677 repo,
678 matcher,
678 matcher,
679 phase=phase,
679 phase=phase,
680 obsolescence=includeobsmarkers,
680 obsolescence=includeobsmarkers,
681 )
681 )
682 for entry in entries:
682 for entry in entries:
683 yield (_srcstore, entry)
683 yield (_srcstore, entry)
684
684
685 for name in cacheutil.cachetocopy(repo):
686 if repo.cachevfs.exists(name):
687 # not really a StoreEntry, but close enough
688 entry = store.SimpleStoreEntry(
689 entry_path=name,
690 is_volatile=True,
691 )
692 yield (_srccache, entry)
693
685
694
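With this change (the subject of the commit), cache files flow through the same `(vfs-key, entry)` shape as store files, so a consumer no longer needs a cache special case. A hedged sketch, assuming a locked `repo` and a hypothetical `process` callback:

```python
vfsmap = _makemap(repo)
for vfs_key, entry in _entries_walk(repo, None, None, True):
    vfs = vfsmap[vfs_key]  # b's' for store entries, b'c' for cache entries
    for f in entry.files():
        process(f.unencoded_path, f.file_size(vfs))  # `process` is hypothetical
```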
686 def _v2_walk(repo, includes, excludes, includeobsmarkers):
695 def _v2_walk(repo, includes, excludes, includeobsmarkers):
687 """emit a seris of files information useful to clone a repo
696 """emit a seris of files information useful to clone a repo
688
697
689 return (entries, totalfilesize)
698 return (entries, totalfilesize)
690
699
691 entries is a list of tuples (vfs-key, file-path, file-type, size)
700 entries is a list of tuples (vfs-key, file-path, file-type, size)
692
701
693 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
702 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
694 - `name`: file path of the file to copy (to be fed to the vfs)
703 - `name`: file path of the file to copy (to be fed to the vfs)
695 - `file-type`: does this file need to be copied with the source lock?
704 - `file-type`: does this file need to be copied with the source lock?
696 - `size`: the size of the file (or None)
705 - `size`: the size of the file (or None)
697 """
706 """
698 assert repo._currentlock(repo._lockref) is not None
707 assert repo._currentlock(repo._lockref) is not None
699 files = []
708 files = []
700 totalfilesize = 0
709 totalfilesize = 0
701
710
702 vfsmap = _makemap(repo)
711 vfsmap = _makemap(repo)
703 entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
712 entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
704 for vfs_key, entry in entries:
713 for vfs_key, entry in entries:
705 vfs = vfsmap[vfs_key]
714 vfs = vfsmap[vfs_key]
706 for f in entry.files():
715 for f in entry.files():
707 file_size = f.file_size(vfs)
716 file_size = f.file_size(vfs)
708 if file_size:
717 if file_size:
709 ft = _fileappend
718 ft = _fileappend
710 if f.is_volatile:
719 if f.is_volatile:
711 ft = _filefull
720 ft = _filefull
712 files.append((vfs_key, f.unencoded_path, ft, file_size))
721 files.append((vfs_key, f.unencoded_path, ft, file_size))
713 totalfilesize += file_size
722 totalfilesize += file_size
714 for name in cacheutil.cachetocopy(repo):
715 if repo.cachevfs.exists(name):
716 totalfilesize += repo.cachevfs.lstat(name).st_size
717 files.append((_srccache, name, _filefull, None))
718 return files, totalfilesize
723 return files, totalfilesize
719
724
720
725
721 def generatev2(repo, includes, excludes, includeobsmarkers):
726 def generatev2(repo, includes, excludes, includeobsmarkers):
722 """Emit content for version 2 of a streaming clone.
727 """Emit content for version 2 of a streaming clone.
723
728
724 the data stream consists of the following entries:
729 the data stream consists of the following entries:
725 1) A char representing the file destination (eg: store or cache)
730 1) A char representing the file destination (eg: store or cache)
726 2) A varint containing the length of the filename
731 2) A varint containing the length of the filename
727 3) A varint containing the length of file data
732 3) A varint containing the length of file data
728 4) N bytes containing the filename (the internal, store-agnostic form)
733 4) N bytes containing the filename (the internal, store-agnostic form)
729 5) N bytes containing the file data
734 5) N bytes containing the file data
730
735
731 Returns a 3-tuple of (file count, file size, data iterator).
736 Returns a 3-tuple of (file count, file size, data iterator).
732 """
737 """
733
738
734 with repo.lock():
739 with repo.lock():
735
740
736 repo.ui.debug(b'scanning\n')
741 repo.ui.debug(b'scanning\n')
737
742
738 entries, totalfilesize = _v2_walk(
743 entries, totalfilesize = _v2_walk(
739 repo,
744 repo,
740 includes=includes,
745 includes=includes,
741 excludes=excludes,
746 excludes=excludes,
742 includeobsmarkers=includeobsmarkers,
747 includeobsmarkers=includeobsmarkers,
743 )
748 )
744
749
745 chunks = _emit2(repo, entries, totalfilesize)
750 chunks = _emit2(repo, entries, totalfilesize)
746 first = next(chunks)
751 first = next(chunks)
747 assert first is None
752 assert first is None
748 _test_sync_point_walk_1(repo)
753 _test_sync_point_walk_1(repo)
749 _test_sync_point_walk_2(repo)
754 _test_sync_point_walk_2(repo)
750
755
751 return len(entries), totalfilesize, chunks
756 return len(entries), totalfilesize, chunks
752
757
753
758
754 @contextlib.contextmanager
759 @contextlib.contextmanager
755 def nested(*ctxs):
760 def nested(*ctxs):
756 this = ctxs[0]
761 this = ctxs[0]
757 rest = ctxs[1:]
762 rest = ctxs[1:]
758 with this:
763 with this:
759 if rest:
764 if rest:
760 with nested(*rest):
765 with nested(*rest):
761 yield
766 yield
762 else:
767 else:
763 yield
768 yield
764
769
765
770
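The recursive `nested()` helper above predates wide use of `contextlib.ExitStack`; a standard-library equivalent would be the following sketch, behaviour-equivalent for the usage in `consumev2` below:

```python
import contextlib

@contextlib.contextmanager
def nested_exitstack(*ctxs):
    # enter the managers left to right; ExitStack unwinds them in
    # reverse order on exit, matching the recursive nested() above
    with contextlib.ExitStack() as stack:
        for ctx in ctxs:
            stack.enter_context(ctx)
        yield
```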
766 def consumev2(repo, fp, filecount, filesize):
771 def consumev2(repo, fp, filecount, filesize):
767 """Apply the contents from a version 2 streaming clone.
772 """Apply the contents from a version 2 streaming clone.
768
773
769 Data is read from an object that only needs to provide a ``read(size)``
774 Data is read from an object that only needs to provide a ``read(size)``
770 method.
775 method.
771 """
776 """
772 with repo.lock():
777 with repo.lock():
773 repo.ui.status(
778 repo.ui.status(
774 _(b'%d files to transfer, %s of data\n')
779 _(b'%d files to transfer, %s of data\n')
775 % (filecount, util.bytecount(filesize))
780 % (filecount, util.bytecount(filesize))
776 )
781 )
777
782
778 start = util.timer()
783 start = util.timer()
779 progress = repo.ui.makeprogress(
784 progress = repo.ui.makeprogress(
780 _(b'clone'), total=filesize, unit=_(b'bytes')
785 _(b'clone'), total=filesize, unit=_(b'bytes')
781 )
786 )
782 progress.update(0)
787 progress.update(0)
783
788
784 vfsmap = _makemap(repo)
789 vfsmap = _makemap(repo)
785 # we keep repo.vfs out of the map on purpose, there are too many dangers
790 # we keep repo.vfs out of the map on purpose, there are too many dangers
786 # there (eg: .hg/hgrc),
791 # there (eg: .hg/hgrc),
787 #
792 #
788 # this assert is duplicated (from _makemap) as an author might think this
793 # this assert is duplicated (from _makemap) as an author might think this
789 # is fine, while this is really not fine.
794 # is fine, while this is really not fine.
790 if repo.vfs in vfsmap.values():
795 if repo.vfs in vfsmap.values():
791 raise error.ProgrammingError(
796 raise error.ProgrammingError(
792 b'repo.vfs must not be added to vfsmap for security reasons'
797 b'repo.vfs must not be added to vfsmap for security reasons'
793 )
798 )
794
799
795 with repo.transaction(b'clone'):
800 with repo.transaction(b'clone'):
796 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
801 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
797 with nested(*ctxs):
802 with nested(*ctxs):
798 for i in range(filecount):
803 for i in range(filecount):
799 src = util.readexactly(fp, 1)
804 src = util.readexactly(fp, 1)
800 vfs = vfsmap[src]
805 vfs = vfsmap[src]
801 namelen = util.uvarintdecodestream(fp)
806 namelen = util.uvarintdecodestream(fp)
802 datalen = util.uvarintdecodestream(fp)
807 datalen = util.uvarintdecodestream(fp)
803
808
804 name = util.readexactly(fp, namelen)
809 name = util.readexactly(fp, namelen)
805
810
806 if repo.ui.debugflag:
811 if repo.ui.debugflag:
807 repo.ui.debug(
812 repo.ui.debug(
808 b'adding [%s] %s (%s)\n'
813 b'adding [%s] %s (%s)\n'
809 % (src, name, util.bytecount(datalen))
814 % (src, name, util.bytecount(datalen))
810 )
815 )
811
816
812 with vfs(name, b'w') as ofp:
817 with vfs(name, b'w') as ofp:
813 for chunk in util.filechunkiter(fp, limit=datalen):
818 for chunk in util.filechunkiter(fp, limit=datalen):
814 progress.increment(step=len(chunk))
819 progress.increment(step=len(chunk))
815 ofp.write(chunk)
820 ofp.write(chunk)
816
821
817 # force @filecache properties to be reloaded from
822 # force @filecache properties to be reloaded from
818 # streamclone-ed file at next access
823 # streamclone-ed file at next access
819 repo.invalidate(clearfilecache=True)
824 repo.invalidate(clearfilecache=True)
820
825
821 elapsed = util.timer() - start
826 elapsed = util.timer() - start
822 if elapsed <= 0:
827 if elapsed <= 0:
823 elapsed = 0.001
828 elapsed = 0.001
824 repo.ui.status(
829 repo.ui.status(
825 _(b'transferred %s in %.1f seconds (%s/sec)\n')
830 _(b'transferred %s in %.1f seconds (%s/sec)\n')
826 % (
831 % (
827 util.bytecount(progress.pos),
832 util.bytecount(progress.pos),
828 elapsed,
833 elapsed,
829 util.bytecount(progress.pos / elapsed),
834 util.bytecount(progress.pos / elapsed),
830 )
835 )
831 )
836 )
832 progress.complete()
837 progress.complete()
833
838
834
839
835 def applybundlev2(repo, fp, filecount, filesize, requirements):
840 def applybundlev2(repo, fp, filecount, filesize, requirements):
836 from . import localrepo
841 from . import localrepo
837
842
838 missingreqs = [r for r in requirements if r not in repo.supported]
843 missingreqs = [r for r in requirements if r not in repo.supported]
839 if missingreqs:
844 if missingreqs:
840 raise error.Abort(
845 raise error.Abort(
841 _(b'unable to apply stream clone: unsupported format: %s')
846 _(b'unable to apply stream clone: unsupported format: %s')
842 % b', '.join(sorted(missingreqs))
847 % b', '.join(sorted(missingreqs))
843 )
848 )
844
849
845 consumev2(repo, fp, filecount, filesize)
850 consumev2(repo, fp, filecount, filesize)
846
851
847 repo.requirements = new_stream_clone_requirements(
852 repo.requirements = new_stream_clone_requirements(
848 repo.requirements,
853 repo.requirements,
849 requirements,
854 requirements,
850 )
855 )
851 repo.svfs.options = localrepo.resolvestorevfsoptions(
856 repo.svfs.options = localrepo.resolvestorevfsoptions(
852 repo.ui, repo.requirements, repo.features
857 repo.ui, repo.requirements, repo.features
853 )
858 )
854 scmutil.writereporequirements(repo)
859 scmutil.writereporequirements(repo)
855 nodemap.post_stream_cleanup(repo)
860 nodemap.post_stream_cleanup(repo)
856
861
857
862
858 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
863 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
859 hardlink = [True]
864 hardlink = [True]
860
865
861 def copy_used():
866 def copy_used():
862 hardlink[0] = False
867 hardlink[0] = False
863 progress.topic = _(b'copying')
868 progress.topic = _(b'copying')
864
869
865 for k, path, size in entries:
870 for k, path, size in entries:
866 src_vfs = src_vfs_map[k]
871 src_vfs = src_vfs_map[k]
867 dst_vfs = dst_vfs_map[k]
872 dst_vfs = dst_vfs_map[k]
868 src_path = src_vfs.join(path)
873 src_path = src_vfs.join(path)
869 dst_path = dst_vfs.join(path)
874 dst_path = dst_vfs.join(path)
870 # We cannot use dirname and makedirs of dst_vfs here because the store
875 # We cannot use dirname and makedirs of dst_vfs here because the store
871 # encoding confuses them. See issue 6581 for details.
876 # encoding confuses them. See issue 6581 for details.
872 dirname = os.path.dirname(dst_path)
877 dirname = os.path.dirname(dst_path)
873 if not os.path.exists(dirname):
878 if not os.path.exists(dirname):
874 util.makedirs(dirname)
879 util.makedirs(dirname)
875 dst_vfs.register_file(path)
880 dst_vfs.register_file(path)
876 # XXX we could use the #nb_bytes argument.
881 # XXX we could use the #nb_bytes argument.
877 util.copyfile(
882 util.copyfile(
878 src_path,
883 src_path,
879 dst_path,
884 dst_path,
880 hardlink=hardlink[0],
885 hardlink=hardlink[0],
881 no_hardlink_cb=copy_used,
886 no_hardlink_cb=copy_used,
882 check_fs_hardlink=False,
887 check_fs_hardlink=False,
883 )
888 )
884 progress.increment()
889 progress.increment()
885 return hardlink[0]
890 return hardlink[0]
886
891
887
892
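The single-element list above is a mutable closure cell: once one hardlink attempt fails (e.g. across devices), the `no_hardlink_cb` callback flips the flag and every later file is copied instead. A minimal sketch of the same pattern, with a hypothetical `pairs` iterable of source/destination paths:

```python
from mercurial import util

hardlink = [True]

def fall_back_to_copy():
    hardlink[0] = False  # all subsequent files become plain copies

# `pairs` is a hypothetical iterable of (source, destination) paths
for src, dst in pairs:
    util.copyfile(
        src, dst, hardlink=hardlink[0], no_hardlink_cb=fall_back_to_copy
    )
```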
888 def local_copy(src_repo, dest_repo):
893 def local_copy(src_repo, dest_repo):
889 """copy all content from one local repository to another
894 """copy all content from one local repository to another
890
895
891 This is useful for local clone"""
896 This is useful for local clone"""
892 src_store_requirements = {
897 src_store_requirements = {
893 r
898 r
894 for r in src_repo.requirements
899 for r in src_repo.requirements
895 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
900 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
896 }
901 }
897 dest_store_requirements = {
902 dest_store_requirements = {
898 r
903 r
899 for r in dest_repo.requirements
904 for r in dest_repo.requirements
900 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
905 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
901 }
906 }
902 assert src_store_requirements == dest_store_requirements
907 assert src_store_requirements == dest_store_requirements
903
908
904 with dest_repo.lock():
909 with dest_repo.lock():
905 with src_repo.lock():
910 with src_repo.lock():
906
911
907 # bookmarks are not integrated into the streaming as they might use the
912 # bookmarks are not integrated into the streaming as they might use the
908 # `repo.vfs`, and there is too much sensitive data accessible
913 # `repo.vfs`, and there is too much sensitive data accessible
909 # through `repo.vfs` to expose it to streaming clone.
914 # through `repo.vfs` to expose it to streaming clone.
910 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
915 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
911 srcbookmarks = src_book_vfs.join(b'bookmarks')
916 srcbookmarks = src_book_vfs.join(b'bookmarks')
912 bm_count = 0
917 bm_count = 0
913 if os.path.exists(srcbookmarks):
918 if os.path.exists(srcbookmarks):
914 bm_count = 1
919 bm_count = 1
915
920
916 entries, totalfilesize = _v2_walk(
921 entries, totalfilesize = _v2_walk(
917 src_repo,
922 src_repo,
918 includes=None,
923 includes=None,
919 excludes=None,
924 excludes=None,
920 includeobsmarkers=True,
925 includeobsmarkers=True,
921 )
926 )
922 src_vfs_map = _makemap(src_repo)
927 src_vfs_map = _makemap(src_repo)
923 dest_vfs_map = _makemap(dest_repo)
928 dest_vfs_map = _makemap(dest_repo)
924 progress = src_repo.ui.makeprogress(
929 progress = src_repo.ui.makeprogress(
925 topic=_(b'linking'),
930 topic=_(b'linking'),
926 total=len(entries) + bm_count,
931 total=len(entries) + bm_count,
927 unit=_(b'files'),
932 unit=_(b'files'),
928 )
933 )
929 # copy files
934 # copy files
930 #
935 #
931 # We could copy the full file while the source repository is locked
936 # We could copy the full file while the source repository is locked
932 # and the other one without the lock. However, in the linking case,
937 # and the other one without the lock. However, in the linking case,
933 # this would also require checks that nobody is appending any data
938 # this would also require checks that nobody is appending any data
934 # to the files while we do the clone, so this is not done yet. We
939 # to the files while we do the clone, so this is not done yet. We
935 # could do this blindly when copying files.
940 # could do this blindly when copying files.
936 files = ((k, path, size) for k, path, ftype, size in entries)
941 files = ((k, path, size) for k, path, ftype, size in entries)
937 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
942 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
938
943
939 # copy bookmarks over
944 # copy bookmarks over
940 if bm_count:
945 if bm_count:
941 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
946 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
942 dstbookmarks = dst_book_vfs.join(b'bookmarks')
947 dstbookmarks = dst_book_vfs.join(b'bookmarks')
943 util.copyfile(srcbookmarks, dstbookmarks)
948 util.copyfile(srcbookmarks, dstbookmarks)
944 progress.complete()
949 progress.complete()
945 if hardlink:
950 if hardlink:
946 msg = b'linked %d files\n'
951 msg = b'linked %d files\n'
947 else:
952 else:
948 msg = b'copied %d files\n'
953 msg = b'copied %d files\n'
949 src_repo.ui.debug(msg % (len(entries) + bm_count))
954 src_repo.ui.debug(msg % (len(entries) + bm_count))
950
955
951 with dest_repo.transaction(b"localclone") as tr:
956 with dest_repo.transaction(b"localclone") as tr:
952 dest_repo.store.write(tr)
957 dest_repo.store.write(tr)
953
958
954 # clean up transaction files as they do not make sense
959 # clean up transaction files as they do not make sense
955 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)
960 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)