stream-clone: introduce a _entries_walk...
marmoute - r51408:06d580b8 default
@@ -1,936 +1,955 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import contextlib
import os
import struct

from .i18n import _
from .pycompat import open
from .interfaces import repository
from . import (
    bookmarks,
    cacheutil,
    error,
    narrowspec,
    phases,
    pycompat,
    requirements as requirementsmod,
    scmutil,
    store,
    transaction,
    util,
)
from .revlogutils import (
    nodemap,
)


def new_stream_clone_requirements(default_requirements, streamed_requirements):
    """determine the final set of requirements for a new stream clone

    this method combines the "default" requirements that a new repository
    would use with the constraints we get from the stream clone content. We
    keep local configuration choices when possible.
    """
    requirements = set(default_requirements)
    requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
    requirements.update(streamed_requirements)
    return requirements


def streamed_requirements(repo):
    """the set of requirements the new clone will have to support

    This is used for advertising the stream options and to generate the actual
    stream content."""
    requiredformats = (
        repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
    )
    return requiredformats


def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supported
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements


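# For example, a server whose only streamed requirement is "revlogv1"
# advertises the value-less "stream" capability, while a more modern server
# might advertise something like (illustrative value):
#
#     streamreqs=generaldelta,revlogv1,sparserevlog
#
# The client agrees to stream only if every listed requirement is present in
# ``repo.supported``.
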
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)
        repo.requirements = new_stream_clone_requirements(
            repo.requirements,
            requirements,
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)
        nodemap.post_stream_cleanup(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()


def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True


# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
    return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)


def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for entry in _walkstreamfiles(repo):
            for f in entry.files():
                file_size = f.file_size(repo.store.vfs)
                if file_size:
                    entries.append((f.unencoded_path, file_size))
                    total_bytes += file_size
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()


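# A v1 stream thus interleaves one header line and the raw bytes per file.
# For a 123-byte revlog the emitted bytes would look like (illustrative name
# and size):
#
#     b'data/foo.i\x00123\n' + <123 raw bytes>
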
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk


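# A successful wire response therefore starts with two text lines before the
# v1 payload (illustrative counts):
#
#     b'0\n'
#     b'42 12345678\n'
#     <v1 stream for 42 files, 12345678 bytes>
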
def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing \0. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()


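# Laid out end to end, the bundle header is therefore (illustrative values):
# for 42 files, 12345678 bytes, and requirements b'generaldelta,revlogv1'
# (21 bytes), the output starts with b'HGS1UN', then
# struct.pack(b'>QQH', 42, 12345678, 22), then b'generaldelta,revlogv1\x00',
# followed by the v1 stream.
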
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in range(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )


def readbundle1header(fp):
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements


def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supported
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)
    nodemap.post_stream_cleanup(repo)


class streamcloneapplier:
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)


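# A bundle reader that has already consumed and validated the b'HGS1' magic
# would typically wrap the remaining file object like this:
#
#     applier = streamcloneapplier(fh)
#     applier.apply(repo)  # runs applybundlev1() against the target repo
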
# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)


# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append(b'phaseroots')
    return fnames


def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))


@contextlib.contextmanager
def maketempcopies():
    """return a function to temporarily copy files"""

    files = []
    dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
    try:

        def copy(src):
            fd, dst = pycompat.mkstemp(
                prefix=os.path.basename(src), dir=dst_dir
            )
            os.close(fd)
            files.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst

        yield copy
    finally:
        for tmp in files:
            util.tryunlink(tmp)
        util.tryrmdir(dst_dir)


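# Typical use: snapshot volatile files while holding the source lock so the
# stream can later read stable copies without it, e.g.:
#
#     with maketempcopies() as copy:
#         tmp = copy(vfs.join(name))  # hardlinked copy in a temp dir
#         # ... read tmp after the lock is released ...
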
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose; there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap


def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose; there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as an author might think this
    # is fine, while it is really not.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copy is delayed until we are in the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None  # this releases the lock on the repository
        totalbytecount = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                fp = open(data, b'rb')
                size = util.fstat(fp).st_size
            bytecount = 0
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    bytecount += len(chunk)
                    totalbytecount += len(chunk)
                    progress.update(totalbytecount)
                    yield chunk
                if bytecount != size:
                    # Would most likely be caused by a race due to `hg strip`
                    # or a revlog split
                    raise error.Abort(
                        _(
                            b'clone could only read %d bytes from %s, but '
                            b'expected %d bytes'
                        )
                        % (bytecount, name, size)
                    )
            finally:
                fp.close()


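# Each emitted file is framed as: vfs key, uvarint(name length),
# uvarint(data length), name, then data. For a 123-byte store file named
# 'data/foo.i' this gives (illustrative values):
#
#     b's' + uvarintencode(10) + uvarintencode(123) + b'data/foo.i' + <data>
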
def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests"""


def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests"""


def _entries_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information useful to clone a repo

    return (vfs-key, entry) iterator

    Where `entry` is StoreEntry. (used even for cache entries)
    """
    assert repo._currentlock(repo._lockref) is not None

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    phase = not repo.publishing()
    entries = _walkstreamfiles(
        repo,
        matcher,
        phase=phase,
        obsolescence=includeobsmarkers,
    )
    for entry in entries:
        yield (_srcstore, entry)


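# The iterator yields (vfs-key, StoreEntry) pairs, where the key matches
# _makemap, e.g. (b's', <entry for data/foo.i>); for now every entry comes
# from the store vfs.
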
def _v2_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information useful to clone a repo

    return (entries, totalfilesize)

    entries is a list of tuple (vfs-key, file-path, file-type, size)

    - `vfs-key`: a key to the right vfs to write the file (see _makemap)
    - `name`: file path of the file to copy (to be fed to the vfs)
    - `file-type`: does this file need to be copied with the source lock?
    - `size`: the size of the file (or None)
    """
    assert repo._currentlock(repo._lockref) is not None
    files = []
    totalfilesize = 0

    vfsmap = _makemap(repo)
    entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
    for vfs_key, entry in entries:
        vfs = vfsmap[vfs_key]
        for f in entry.files():
            file_size = f.file_size(vfs)
            if file_size:
                ft = _fileappend
                if f.is_volatile:
                    ft = _filefull
                files.append((vfs_key, f.unencoded_path, ft, file_size))
                totalfilesize += file_size
    for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
            totalfilesize += repo.cachevfs.lstat(name).st_size
            files.append((_srccache, name, _filefull, None))
    return files, totalfilesize


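# The returned entry list mixes store and cache files, e.g. (illustrative
# values):
#
#     [(b's', b'data/foo.i', _fileappend, 1234),
#      (b'c', b'branch2-served', _filefull, None)]
#
# totalfilesize also counts the cache files (via lstat), even though their
# entry size is recorded as None.
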
def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries, totalfilesize = _v2_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries, totalfilesize)
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    return len(entries), totalfilesize, chunks


@contextlib.contextmanager
def nested(*ctxs):
    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        if rest:
            with nested(*rest):
                yield
        else:
            yield


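# nested() enters the given context managers recursively, so
#
#     with nested(cm_a, cm_b):
#         ...
#
# behaves like ``with cm_a: with cm_b: ...`` for a sequence of context
# managers whose length is only known at runtime.
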
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose; there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as an author might think
        # this is fine, while it is really not.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()


def applybundlev2(repo, fp, filecount, filesize, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev2(repo, fp, filecount, filesize)

    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
    nodemap.post_stream_cleanup(repo)


def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    hardlink = [True]

    def copy_used():
        hardlink[0] = False
        progress.topic = _(b'copying')

    for k, path, size in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        dirname = os.path.dirname(dst_path)
        if not os.path.exists(dirname):
            util.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            src_path,
            dst_path,
            hardlink=hardlink[0],
            no_hardlink_cb=copy_used,
            check_fs_hardlink=False,
        )
        progress.increment()
    return hardlink[0]


def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clone"""
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():

            # bookmarks are not integrated into the streaming as they might
            # use `repo.vfs`, and there is too much sensitive data accessible
            # through `repo.vfs` to expose it to streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries, totalfilesize = _v2_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=len(entries) + bm_count,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full file while the source repository is
            # locked and the other one without the lock. However, in the
            # linking case, this would also require checks that nobody is
            # appending any data to the files while we do the clone, so this
            # is not done yet. We could do this blindly when copying files.
            files = ((k, path, size) for k, path, ftype, size in entries)
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % (len(entries) + bm_count))

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction files as they do not make sense
        transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)