stream-clone: bail-out earlier if stream clone is not requested...
marmoute - r51415:58e4842f default
mercurial/streamclone.py
@@ -1,960 +1,959 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import contextlib
import os
import struct

from .i18n import _
from .pycompat import open
from .interfaces import repository
from . import (
    bookmarks,
    cacheutil,
    error,
    narrowspec,
    phases,
    pycompat,
    requirements as requirementsmod,
    scmutil,
    store,
    transaction,
    util,
)
from .revlogutils import (
    nodemap,
)


def new_stream_clone_requirements(default_requirements, streamed_requirements):
    """determine the final set of requirements for a new stream clone

    this method combines the "default" requirements that a new repository would
    use with the constraints we get from the stream clone content. We keep local
    configuration choices when possible.
    """
    requirements = set(default_requirements)
    requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
    requirements.update(streamed_requirements)
    return requirements


def streamed_requirements(repo):
    """the set of requirements the new clone will have to support

    This is used for advertising the stream options and to generate the actual
    stream content."""
    requiredformats = (
        repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
    )
    return requiredformats


def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

+    # should we consider streaming clone at all ?
+    streamrequested = pullop.streamclonerequested
+    # If we don't have a preference, let the server decide for us. This
+    # likely only comes into play in LANs.
+    if streamrequested is None:
+        # The server can advertise whether to prefer streaming clone.
+        streamrequested = remote.capable(b'stream-preferred')
+    if not streamrequested:
+        return False, None
+
    # Streaming clone only works on an empty destination repository
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    bundle2supported = False
    if pullop.canusebundle2:
        if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

-    streamrequested = pullop.streamclonerequested
-
-    # If we don't have a preference, let the server decide for us. This
-    # likely only comes into play in LANs.
-    if streamrequested is None:
-        # The server can advertise whether to prefer streaming clone.
-        streamrequested = remote.capable(b'stream-preferred')
-
-    if not streamrequested:
-        return False, None
-
    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supported
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements


def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)
        repo.requirements = new_stream_clone_requirements(
            repo.requirements,
            requirements,
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)
        nodemap.post_stream_cleanup(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()


def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True


# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
    return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)


def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for entry in _walkstreamfiles(repo):
            for f in entry.files():
                file_size = f.file_size(repo.store.vfs)
                if file_size:
                    entries.append((f.unencoded_path, file_size))
                    total_bytes += file_size
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()


def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk


def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()


def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in range(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )


def readbundle1header(fp):
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements


def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supported
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)
    nodemap.post_stream_cleanup(repo)


class streamcloneapplier:
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)


# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)

# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append(b'phaseroots')
    return fnames


def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))


@contextlib.contextmanager
def maketempcopies():
    """return a function to temporarily copy files"""

    files = []
    dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
    try:

        def copy(src):
            fd, dst = pycompat.mkstemp(
                prefix=os.path.basename(src), dir=dst_dir
            )
            os.close(fd)
            files.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst

        yield copy
    finally:
        for tmp in files:
            util.tryunlink(tmp)
        util.tryrmdir(dst_dir)


def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap


def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as an author might think this
    # is fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copy is delayed until we are in the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None  # this releases the lock on the repository
        totalbytecount = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                fp = open(data, b'rb')
                size = util.fstat(fp).st_size
            bytecount = 0
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    bytecount += len(chunk)
                    totalbytecount += len(chunk)
                    progress.update(totalbytecount)
                    yield chunk
                if bytecount != size:
                    # Would most likely be caused by a race due to `hg strip` or
                    # a revlog split
                    raise error.Abort(
                        _(
                            b'clone could only read %d bytes from %s, but '
                            b'expected %d bytes'
                        )
                        % (bytecount, name, size)
                    )
            finally:
                fp.close()


def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests"""


def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests"""


def _entries_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information useful to clone a repo

    return (vfs-key, entry) iterator

    Where `entry` is StoreEntry. (used even for cache entries)
    """
    assert repo._currentlock(repo._lockref) is not None

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    phase = not repo.publishing()
    entries = _walkstreamfiles(
        repo,
        matcher,
        phase=phase,
        obsolescence=includeobsmarkers,
    )
    for entry in entries:
        yield (_srcstore, entry)

    for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
            # not really a StoreEntry, but close enough
            entry = store.SimpleStoreEntry(
                entry_path=name,
                is_volatile=True,
            )
            yield (_srccache, entry)


def _v2_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information useful to clone a repo

    return (entries, totalfilesize)

    entries is a list of tuple (vfs-key, file-path, file-type, size)

    - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
    - `name`: file path of the file to copy (to be fed to the vfs)
    - `file-type`: does this file need to be copied with the source lock?
    - `size`: the size of the file (or None)
    """
    assert repo._currentlock(repo._lockref) is not None
    files = []
    totalfilesize = 0

    vfsmap = _makemap(repo)
    entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
    for vfs_key, entry in entries:
        vfs = vfsmap[vfs_key]
        for f in entry.files():
            file_size = f.file_size(vfs)
            if file_size:
                ft = _fileappend
                if f.is_volatile:
                    ft = _filefull
                files.append((vfs_key, f.unencoded_path, ft, file_size))
                totalfilesize += file_size
    return files, totalfilesize


def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries, totalfilesize = _v2_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries, totalfilesize)
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    return len(entries), totalfilesize, chunks


@contextlib.contextmanager
def nested(*ctxs):
    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        if rest:
            with nested(*rest):
                yield
        else:
            yield


def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as an author might think
        # this is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()


def applybundlev2(repo, fp, filecount, filesize, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev2(repo, fp, filecount, filesize)

    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
    nodemap.post_stream_cleanup(repo)


def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    hardlink = [True]

    def copy_used():
        hardlink[0] = False
        progress.topic = _(b'copying')

    for k, path, size in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        dirname = os.path.dirname(dst_path)
        if not os.path.exists(dirname):
            util.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            src_path,
            dst_path,
            hardlink=hardlink[0],
            no_hardlink_cb=copy_used,
            check_fs_hardlink=False,
        )
        progress.increment()
    return hardlink[0]


def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clone"""
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():

            # bookmarks are not integrated into the streaming as they might
            # use the `repo.vfs`, and there is too much sensitive data
            # accessible through `repo.vfs` to expose it to streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries, totalfilesize = _v2_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=len(entries) + bm_count,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full file while the source repository is locked
            # and the other one without the lock. However, in the linking case,
            # this would also require checks that nobody is appending any data
            # to the files while we do the clone, so this is not done yet. We
            # could do this blindly when copying files.
            files = ((k, path, size) for k, path, ftype, size in entries)
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % (len(entries) + bm_count))

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction files as they do not make sense
        transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)
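
The net effect of the hunk above: the cheap "was a stream clone requested at all?" test now runs first in canperformstreamclone(), so an ordinary pull bails out before the empty-destination, heads, and bundle2 checks. A minimal, self-contained sketch of the reordered control flow follows; the FakePullOp and FakeRemote stand-ins are hypothetical and are not Mercurial's actual pull-operation or peer API.

    # Simplified sketch of the reordered bail-out (illustration only; the
    # FakePullOp/FakeRemote classes below are hypothetical stand-ins).


    class FakeRemote:
        def __init__(self, caps=()):
            self._caps = set(caps)

        def capable(self, name):
            return name in self._caps


    class FakePullOp:
        def __init__(self, streamclonerequested=None, remote=None,
                     repo_len=0, heads=()):
            self.streamclonerequested = streamclonerequested
            self.remote = remote or FakeRemote()
            self.repo_len = repo_len
            self.heads = heads


    def can_perform_stream_clone(pullop):
        # New order: decide whether a stream clone was requested at all,
        # before doing any other validation work.
        streamrequested = pullop.streamclonerequested
        if streamrequested is None:
            # No client preference: defer to the server's 'stream-preferred'
            # capability (mostly relevant on LANs).
            streamrequested = pullop.remote.capable(b'stream-preferred')
        if not streamrequested:
            return False, None  # bail out early, skipping the checks below

        if pullop.repo_len:  # destination repository must be empty
            return False, None
        if pullop.heads:  # all data must be requested
            return False, None
        # ... capability/requirements negotiation elided ...
        return True, set()


    # A plain pull (stream clone explicitly not requested) returns at once:
    assert can_perform_stream_clone(
        FakePullOp(streamclonerequested=False)
    ) == (False, None)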
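Relatedly, the generatebundlev1() and readbundle1header() docstrings in the context above fully specify the stream clone bundle v1 header layout. As a sanity check, here is a small standalone sketch that packs and unpacks that header with struct; build_bundle1_header is a hypothetical helper, not part of Mercurial.

    import struct

    def build_bundle1_header(filecount, bytecount, requirements):
        # "HGS1" magic + "UN" (uncompressed) + two big-endian u64 counts +
        # a u16 length-prefixed, NUL-terminated requirements string.
        requires = b','.join(sorted(requirements)) + b'\0'
        return (
            b'HGS1'
            + b'UN'
            + struct.pack('>QQ', filecount, bytecount)
            + struct.pack('>H', len(requires))
            + requires
        )

    header = build_bundle1_header(3, 4096, {b'revlogv1', b'generaldelta'})
    assert header[:4] == b'HGS1'
    filecount, bytecount = struct.unpack('>QQ', header[6:22])
    assert (filecount, bytecount) == (3, 4096)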