stream-clone: disable gc for the entry listing section for the v2 format...

Author: marmoute
Changeset: r52480:8cd317c0 (default)
diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
@@ -1,1180 +1,1181 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import contextlib
import os
import struct

from .i18n import _
from .interfaces import repository
from . import (
    bookmarks,
    bundle2 as bundle2mod,
    cacheutil,
    error,
    narrowspec,
    phases,
    pycompat,
    requirements as requirementsmod,
    scmutil,
    store,
    transaction,
    util,
)
from .revlogutils import (
    nodemap,
)

def new_stream_clone_requirements(default_requirements, streamed_requirements):
    """determine the final set of requirements for a new stream clone

    This method combines the "default" requirements that a new repository
    would use with the constraints we get from the stream clone content. We
    keep local configuration choices when possible.
    """
    requirements = set(default_requirements)
    requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
    requirements.update(streamed_requirements)
    return requirements

def streamed_requirements(repo):
    """the set of requirements the new clone will have to support

    This is used for advertising the stream options and to generate the
    actual stream content."""
    requiredformats = (
        repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
    )
    return requiredformats

def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # should we consider streaming clone at all?
    streamrequested = pullop.streamclonerequested
    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')
    if not streamrequested:
        return False, None

    # Streaming clone only works on an empty destination repository
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    bundle2supported = False
    if pullop.canusebundle2:
        local_caps = bundle2mod.getrepocaps(repo, role=b'client')
        local_supported = set(local_caps.get(b'stream', []))
        remote_supported = set(pullop.remotebundle2caps.get(b'stream', []))
        bundle2supported = bool(local_supported & remote_supported)
    # else
    #    Server doesn't support bundle2 stream clone or doesn't support
    #    the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supported
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements

def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)
        repo.requirements = new_stream_clone_requirements(
            repo.requirements,
            requirements,
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)
        nodemap.post_stream_cleanup(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()

def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True


# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
    return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for entry in _walkstreamfiles(repo):
            for f in entry.files():
                file_size = f.file_size(repo.store.vfs)
                if file_size:
                    entries.append((f.unencoded_path, file_size))
                    total_bytes += file_size
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()

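# A minimal sketch (not part of this module) of consuming the v1 frame format
# described in the ``generatev1`` docstring above; it assumes ``fp`` is a
# binary file object positioned past the wire-protocol status lines and that
# ``filecount`` was already read:
#
#   for _ in range(filecount):
#       header = fp.readline()               # b'<encoded-name>\0<size>\n'
#       name, size = header.split(b'\0', 1)
#       data = fp.read(int(size))            # raw file bytes follow the header
#       # ... write ``data`` under the decoded ``name``

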
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing \0. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()

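# A minimal sketch (not part of this module) of the fixed-size header emitted
# above, using the same struct formats; ``filecount``, ``bytecount`` and
# ``requires`` stand in for real values:
#
#   header = b'HGS1' + b'UN'                       # magic + compression type
#   header += struct.pack(b'>QQ', filecount, bytecount)
#   header += struct.pack(b'>H', len(requires) + 1)
#   header += requires + b'\0'                     # NUL-terminated, ASCII
#
# ``readbundle1header()`` below performs the matching reads.

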
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in range(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    if hasattr(fp, 'readline'):
                        l = fp.readline()
                    else:
                        # inline clonebundles use a chunkbuffer, so no readline
                        # --> this should be small anyway, the first line
                        # only contains the size of the bundle
                        l_buf = []
                        while not (l_buf and l_buf[-1] == b'\n'):
                            l_buf.append(fp.read(1))
                        l = b''.join(l_buf)
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )

def readbundle1header(fp):
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements


def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supported
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)
    nodemap.post_stream_cleanup(repo)

class streamcloneapplier:
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)

# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)


# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append(b'phaseroots')
    return fnames

def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))

class TempCopyManager:
    """Manage temporary backups of volatile files during stream clone.

    This should be used as a Python context manager; the copies will be
    discarded when exiting the context.

    A copy can be done by calling the object on the real path (encoded full
    path).

    The backup path can be retrieved using the __getitem__ protocol, obj[path].
    For a file without a backup, it will return the unmodified path
    (equivalent to `dict.get(x, x)`).
    """

    def __init__(self):
        self._copies = None
        self._dst_dir = None

    def __enter__(self):
        if self._copies is not None:
            msg = "Copies context already open"
            raise error.ProgrammingError(msg)
        self._copies = {}
        self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
        return self

    def __call__(self, src):
        """create a backup of the file at src"""
        prefix = os.path.basename(src)
        fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
        os.close(fd)
        self._copies[src] = dst
        util.copyfiles(src, dst, hardlink=True)
        return dst

    def __getitem__(self, src):
        """return the path to a valid version of `src`

        If the file has no backup, the path of the file is returned
        unmodified."""
        return self._copies.get(src, src)

    def __exit__(self, *args, **kwargs):
        """discard all backups"""
        for tmp in self._copies.values():
            util.tryunlink(tmp)
        util.tryrmdir(self._dst_dir)
        self._copies = None
        self._dst_dir = None

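# A minimal usage sketch (not part of this module) for the manager above;
# ``vfs`` and ``path`` are placeholder names:
#
#   with TempCopyManager() as copies:
#       copies(vfs.join(path))            # hardlink-backup a volatile file
#       stable = copies[vfs.join(path)]   # read the backup, not the live
#                                         # file that may still change
#   # all backups are deleted when the context exits

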
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap

def _emit2(repo, entries):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as authors might think this is
    # fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    # translate the vfs once
    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]

    max_linkrev = len(repo)
    file_count = totalfilesize = 0
-    # record the expected size of every file
-    for k, vfs, e in entries:
-        for f in e.files():
-            file_count += 1
-            totalfilesize += f.file_size(vfs)
+    with util.nogc():
+        # record the expected size of every file
+        for k, vfs, e in entries:
+            for f in e.files():
+                file_count += 1
+                totalfilesize += f.file_size(vfs)

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with TempCopyManager() as copy, progress:
        # create a copy of volatile files
        for k, vfs, e in entries:
            for f in e.files():
                if f.is_volatile:
                    copy(vfs.join(f.unencoded_path))
        # the first yield releases the lock on the repository
        yield file_count, totalfilesize
        totalbytecount = 0

        for src, vfs, e in entries:
            entry_streams = e.get_streams(
                repo=repo,
                vfs=vfs,
                copies=copy,
                max_changeset=max_linkrev,
                preserve_file_count=True,
            )
            for name, stream, size in entry_streams:
                yield src
                yield util.uvarintencode(len(name))
                yield util.uvarintencode(size)
                yield name
                bytecount = 0
                for chunk in stream:
                    bytecount += len(chunk)
                    totalbytecount += len(chunk)
                    progress.update(totalbytecount)
                    yield chunk
                if bytecount != size:
                    # Would most likely be caused by a race due to `hg
                    # strip` or a revlog split
                    msg = _(
                        b'clone could only read %d bytes from %s, but '
                        b'expected %d bytes'
                    )
                    raise error.Abort(msg % (bytecount, name, size))

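# The -/+ hunk above is the point of this revision: the entry listing creates
# a very large number of small containers, and per the comments elsewhere in
# this file, CPython's cyclic garbage collector does a lot of useless work on
# them, so disabling it during the scan helps performance a lot. A minimal
# sketch of the pattern (an assumption about roughly what such a context
# manager does; see ``mercurial.util`` for the real implementation):
#
#   import contextlib, gc
#
#   @contextlib.contextmanager
#   def nogc_sketch():
#       was_enabled = gc.isenabled()
#       gc.disable()                 # no collection cycles inside the block
#       try:
#           yield
#       finally:
#           if was_enabled:
#               gc.enable()

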
def _emit3(repo, entries):
    """actually emit the stream bundle (v3)"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as authors might think this is
    # fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    # translate the vfs once
    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
    total_entry_count = len(entries)

    max_linkrev = len(repo)
    progress = repo.ui.makeprogress(
        _(b'bundle'),
        total=total_entry_count,
        unit=_(b'entry'),
    )
    progress.update(0)
    with TempCopyManager() as copy, progress:
        # create a copy of volatile files
        for k, vfs, e in entries:
            if e.maybe_volatile:
                for f in e.files():
                    if f.is_volatile:
                        # record the expected size under lock
                        f.file_size(vfs)
                        copy(vfs.join(f.unencoded_path))
        # the first yield releases the lock on the repository
        yield None

        yield util.uvarintencode(total_entry_count)

        for src, vfs, e in entries:
            entry_streams = e.get_streams(
                repo=repo,
                vfs=vfs,
                copies=copy,
                max_changeset=max_linkrev,
            )
            yield util.uvarintencode(len(entry_streams))
            for name, stream, size in entry_streams:
                yield src
                yield util.uvarintencode(len(name))
                yield util.uvarintencode(size)
                yield name
                yield from stream
                progress.increment()

def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests"""


def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests"""

def _entries_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information entries useful to clone a repo

    return (vfs-key, entry) iterator

    Where `entry` is a StoreEntry (used even for cache entries).
    """
    assert repo._currentlock(repo._lockref) is not None

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    phase = not repo.publishing()
    # Python is getting crazy at all the small containers we create; disabling
    # the gc while we do so helps performance a lot.
    with util.nogc():
        entries = _walkstreamfiles(
            repo,
            matcher,
            phase=phase,
            obsolescence=includeobsmarkers,
        )
        for entry in entries:
            yield (_srcstore, entry)

        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                # not really a StoreEntry, but close enough
                entry = store.SimpleStoreEntry(
                    entry_path=name,
                    is_volatile=True,
                )
                yield (_srccache, entry)

def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint N containing the length of the filename
    3) A varint M containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) M bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries = _entries_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries)
        first = next(chunks)
        file_count, total_file_size = first
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    return file_count, total_file_size, chunks

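# A minimal sketch (not part of this module) of how one v2 entry is framed on
# the wire, mirroring the yields in ``_emit2`` above; ``src``, ``name`` and
# ``data`` are placeholder values:
#
#   frame = src                                # b's' (store) or b'c' (cache)
#   frame += util.uvarintencode(len(name))
#   frame += util.uvarintencode(len(data))
#   frame += name
#   frame += data
#
# ``consumev2()`` below reads the same fields back with
# ``util.uvarintdecodestream()``.

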
def generatev3(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 3 of a streaming clone.

    the data stream consists of the following:
    1) A varint E containing the number of entries (can be 0), then E entries follow
    2) For each entry:
    2.1) The number of files in this entry (can be 0, but typically 1 or 2)
    2.2) For each file:
    2.2.1) A char representing the file destination (eg: store or cache)
    2.2.2) A varint N containing the length of the filename
    2.2.3) A varint M containing the length of file data
    2.2.4) N bytes containing the filename (the internal, store-agnostic form)
    2.2.5) M bytes containing the file data

    Returns the data iterator.

    XXX This format is experimental and subject to change. Here is a
    XXX non-exhaustive list of things this format could do or change:

    - making it easier to write files in parallel
    - holding the lock for a shorter time
    - improving progress information
    - ways to adjust the number of expected entries/files ?
    """

    # Python is getting crazy at all the small containers we create while
    # considering the files to preserve; disabling the gc while we do so helps
    # performance a lot.
    with repo.lock(), util.nogc():

        repo.ui.debug(b'scanning\n')

        entries = _entries_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )
        chunks = _emit3(repo, list(entries))
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    return chunks

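# A minimal sketch (not part of this module) of reading the v3 layout
# described above, assuming ``fp`` only provides ``read(size)``:
#
#   entrycount = util.uvarintdecodestream(fp)     # varint E: entry count
#   for _ in range(entrycount):
#       filecount = util.uvarintdecodestream(fp)  # files in this entry
#       for _ in range(filecount):
#           src = util.readexactly(fp, 1)         # destination char
#           namelen = util.uvarintdecodestream(fp)
#           datalen = util.uvarintdecodestream(fp)
#           name = util.readexactly(fp, namelen)
#           data = util.readexactly(fp, datalen)
#
# ``consumev3()`` below implements the real, streaming version of this loop.

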
@contextlib.contextmanager
def nested(*ctxs):
    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        if rest:
            with nested(*rest):
                yield
        else:
            yield

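# A minimal usage sketch (not part of this module): ``nested`` recursively
# enters a variable number of context managers, which is how ``consumev2``
# and ``consumev3`` below open one ``backgroundclosing`` context per vfs:
#
#   with nested(*(vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())):
#       ...  # every context is open here; they close in reverse entry order

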
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as authors might think
        # this is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()


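# A sketch of the per-file framing consumed above (hypothetical helper, not
# used by this module): one byte identifying the target vfs, two uvarints for
# the name and data lengths, then the name and the raw data.
def _sketch_encode_v2_file(src, name, data):
    assert len(src) == 1  # one-byte vfs key (must be a key of _makemap(repo))
    return b''.join(
        [
            src,
            util.uvarintencode(len(name)),
            util.uvarintencode(len(data)),
            name,
            data,
        ]
    )

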
def consumev3(repo, fp):
    """Apply the contents from a version 3 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        start = util.timer()

        entrycount = util.uvarintdecodestream(fp)
        repo.ui.status(_(b'%d entries to transfer\n') % (entrycount))

        progress = repo.ui.makeprogress(
            _(b'clone'),
            total=entrycount,
            unit=_(b'entries'),
        )
        progress.update(0)
        bytes_transferred = 0

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as authors might think
        # this is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):

                for i in range(entrycount):
                    filecount = util.uvarintdecodestream(fp)
                    if filecount == 0:
                        if repo.ui.debugflag:
                            repo.ui.debug(b'entry with no files [%d]\n' % (i))
                    for i in range(filecount):
                        src = util.readexactly(fp, 1)
                        vfs = vfsmap[src]
                        namelen = util.uvarintdecodestream(fp)
                        datalen = util.uvarintdecodestream(fp)

                        name = util.readexactly(fp, namelen)

                        if repo.ui.debugflag:
                            msg = b'adding [%s] %s (%s)\n'
                            msg %= (src, name, util.bytecount(datalen))
                            repo.ui.debug(msg)
                        bytes_transferred += datalen

                        with vfs(name, b'w') as ofp:
                            for chunk in util.filechunkiter(fp, limit=datalen):
                                ofp.write(chunk)
                        progress.increment(step=1)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
        byte_count = util.bytecount(bytes_transferred)
        bytes_sec = util.bytecount(bytes_transferred / elapsed)
        msg %= (byte_count, elapsed, bytes_sec)
        repo.ui.status(msg)
        progress.complete()


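# The overall layout consumed above, as a sketch (descriptive only):
#
#     uvarint: number of entries
#     for each entry:
#         uvarint: number of files in the entry (0 is allowed)
#         for each file:
#             the same <src byte><namelen uvarint><datalen uvarint><name><data>
#             frame as in the v2 format

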
def applybundlev2(repo, fp, filecount, filesize, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev2(repo, fp, filecount, filesize)

    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
    nodemap.post_stream_cleanup(repo)


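# For example (hypothetical requirement names, for illustration only): with
# repo.supported == {b'revlogv1', b'store'} and a stream advertising
# {b'revlogv1', b'exp-fancy-format'}, ``missingreqs`` above would be
# [b'exp-fancy-format'] and the clone aborts before consuming any data.

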
def applybundlev3(repo, fp, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        msg = _(b'unable to apply stream clone: unsupported format: %s')
        msg %= b', '.join(sorted(missingreqs))
        raise error.Abort(msg)

    consumev3(repo, fp)

    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
    nodemap.post_stream_cleanup(repo)


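# Note that unlike the v2 variant above, the v3 payload is self-describing:
# the entry count travels inside the stream (see consumev3), so no
# filecount/filesize metadata needs to accompany the call.

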
def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    # one-element list used as a closure-writable cell: flipped by the
    # callback below the first time hardlinking is not used
    hardlink = [True]

    def copy_used():
        hardlink[0] = False
        progress.topic = _(b'copying')

    for k, path in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        dirname = os.path.dirname(dst_path)
        if not os.path.exists(dirname):
            util.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            src_path,
            dst_path,
            hardlink=hardlink[0],
            no_hardlink_cb=copy_used,
            check_fs_hardlink=False,
        )
        progress.increment()
    return hardlink[0]


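# ``no_hardlink_cb`` fires when util.copyfile falls back to a plain copy;
# once that happens ``hardlink[0]`` stays False, every remaining file is
# copied outright, and the progress topic switches from 'linking' to
# 'copying'. The returned flag lets the caller report which of the two
# actually happened.

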
def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clones"""
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():

            # bookmarks are not integrated in the streaming as they might use
            # `repo.vfs`, and there is too much sensitive data accessible
            # through `repo.vfs` to expose it to a streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries = _entries_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            entries = list(entries)
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            total_files = sum(len(e[1].files()) for e in entries) + bm_count
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=total_files,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full file while the source repository is
            # locked and the other one without the lock. However, in the
            # linking case, this would also require checks that nobody is
            # appending any data to the files while we do the clone, so this
            # is not done yet. We could do this blindly when copying files.
            files = [
                (vfs_key, f.unencoded_path)
                for vfs_key, e in entries
                for f in e.files()
            ]
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % total_files)

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction files as they do not make sense here
        transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)
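

# A minimal caller sketch (hypothetical paths, for illustration only); the
# assert above means the destination must have been created with the same
# store requirements as the source:
#
#     src = hg.repository(ui, b'/path/to/src')
#     dest = hg.repository(ui, b'/path/to/dest', create=True)
#     local_copy(src, dest)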