stream: prefer keeping an open file handle to volatile file instead of copy...
marmoute
r52912:a47f09da default
@@ -1,1178 +1,1238 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import annotations
8 from __future__ import annotations
9
9
10 import contextlib
10 import contextlib
11 import os
11 import os
12 import struct
12 import struct
13
13
14 from .i18n import _
14 from .i18n import _
15 from .interfaces import repository
15 from .interfaces import repository
16 from . import (
16 from . import (
17 bookmarks,
17 bookmarks,
18 bundle2 as bundle2mod,
18 bundle2 as bundle2mod,
19 cacheutil,
19 cacheutil,
20 error,
20 error,
21 narrowspec,
21 narrowspec,
22 phases,
22 phases,
23 pycompat,
23 pycompat,
24 requirements as requirementsmod,
24 requirements as requirementsmod,
25 scmutil,
25 scmutil,
26 store,
26 store,
27 transaction,
27 transaction,
28 util,
28 util,
29 )
29 )
30 from .revlogutils import (
30 from .revlogutils import (
31 nodemap,
31 nodemap,
32 )
32 )
33
33
34
34
35 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 def new_stream_clone_requirements(default_requirements, streamed_requirements):
36 """determine the final set of requirement for a new stream clone
36 """determine the final set of requirement for a new stream clone
37
37
38 this method combine the "default" requirements that a new repository would
38 this method combine the "default" requirements that a new repository would
39 use with the constaint we get from the stream clone content. We keep local
39 use with the constaint we get from the stream clone content. We keep local
40 configuration choice when possible.
40 configuration choice when possible.
41 """
41 """
42 requirements = set(default_requirements)
42 requirements = set(default_requirements)
43 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
44 requirements.update(streamed_requirements)
44 requirements.update(streamed_requirements)
45 return requirements
45 return requirements
46
46
47
47
48 def streamed_requirements(repo):
48 def streamed_requirements(repo):
49 """the set of requirement the new clone will have to support
49 """the set of requirement the new clone will have to support
50
50
51 This is used for advertising the stream options and to generate the actual
51 This is used for advertising the stream options and to generate the actual
52 stream content."""
52 stream content."""
53 requiredformats = (
53 requiredformats = (
54 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
55 )
55 )
56 return requiredformats
56 return requiredformats
57
57
58
58
59 def canperformstreamclone(pullop, bundle2=False):
59 def canperformstreamclone(pullop, bundle2=False):
60 """Whether it is possible to perform a streaming clone as part of pull.
60 """Whether it is possible to perform a streaming clone as part of pull.
61
61
62 ``bundle2`` will cause the function to consider stream clone through
62 ``bundle2`` will cause the function to consider stream clone through
63 bundle2 and only through bundle2.
63 bundle2 and only through bundle2.
64
64
65 Returns a tuple of (supported, requirements). ``supported`` is True if
65 Returns a tuple of (supported, requirements). ``supported`` is True if
66 streaming clone is supported and False otherwise. ``requirements`` is
66 streaming clone is supported and False otherwise. ``requirements`` is
67 a set of repo requirements from the remote, or ``None`` if stream clone
67 a set of repo requirements from the remote, or ``None`` if stream clone
68 isn't supported.
68 isn't supported.
69 """
69 """
70 repo = pullop.repo
70 repo = pullop.repo
71 remote = pullop.remote
71 remote = pullop.remote
72
72
73 # should we consider streaming clone at all ?
73 # should we consider streaming clone at all ?
74 streamrequested = pullop.streamclonerequested
74 streamrequested = pullop.streamclonerequested
75 # If we don't have a preference, let the server decide for us. This
75 # If we don't have a preference, let the server decide for us. This
76 # likely only comes into play in LANs.
76 # likely only comes into play in LANs.
77 if streamrequested is None:
77 if streamrequested is None:
78 # The server can advertise whether to prefer streaming clone.
78 # The server can advertise whether to prefer streaming clone.
79 streamrequested = remote.capable(b'stream-preferred')
79 streamrequested = remote.capable(b'stream-preferred')
80 if not streamrequested:
80 if not streamrequested:
81 return False, None
81 return False, None
82
82
83 # Streaming clone only works on an empty destination repository
83 # Streaming clone only works on an empty destination repository
84 if len(repo):
84 if len(repo):
85 return False, None
85 return False, None
86
86
87 # Streaming clone only works if all data is being requested.
87 # Streaming clone only works if all data is being requested.
88 if pullop.heads:
88 if pullop.heads:
89 return False, None
89 return False, None
90
90
91 bundle2supported = False
91 bundle2supported = False
92 if pullop.canusebundle2:
92 if pullop.canusebundle2:
93 local_caps = bundle2mod.getrepocaps(repo, role=b'client')
93 local_caps = bundle2mod.getrepocaps(repo, role=b'client')
94 local_supported = set(local_caps.get(b'stream', []))
94 local_supported = set(local_caps.get(b'stream', []))
95 remote_supported = set(pullop.remotebundle2caps.get(b'stream', []))
95 remote_supported = set(pullop.remotebundle2caps.get(b'stream', []))
96 bundle2supported = bool(local_supported & remote_supported)
96 bundle2supported = bool(local_supported & remote_supported)
97 # else
97 # else
98 # Server doesn't support bundle2 stream clone or doesn't support
98 # Server doesn't support bundle2 stream clone or doesn't support
99 # the versions we support. Fall back and possibly allow legacy.
99 # the versions we support. Fall back and possibly allow legacy.
100
100
101 # Ensures legacy code path uses available bundle2.
101 # Ensures legacy code path uses available bundle2.
102 if bundle2supported and not bundle2:
102 if bundle2supported and not bundle2:
103 return False, None
103 return False, None
104 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
104 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
105 elif bundle2 and not bundle2supported:
105 elif bundle2 and not bundle2supported:
106 return False, None
106 return False, None
107
107
108 # In order for stream clone to work, the client has to support all the
108 # In order for stream clone to work, the client has to support all the
109 # requirements advertised by the server.
109 # requirements advertised by the server.
110 #
110 #
111 # The server advertises its requirements via the "stream" and "streamreqs"
111 # The server advertises its requirements via the "stream" and "streamreqs"
112 # capability. "stream" (a value-less capability) is advertised if and only
112 # capability. "stream" (a value-less capability) is advertised if and only
113 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
113 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
114 # is advertised and contains a comma-delimited list of requirements.
114 # is advertised and contains a comma-delimited list of requirements.
115 requirements = set()
115 requirements = set()
116 if remote.capable(b'stream'):
116 if remote.capable(b'stream'):
117 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
117 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
118 else:
118 else:
119 streamreqs = remote.capable(b'streamreqs')
119 streamreqs = remote.capable(b'streamreqs')
120 # This is weird and shouldn't happen with modern servers.
120 # This is weird and shouldn't happen with modern servers.
121 if not streamreqs:
121 if not streamreqs:
122 pullop.repo.ui.warn(
122 pullop.repo.ui.warn(
123 _(
123 _(
124 b'warning: stream clone requested but server has them '
124 b'warning: stream clone requested but server has them '
125 b'disabled\n'
125 b'disabled\n'
126 )
126 )
127 )
127 )
128 return False, None
128 return False, None
129
129
130 streamreqs = set(streamreqs.split(b','))
130 streamreqs = set(streamreqs.split(b','))
131 # Server requires something we don't support. Bail.
131 # Server requires something we don't support. Bail.
132 missingreqs = streamreqs - repo.supported
132 missingreqs = streamreqs - repo.supported
133 if missingreqs:
133 if missingreqs:
134 pullop.repo.ui.warn(
134 pullop.repo.ui.warn(
135 _(
135 _(
136 b'warning: stream clone requested but client is missing '
136 b'warning: stream clone requested but client is missing '
137 b'requirements: %s\n'
137 b'requirements: %s\n'
138 )
138 )
139 % b', '.join(sorted(missingreqs))
139 % b', '.join(sorted(missingreqs))
140 )
140 )
141 pullop.repo.ui.warn(
141 pullop.repo.ui.warn(
142 _(
142 _(
143 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
143 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
144 b'for more information)\n'
144 b'for more information)\n'
145 )
145 )
146 )
146 )
147 return False, None
147 return False, None
148 requirements = streamreqs
148 requirements = streamreqs
149
149
150 return True, requirements
150 return True, requirements
151
151
152
152
153 def maybeperformlegacystreamclone(pullop):
153 def maybeperformlegacystreamclone(pullop):
154 """Possibly perform a legacy stream clone operation.
154 """Possibly perform a legacy stream clone operation.
155
155
156 Legacy stream clones are performed as part of pull but before all other
156 Legacy stream clones are performed as part of pull but before all other
157 operations.
157 operations.
158
158
159 A legacy stream clone will not be performed if a bundle2 stream clone is
159 A legacy stream clone will not be performed if a bundle2 stream clone is
160 supported.
160 supported.
161 """
161 """
162 from . import localrepo
162 from . import localrepo
163
163
164 supported, requirements = canperformstreamclone(pullop)
164 supported, requirements = canperformstreamclone(pullop)
165
165
166 if not supported:
166 if not supported:
167 return
167 return
168
168
169 repo = pullop.repo
169 repo = pullop.repo
170 remote = pullop.remote
170 remote = pullop.remote
171
171
172 # Save remote branchmap. We will use it later to speed up branchcache
172 # Save remote branchmap. We will use it later to speed up branchcache
173 # creation.
173 # creation.
174 rbranchmap = None
174 rbranchmap = None
175 if remote.capable(b'branchmap'):
175 if remote.capable(b'branchmap'):
176 with remote.commandexecutor() as e:
176 with remote.commandexecutor() as e:
177 rbranchmap = e.callcommand(b'branchmap', {}).result()
177 rbranchmap = e.callcommand(b'branchmap', {}).result()
178
178
179 repo.ui.status(_(b'streaming all changes\n'))
179 repo.ui.status(_(b'streaming all changes\n'))
180
180
181 with remote.commandexecutor() as e:
181 with remote.commandexecutor() as e:
182 fp = e.callcommand(b'stream_out', {}).result()
182 fp = e.callcommand(b'stream_out', {}).result()
183
183
184 # TODO strictly speaking, this code should all be inside the context
184 # TODO strictly speaking, this code should all be inside the context
185 # manager because the context manager is supposed to ensure all wire state
185 # manager because the context manager is supposed to ensure all wire state
186 # is flushed when exiting. But the legacy peers don't do this, so it
186 # is flushed when exiting. But the legacy peers don't do this, so it
187 # doesn't matter.
187 # doesn't matter.
188 l = fp.readline()
188 l = fp.readline()
189 try:
189 try:
190 resp = int(l)
190 resp = int(l)
191 except ValueError:
191 except ValueError:
192 raise error.ResponseError(
192 raise error.ResponseError(
193 _(b'unexpected response from remote server:'), l
193 _(b'unexpected response from remote server:'), l
194 )
194 )
195 if resp == 1:
195 if resp == 1:
196 raise error.Abort(_(b'operation forbidden by server'))
196 raise error.Abort(_(b'operation forbidden by server'))
197 elif resp == 2:
197 elif resp == 2:
198 raise error.Abort(_(b'locking the remote repository failed'))
198 raise error.Abort(_(b'locking the remote repository failed'))
199 elif resp != 0:
199 elif resp != 0:
200 raise error.Abort(_(b'the server sent an unknown error code'))
200 raise error.Abort(_(b'the server sent an unknown error code'))
201
201
202 l = fp.readline()
202 l = fp.readline()
203 try:
203 try:
204 filecount, bytecount = map(int, l.split(b' ', 1))
204 filecount, bytecount = map(int, l.split(b' ', 1))
205 except (ValueError, TypeError):
205 except (ValueError, TypeError):
206 raise error.ResponseError(
206 raise error.ResponseError(
207 _(b'unexpected response from remote server:'), l
207 _(b'unexpected response from remote server:'), l
208 )
208 )
209
209
210 with repo.lock():
210 with repo.lock():
211 consumev1(repo, fp, filecount, bytecount)
211 consumev1(repo, fp, filecount, bytecount)
212 repo.requirements = new_stream_clone_requirements(
212 repo.requirements = new_stream_clone_requirements(
213 repo.requirements,
213 repo.requirements,
214 requirements,
214 requirements,
215 )
215 )
216 repo.svfs.options = localrepo.resolvestorevfsoptions(
216 repo.svfs.options = localrepo.resolvestorevfsoptions(
217 repo.ui, repo.requirements, repo.features
217 repo.ui, repo.requirements, repo.features
218 )
218 )
219 scmutil.writereporequirements(repo)
219 scmutil.writereporequirements(repo)
220 nodemap.post_stream_cleanup(repo)
220 nodemap.post_stream_cleanup(repo)
221
221
222 if rbranchmap:
222 if rbranchmap:
223 repo._branchcaches.replace(repo, rbranchmap)
223 repo._branchcaches.replace(repo, rbranchmap)
224
224
225 repo.invalidate()
225 repo.invalidate()
226
226
227
227
228 def allowservergeneration(repo):
228 def allowservergeneration(repo):
229 """Whether streaming clones are allowed from the server."""
229 """Whether streaming clones are allowed from the server."""
230 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
230 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
231 return False
231 return False
232
232
233 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
233 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
234 return False
234 return False
235
235
236 # The way stream clone works makes it impossible to hide secret changesets.
236 # The way stream clone works makes it impossible to hide secret changesets.
237 # So don't allow this by default.
237 # So don't allow this by default.
238 secret = phases.hassecret(repo)
238 secret = phases.hassecret(repo)
239 if secret:
239 if secret:
240 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
240 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
241
241
242 return True
242 return True
243
243
244
244
245 # This is its own function so extensions can override it.
245 # This is its own function so extensions can override it.
246 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
246 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
247 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
247 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
248
248
249
249
250 def generatev1(repo):
250 def generatev1(repo):
251 """Emit content for version 1 of a streaming clone.
251 """Emit content for version 1 of a streaming clone.
252
252
253 This returns a 3-tuple of (file count, byte size, data iterator).
253 This returns a 3-tuple of (file count, byte size, data iterator).
254
254
255 The data iterator consists of N entries for each file being transferred.
255 The data iterator consists of N entries for each file being transferred.
256 Each file entry starts as a line with the file name and integer size
256 Each file entry starts as a line with the file name and integer size
257 delimited by a null byte.
257 delimited by a null byte.
258
258
259 The raw file data follows. Following the raw file data is the next file
259 The raw file data follows. Following the raw file data is the next file
260 entry, or EOF.
260 entry, or EOF.
261
261
262 When used on the wire protocol, an additional line indicating protocol
262 When used on the wire protocol, an additional line indicating protocol
263 success will be prepended to the stream. This function is not responsible
263 success will be prepended to the stream. This function is not responsible
264 for adding it.
264 for adding it.
265
265
266 This function will obtain a repository lock to ensure a consistent view of
266 This function will obtain a repository lock to ensure a consistent view of
267 the store is captured. It therefore may raise LockError.
267 the store is captured. It therefore may raise LockError.
268 """
268 """
269 entries = []
269 entries = []
270 total_bytes = 0
270 total_bytes = 0
271 # Get consistent snapshot of repo, lock during scan.
271 # Get consistent snapshot of repo, lock during scan.
272 with repo.lock():
272 with repo.lock():
273 repo.ui.debug(b'scanning\n')
273 repo.ui.debug(b'scanning\n')
274 for entry in _walkstreamfiles(repo):
274 for entry in _walkstreamfiles(repo):
275 for f in entry.files():
275 for f in entry.files():
276 file_size = f.file_size(repo.store.vfs)
276 file_size = f.file_size(repo.store.vfs)
277 if file_size:
277 if file_size:
278 entries.append((f.unencoded_path, file_size))
278 entries.append((f.unencoded_path, file_size))
279 total_bytes += file_size
279 total_bytes += file_size
280 _test_sync_point_walk_1(repo)
280 _test_sync_point_walk_1(repo)
281 _test_sync_point_walk_2(repo)
281 _test_sync_point_walk_2(repo)
282
282
283 repo.ui.debug(
283 repo.ui.debug(
284 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
284 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
285 )
285 )
286
286
287 svfs = repo.svfs
287 svfs = repo.svfs
288 debugflag = repo.ui.debugflag
288 debugflag = repo.ui.debugflag
289
289
290 def emitrevlogdata():
290 def emitrevlogdata():
291 for name, size in entries:
291 for name, size in entries:
292 if debugflag:
292 if debugflag:
293 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
293 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
294 # partially encode name over the wire for backwards compat
294 # partially encode name over the wire for backwards compat
295 yield b'%s\0%d\n' % (store.encodedir(name), size)
295 yield b'%s\0%d\n' % (store.encodedir(name), size)
296 # auditing at this stage is both pointless (paths are already
296 # auditing at this stage is both pointless (paths are already
297 # trusted by the local repo) and expensive
297 # trusted by the local repo) and expensive
298 with svfs(name, b'rb', auditpath=False) as fp:
298 with svfs(name, b'rb', auditpath=False) as fp:
299 if size <= 65536:
299 if size <= 65536:
300 yield fp.read(size)
300 yield fp.read(size)
301 else:
301 else:
302 for chunk in util.filechunkiter(fp, limit=size):
302 for chunk in util.filechunkiter(fp, limit=size):
303 yield chunk
303 yield chunk
304
304
305 return len(entries), total_bytes, emitrevlogdata()
305 return len(entries), total_bytes, emitrevlogdata()
306
306
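As a reading aid, the entry framing described in the docstring above can be consumed with a few lines of Python. This is a minimal sketch only (the real consumer is ``consumev1`` below, which additionally runs ``store.decodedir`` on the wire-encoded name and streams large files in chunks); ``parse_v1_entries`` is a hypothetical helper name.

def parse_v1_entries(fp, filecount):
    """Yield (name, data) pairs from a v1 stream (sketch, not the real consumer)."""
    for _ in range(filecount):
        header = fp.readline()                        # b'<name>\0<size>\n'
        name, size = header.rstrip(b'\n').split(b'\0', 1)
        yield name, fp.read(int(size))                # raw file content follows the header line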
307
307
308 def generatev1wireproto(repo):
308 def generatev1wireproto(repo):
309 """Emit content for version 1 of streaming clone suitable for the wire.
309 """Emit content for version 1 of streaming clone suitable for the wire.
310
310
311 This is the data output from ``generatev1()`` with 2 header lines. The
311 This is the data output from ``generatev1()`` with 2 header lines. The
312 first line indicates overall success. The 2nd contains the file count and
312 first line indicates overall success. The 2nd contains the file count and
313 byte size of payload.
313 byte size of payload.
314
314
315 The success line contains "0" for success, "1" for stream generation not
315 The success line contains "0" for success, "1" for stream generation not
316 allowed, and "2" for error locking the repository (possibly indicating
316 allowed, and "2" for error locking the repository (possibly indicating
317 a permissions error for the server process).
317 a permissions error for the server process).
318 """
318 """
319 if not allowservergeneration(repo):
319 if not allowservergeneration(repo):
320 yield b'1\n'
320 yield b'1\n'
321 return
321 return
322
322
323 try:
323 try:
324 filecount, bytecount, it = generatev1(repo)
324 filecount, bytecount, it = generatev1(repo)
325 except error.LockError:
325 except error.LockError:
326 yield b'2\n'
326 yield b'2\n'
327 return
327 return
328
328
329 # Indicates successful response.
329 # Indicates successful response.
330 yield b'0\n'
330 yield b'0\n'
331 yield b'%d %d\n' % (filecount, bytecount)
331 yield b'%d %d\n' % (filecount, bytecount)
332 for chunk in it:
332 for chunk in it:
333 yield chunk
333 yield chunk
334
334
335
335
336 def generatebundlev1(repo, compression=b'UN'):
336 def generatebundlev1(repo, compression=b'UN'):
337 """Emit content for version 1 of a stream clone bundle.
337 """Emit content for version 1 of a stream clone bundle.
338
338
339 The first 4 bytes of the output ("HGS1") denote this as stream clone
339 The first 4 bytes of the output ("HGS1") denote this as stream clone
340 bundle version 1.
340 bundle version 1.
341
341
342 The next 2 bytes indicate the compression type. Only "UN" is currently
342 The next 2 bytes indicate the compression type. Only "UN" is currently
343 supported.
343 supported.
344
344
345 The next 16 bytes are two 64-bit big endian unsigned integers indicating
345 The next 16 bytes are two 64-bit big endian unsigned integers indicating
346 file count and byte count, respectively.
346 file count and byte count, respectively.
347
347
348 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
348 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
349 of the requirements string, including a trailing \0. The following N bytes
349 of the requirements string, including a trailing \0. The following N bytes
350 are the requirements string, which is ASCII containing a comma-delimited
350 are the requirements string, which is ASCII containing a comma-delimited
351 list of repo requirements that are needed to support the data.
351 list of repo requirements that are needed to support the data.
352
352
353 The remaining content is the output of ``generatev1()`` (which may be
353 The remaining content is the output of ``generatev1()`` (which may be
354 compressed in the future).
354 compressed in the future).
355
355
356 Returns a tuple of (requirements, data generator).
356 Returns a tuple of (requirements, data generator).
357 """
357 """
358 if compression != b'UN':
358 if compression != b'UN':
359 raise ValueError(b'we do not support the compression argument yet')
359 raise ValueError(b'we do not support the compression argument yet')
360
360
361 requirements = streamed_requirements(repo)
361 requirements = streamed_requirements(repo)
362 requires = b','.join(sorted(requirements))
362 requires = b','.join(sorted(requirements))
363
363
364 def gen():
364 def gen():
365 yield b'HGS1'
365 yield b'HGS1'
366 yield compression
366 yield compression
367
367
368 filecount, bytecount, it = generatev1(repo)
368 filecount, bytecount, it = generatev1(repo)
369 repo.ui.status(
369 repo.ui.status(
370 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
370 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
371 )
371 )
372
372
373 yield struct.pack(b'>QQ', filecount, bytecount)
373 yield struct.pack(b'>QQ', filecount, bytecount)
374 yield struct.pack(b'>H', len(requires) + 1)
374 yield struct.pack(b'>H', len(requires) + 1)
375 yield requires + b'\0'
375 yield requires + b'\0'
376
376
377 # This is where we'll add compression in the future.
377 # This is where we'll add compression in the future.
378 assert compression == b'UN'
378 assert compression == b'UN'
379
379
380 progress = repo.ui.makeprogress(
380 progress = repo.ui.makeprogress(
381 _(b'bundle'), total=bytecount, unit=_(b'bytes')
381 _(b'bundle'), total=bytecount, unit=_(b'bytes')
382 )
382 )
383 progress.update(0)
383 progress.update(0)
384
384
385 for chunk in it:
385 for chunk in it:
386 progress.increment(step=len(chunk))
386 progress.increment(step=len(chunk))
387 yield chunk
387 yield chunk
388
388
389 progress.complete()
389 progress.complete()
390
390
391 return requirements, gen()
391 return requirements, gen()
392
392
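For reference, the HGS1 header layout documented above can be assembled with the same struct formats the generator uses. This is purely an illustrative sketch of the byte layout (``build_hgs1_header`` is a made-up helper, not code from the module):

import struct

def build_hgs1_header(filecount, bytecount, requirements):
    """Sketch of the stream clone bundle v1 header (uncompressed only)."""
    requires = b','.join(sorted(requirements)) + b'\0'
    return (
        b'HGS1'                                      # magic: stream clone bundle, version 1
        + b'UN'                                      # compression identifier (only b'UN' is supported)
        + struct.pack('>QQ', filecount, bytecount)   # 64-bit big-endian file and byte counts
        + struct.pack('>H', len(requires))           # 16-bit length of requirements string, incl. trailing NUL
        + requires                                   # comma-delimited requirements, NUL-terminated
    )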
393
393
394 def consumev1(repo, fp, filecount, bytecount):
394 def consumev1(repo, fp, filecount, bytecount):
395 """Apply the contents from version 1 of a streaming clone file handle.
395 """Apply the contents from version 1 of a streaming clone file handle.
396
396
397 This takes the output from "stream_out" and applies it to the specified
397 This takes the output from "stream_out" and applies it to the specified
398 repository.
398 repository.
399
399
400 Like "stream_out," the status line added by the wire protocol is not
400 Like "stream_out," the status line added by the wire protocol is not
401 handled by this function.
401 handled by this function.
402 """
402 """
403 with repo.lock():
403 with repo.lock():
404 repo.ui.status(
404 repo.ui.status(
405 _(b'%d files to transfer, %s of data\n')
405 _(b'%d files to transfer, %s of data\n')
406 % (filecount, util.bytecount(bytecount))
406 % (filecount, util.bytecount(bytecount))
407 )
407 )
408 progress = repo.ui.makeprogress(
408 progress = repo.ui.makeprogress(
409 _(b'clone'), total=bytecount, unit=_(b'bytes')
409 _(b'clone'), total=bytecount, unit=_(b'bytes')
410 )
410 )
411 progress.update(0)
411 progress.update(0)
412 start = util.timer()
412 start = util.timer()
413
413
414 # TODO: get rid of (potential) inconsistency
414 # TODO: get rid of (potential) inconsistency
415 #
415 #
416 # If transaction is started and any @filecache property is
416 # If transaction is started and any @filecache property is
417 # changed at this point, it causes inconsistency between
417 # changed at this point, it causes inconsistency between
418 # in-memory cached property and streamclone-ed file on the
418 # in-memory cached property and streamclone-ed file on the
419 # disk. Nested transaction prevents transaction scope "clone"
419 # disk. Nested transaction prevents transaction scope "clone"
420 # below from writing in-memory changes out at the end of it,
420 # below from writing in-memory changes out at the end of it,
421 # even though in-memory changes are discarded at the end of it
421 # even though in-memory changes are discarded at the end of it
422 # regardless of transaction nesting.
422 # regardless of transaction nesting.
423 #
423 #
424 # But transaction nesting can't be simply prohibited, because
424 # But transaction nesting can't be simply prohibited, because
425 # nesting occurs also in ordinary case (e.g. enabling
425 # nesting occurs also in ordinary case (e.g. enabling
426 # clonebundles).
426 # clonebundles).
427
427
428 with repo.transaction(b'clone'):
428 with repo.transaction(b'clone'):
429 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
429 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
430 for i in range(filecount):
430 for i in range(filecount):
431 # XXX doesn't support '\n' or '\r' in filenames
431 # XXX doesn't support '\n' or '\r' in filenames
432 if hasattr(fp, 'readline'):
432 if hasattr(fp, 'readline'):
433 l = fp.readline()
433 l = fp.readline()
434 else:
434 else:
435 # inline clonebundles use a chunkbuffer, so no readline
435 # inline clonebundles use a chunkbuffer, so no readline
436 # --> this should be small anyway, the first line
436 # --> this should be small anyway, the first line
437 # only contains the size of the bundle
437 # only contains the size of the bundle
438 l_buf = []
438 l_buf = []
439 while not (l_buf and l_buf[-1] == b'\n'):
439 while not (l_buf and l_buf[-1] == b'\n'):
440 l_buf.append(fp.read(1))
440 l_buf.append(fp.read(1))
441 l = b''.join(l_buf)
441 l = b''.join(l_buf)
442 try:
442 try:
443 name, size = l.split(b'\0', 1)
443 name, size = l.split(b'\0', 1)
444 size = int(size)
444 size = int(size)
445 except (ValueError, TypeError):
445 except (ValueError, TypeError):
446 raise error.ResponseError(
446 raise error.ResponseError(
447 _(b'unexpected response from remote server:'), l
447 _(b'unexpected response from remote server:'), l
448 )
448 )
449 if repo.ui.debugflag:
449 if repo.ui.debugflag:
450 repo.ui.debug(
450 repo.ui.debug(
451 b'adding %s (%s)\n' % (name, util.bytecount(size))
451 b'adding %s (%s)\n' % (name, util.bytecount(size))
452 )
452 )
453 # for backwards compat, name was partially encoded
453 # for backwards compat, name was partially encoded
454 path = store.decodedir(name)
454 path = store.decodedir(name)
455 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
455 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
456 for chunk in util.filechunkiter(fp, limit=size):
456 for chunk in util.filechunkiter(fp, limit=size):
457 progress.increment(step=len(chunk))
457 progress.increment(step=len(chunk))
458 ofp.write(chunk)
458 ofp.write(chunk)
459
459
460 # force @filecache properties to be reloaded from
460 # force @filecache properties to be reloaded from
461 # streamclone-ed file at next access
461 # streamclone-ed file at next access
462 repo.invalidate(clearfilecache=True)
462 repo.invalidate(clearfilecache=True)
463
463
464 elapsed = util.timer() - start
464 elapsed = util.timer() - start
465 if elapsed <= 0:
465 if elapsed <= 0:
466 elapsed = 0.001
466 elapsed = 0.001
467 progress.complete()
467 progress.complete()
468 repo.ui.status(
468 repo.ui.status(
469 _(b'transferred %s in %.1f seconds (%s/sec)\n')
469 _(b'transferred %s in %.1f seconds (%s/sec)\n')
470 % (
470 % (
471 util.bytecount(bytecount),
471 util.bytecount(bytecount),
472 elapsed,
472 elapsed,
473 util.bytecount(bytecount / elapsed),
473 util.bytecount(bytecount / elapsed),
474 )
474 )
475 )
475 )
476
476
477
477
478 def readbundle1header(fp):
478 def readbundle1header(fp):
479 compression = fp.read(2)
479 compression = fp.read(2)
480 if compression != b'UN':
480 if compression != b'UN':
481 raise error.Abort(
481 raise error.Abort(
482 _(
482 _(
483 b'only uncompressed stream clone bundles are '
483 b'only uncompressed stream clone bundles are '
484 b'supported; got %s'
484 b'supported; got %s'
485 )
485 )
486 % compression
486 % compression
487 )
487 )
488
488
489 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
489 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
490 requireslen = struct.unpack(b'>H', fp.read(2))[0]
490 requireslen = struct.unpack(b'>H', fp.read(2))[0]
491 requires = fp.read(requireslen)
491 requires = fp.read(requireslen)
492
492
493 if not requires.endswith(b'\0'):
493 if not requires.endswith(b'\0'):
494 raise error.Abort(
494 raise error.Abort(
495 _(
495 _(
496 b'malformed stream clone bundle: '
496 b'malformed stream clone bundle: '
497 b'requirements not properly encoded'
497 b'requirements not properly encoded'
498 )
498 )
499 )
499 )
500
500
501 requirements = set(requires.rstrip(b'\0').split(b','))
501 requirements = set(requires.rstrip(b'\0').split(b','))
502
502
503 return filecount, bytecount, requirements
503 return filecount, bytecount, requirements
504
504
505
505
506 def applybundlev1(repo, fp):
506 def applybundlev1(repo, fp):
507 """Apply the content from a stream clone bundle version 1.
507 """Apply the content from a stream clone bundle version 1.
508
508
509 We assume the 4 byte header has been read and validated and the file handle
509 We assume the 4 byte header has been read and validated and the file handle
510 is at the 2 byte compression identifier.
510 is at the 2 byte compression identifier.
511 """
511 """
512 if len(repo):
512 if len(repo):
513 raise error.Abort(
513 raise error.Abort(
514 _(b'cannot apply stream clone bundle on non-empty repo')
514 _(b'cannot apply stream clone bundle on non-empty repo')
515 )
515 )
516
516
517 filecount, bytecount, requirements = readbundle1header(fp)
517 filecount, bytecount, requirements = readbundle1header(fp)
518 missingreqs = requirements - repo.supported
518 missingreqs = requirements - repo.supported
519 if missingreqs:
519 if missingreqs:
520 raise error.Abort(
520 raise error.Abort(
521 _(b'unable to apply stream clone: unsupported format: %s')
521 _(b'unable to apply stream clone: unsupported format: %s')
522 % b', '.join(sorted(missingreqs))
522 % b', '.join(sorted(missingreqs))
523 )
523 )
524
524
525 consumev1(repo, fp, filecount, bytecount)
525 consumev1(repo, fp, filecount, bytecount)
526 nodemap.post_stream_cleanup(repo)
526 nodemap.post_stream_cleanup(repo)
527
527
528
528
529 class streamcloneapplier:
529 class streamcloneapplier:
530 """Class to manage applying streaming clone bundles.
530 """Class to manage applying streaming clone bundles.
531
531
532 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
532 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
533 readers to perform bundle type-specific functionality.
533 readers to perform bundle type-specific functionality.
534 """
534 """
535
535
536 def __init__(self, fh):
536 def __init__(self, fh):
537 self._fh = fh
537 self._fh = fh
538
538
539 def apply(self, repo):
539 def apply(self, repo):
540 return applybundlev1(repo, self._fh)
540 return applybundlev1(repo, self._fh)
541
541
542
542
543 # type of file to stream
543 # type of file to stream
544 _fileappend = 0 # append only file
544 _fileappend = 0 # append only file
545 _filefull = 1 # full snapshot file
545 _filefull = 1 # full snapshot file
546
546
547 # Source of the file
547 # Source of the file
548 _srcstore = b's' # store (svfs)
548 _srcstore = b's' # store (svfs)
549 _srccache = b'c' # cache (cache)
549 _srccache = b'c' # cache (cache)
550
550
551
551
552 # This is its own function so extensions can override it.
552 # This is its own function so extensions can override it.
553 def _walkstreamfullstorefiles(repo):
553 def _walkstreamfullstorefiles(repo):
554 """list snapshot file from the store"""
554 """list snapshot file from the store"""
555 fnames = []
555 fnames = []
556 if not repo.publishing():
556 if not repo.publishing():
557 fnames.append(b'phaseroots')
557 fnames.append(b'phaseroots')
558 return fnames
558 return fnames
559
559
560
560
561 def _filterfull(entry, copy, vfsmap):
561 def _filterfull(entry, copy, vfsmap):
562 """actually copy the snapshot files"""
562 """actually copy the snapshot files"""
563 src, name, ftype, data = entry
563 src, name, ftype, data = entry
564 if ftype != _filefull:
564 if ftype != _filefull:
565 return entry
565 return entry
566 return (src, name, ftype, copy(vfsmap[src].join(name)))
566 return (src, name, ftype, copy(vfsmap[src].join(name)))
567
567
568
568
569 class VolatileManager:
569 class VolatileManager:
570 """Manage temporary backup of volatile file during stream clone
570 """Manage temporary backups of volatile files during stream clone.
571
571
572 This should be used as a Python context, the copies will be discarded when
572 This class will keep open file handles for the volatile files, writing the
573 exiting the context.
573 smaller ones on disk if the number of open file handles grows too large.
574
574
575 A copy can be done by calling the object on the real path (encoded full
575 This should be used as a Python context, the file handles and copies will
576 path)
576 be discarded when exiting the context.
577
577
578 The backup path can be retrieved using the __getitem__ protocol, obj[path].
578 The preservation can be done by calling the object on the real path
579 On file without backup, it will return the unmodified path. (equivalent to
579 (encoded full path).
580 `dict.get(x, x)`)
580
581 Valid filehandles for any file should be retrieved by calling `open(path)`.
581 """
582 """
582
583
584 # arbitrarily picked as "it seemed fine" and much higher than the current
585 # usage.
586 MAX_OPEN = 100
587
583 def __init__(self):
588 def __init__(self):
589 self._counter = 0
590 self._volatile_fps = None
584 self._copies = None
591 self._copies = None
585 self._dst_dir = None
592 self._dst_dir = None
586
593
587 def __enter__(self):
594 def __enter__(self):
588 if self._copies is not None:
595 if self._counter == 0:
589 msg = "Copies context already open"
596 assert self._volatile_fps is None
590 raise error.ProgrammingError(msg)
597 self._volatile_fps = {}
591 self._copies = {}
598 self._counter += 1
592 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
593 return self
599 return self
594
600
595 def __call__(self, src):
596 """create a backup of the file at src"""
597 prefix = os.path.basename(src)
598 fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
599 os.close(fd)
600 self._copies[src] = dst
601 util.copyfiles(src, dst, hardlink=True)
602 return dst
603
604 @contextlib.contextmanager
605 def open(self, src):
606 actual_path = self._copies.get(src, src)
607 with open(actual_path, 'rb') as fp:
608 yield fp
609
610 def __exit__(self, *args, **kwars):
601 def __exit__(self, *args, **kwars):
611 """discard all backups"""
602 """discard all backups"""
603 self._counter -= 1
604 if self._counter == 0:
605 for _size, fp in self._volatile_fps.values():
606 fp.close()
607 self._volatile_fps = None
608 if self._copies is not None:
612 for tmp in self._copies.values():
609 for tmp in self._copies.values():
613 util.tryunlink(tmp)
610 util.tryunlink(tmp)
614 util.tryrmdir(self._dst_dir)
611 util.tryrmdir(self._dst_dir)
615 self._copies = None
612 self._copies = None
616 self._dst_dir = None
613 self._dst_dir = None
614 assert self._volatile_fps is None
615 assert self._copies is None
616 assert self._dst_dir is None
617
618 def _init_tmp_copies(self):
619 """prepare a temporary directory to save volatile files
620
621 This will be used as backup if we have too many files open"""
622 assert 0 < self._counter
623 assert self._copies is None
624 assert self._dst_dir is None
625 self._copies = {}
626 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
627
628 def _flush_some_on_disk(self):
629 """move some of the open files to tempory files on disk"""
630 if self._copies is None:
631 self._init_tmp_copies()
632 flush_count = self.MAX_OPEN // 2
633 for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]:
634 prefix = os.path.basename(src)
635 fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
636 self._copies[src] = dst
637 os.close(fd)
638 # we no longer hardlink, but on the other hand we rarely do this,
639 # and we do it for the smallest file only and not at all in the
640 # common case.
641 with open(dst, 'wb') as bck:
642 fp.seek(0)
643 bck.write(fp.read())
644 del self._volatile_fps[src]
645 fp.close()
646
647 def _keep_one(self, src):
648 """preserve an open file handle for a given path"""
649 # store the file quickly to ensure we close it if any error happens
650 _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
651 fp.seek(0, os.SEEK_END)
652 size = fp.tell()
653 self._volatile_fps[src] = (size, fp)
654
655 def __call__(self, src):
656 """preserve the volatile file at src"""
657 assert 0 < self._counter
658 if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
659 self._flush_some_on_disk()
660 self._keep_one(src)
661
662 @contextlib.contextmanager
663 def open(self, src):
664 assert 0 < self._counter
665 entry = self._volatile_fps.get(src)
666 if entry is not None:
667 _size, fp = entry
668 fp.seek(0)
669 yield fp
670 else:
671 if self._copies is None:
672 actual_path = src
673 else:
674 actual_path = self._copies.get(src, src)
675 with open(actual_path, 'rb') as fp:
676 yield fp
617
677
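The emitters below (``_emit2``, ``_emit3``) drive the new VolatileManager roughly as follows; this is a hedged usage sketch with made-up paths, not code from the module:

# Hypothetical usage of VolatileManager (paths are illustrative only).
volatile_paths = [b'/repo/.hg/store/phaseroots', b'/repo/.hg/cache/branch2-served']

with VolatileManager() as volatiles:
    for path in volatile_paths:
        volatiles(path)                 # keep an open handle; past MAX_OPEN, spill copies to a temp dir
    with volatiles.open(volatile_paths[0]) as fp:
        data = fp.read()                # yields the retained handle (seeked to 0) or the on-disk backup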
618
678
619 def _makemap(repo):
679 def _makemap(repo):
620 """make a (src -> vfs) map for the repo"""
680 """make a (src -> vfs) map for the repo"""
621 vfsmap = {
681 vfsmap = {
622 _srcstore: repo.svfs,
682 _srcstore: repo.svfs,
623 _srccache: repo.cachevfs,
683 _srccache: repo.cachevfs,
624 }
684 }
625 # we keep repo.vfs out of the map on purpose, there are too many dangers there
685 # we keep repo.vfs out of the map on purpose, there are too many dangers there
626 # (eg: .hg/hgrc)
686 # (eg: .hg/hgrc)
627 assert repo.vfs not in vfsmap.values()
687 assert repo.vfs not in vfsmap.values()
628
688
629 return vfsmap
689 return vfsmap
630
690
631
691
632 def _emit2(repo, entries):
692 def _emit2(repo, entries):
633 """actually emit the stream bundle"""
693 """actually emit the stream bundle"""
634 vfsmap = _makemap(repo)
694 vfsmap = _makemap(repo)
635 # we keep repo.vfs out of the map on purpose, there are too many dangers there
695 # we keep repo.vfs out of the map on purpose, there are too many dangers there
636 # (eg: .hg/hgrc),
696 # (eg: .hg/hgrc),
637 #
697 #
638 # this assert is duplicated (from _makemap) as author might think this is
698 # this assert is duplicated (from _makemap) as author might think this is
639 # fine, while this is really not fine.
699 # fine, while this is really not fine.
640 if repo.vfs in vfsmap.values():
700 if repo.vfs in vfsmap.values():
641 raise error.ProgrammingError(
701 raise error.ProgrammingError(
642 b'repo.vfs must not be added to vfsmap for security reasons'
702 b'repo.vfs must not be added to vfsmap for security reasons'
643 )
703 )
644
704
645 # translate the vfs one
705 # translate the vfs one
646 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
706 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
647
707
648 max_linkrev = len(repo)
708 max_linkrev = len(repo)
649 file_count = totalfilesize = 0
709 file_count = totalfilesize = 0
650 with util.nogc():
710 with util.nogc():
651 # record the expected size of every file
711 # record the expected size of every file
652 for k, vfs, e in entries:
712 for k, vfs, e in entries:
653 for f in e.files():
713 for f in e.files():
654 file_count += 1
714 file_count += 1
655 totalfilesize += f.file_size(vfs)
715 totalfilesize += f.file_size(vfs)
656
716
657 progress = repo.ui.makeprogress(
717 progress = repo.ui.makeprogress(
658 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
718 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
659 )
719 )
660 progress.update(0)
720 progress.update(0)
661 with VolatileManager() as volatiles, progress:
721 with VolatileManager() as volatiles, progress:
662 # make sure we preserve volatile files
722 # make sure we preserve volatile files
663 for k, vfs, e in entries:
723 for k, vfs, e in entries:
664 for f in e.files():
724 for f in e.files():
665 if f.is_volatile:
725 if f.is_volatile:
666 volatiles(vfs.join(f.unencoded_path))
726 volatiles(vfs.join(f.unencoded_path))
667 # the first yield release the lock on the repository
727 # the first yield release the lock on the repository
668 yield file_count, totalfilesize
728 yield file_count, totalfilesize
669 totalbytecount = 0
729 totalbytecount = 0
670
730
671 for src, vfs, e in entries:
731 for src, vfs, e in entries:
672 entry_streams = e.get_streams(
732 entry_streams = e.get_streams(
673 repo=repo,
733 repo=repo,
674 vfs=vfs,
734 vfs=vfs,
675 volatiles=volatiles,
735 volatiles=volatiles,
676 max_changeset=max_linkrev,
736 max_changeset=max_linkrev,
677 preserve_file_count=True,
737 preserve_file_count=True,
678 )
738 )
679 for name, stream, size in entry_streams:
739 for name, stream, size in entry_streams:
680 yield src
740 yield src
681 yield util.uvarintencode(len(name))
741 yield util.uvarintencode(len(name))
682 yield util.uvarintencode(size)
742 yield util.uvarintencode(size)
683 yield name
743 yield name
684 bytecount = 0
744 bytecount = 0
685 for chunk in stream:
745 for chunk in stream:
686 bytecount += len(chunk)
746 bytecount += len(chunk)
687 totalbytecount += len(chunk)
747 totalbytecount += len(chunk)
688 progress.update(totalbytecount)
748 progress.update(totalbytecount)
689 yield chunk
749 yield chunk
690 if bytecount != size:
750 if bytecount != size:
691 # Would most likely be caused by a race due to `hg
751 # Would most likely be caused by a race due to `hg
692 # strip` or a revlog split
752 # strip` or a revlog split
693 msg = _(
753 msg = _(
694 b'clone could only read %d bytes from %s, but '
754 b'clone could only read %d bytes from %s, but '
695 b'expected %d bytes'
755 b'expected %d bytes'
696 )
756 )
697 raise error.Abort(msg % (bytecount, name, size))
757 raise error.Abort(msg % (bytecount, name, size))
698
758
699
759
700 def _emit3(repo, entries):
760 def _emit3(repo, entries):
701 """actually emit the stream bundle (v3)"""
761 """actually emit the stream bundle (v3)"""
702 vfsmap = _makemap(repo)
762 vfsmap = _makemap(repo)
703 # we keep repo.vfs out of the map on purpose, there are too many dangers
763 # we keep repo.vfs out of the map on purpose, there are too many dangers
704 # there (eg: .hg/hgrc),
764 # there (eg: .hg/hgrc),
705 #
765 #
706 # this assert is duplicated (from _makemap) as authors might think this is
766 # this assert is duplicated (from _makemap) as authors might think this is
707 # fine, while this is really not fine.
767 # fine, while this is really not fine.
708 if repo.vfs in vfsmap.values():
768 if repo.vfs in vfsmap.values():
709 raise error.ProgrammingError(
769 raise error.ProgrammingError(
710 b'repo.vfs must not be added to vfsmap for security reasons'
770 b'repo.vfs must not be added to vfsmap for security reasons'
711 )
771 )
712
772
713 # translate the vfs once
773 # translate the vfs once
714 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
774 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
715 total_entry_count = len(entries)
775 total_entry_count = len(entries)
716
776
717 max_linkrev = len(repo)
777 max_linkrev = len(repo)
718 progress = repo.ui.makeprogress(
778 progress = repo.ui.makeprogress(
719 _(b'bundle'),
779 _(b'bundle'),
720 total=total_entry_count,
780 total=total_entry_count,
721 unit=_(b'entry'),
781 unit=_(b'entry'),
722 )
782 )
723 progress.update(0)
783 progress.update(0)
724 with VolatileManager() as volatiles, progress:
784 with VolatileManager() as volatiles, progress:
725 # make sure we preserve volatile files
785 # make sure we preserve volatile files
726 for k, vfs, e in entries:
786 for k, vfs, e in entries:
727 if e.maybe_volatile:
787 if e.maybe_volatile:
728 for f in e.files():
788 for f in e.files():
729 if f.is_volatile:
789 if f.is_volatile:
730 # record the expected size under lock
790 # record the expected size under lock
731 f.file_size(vfs)
791 f.file_size(vfs)
732 volatiles(vfs.join(f.unencoded_path))
792 volatiles(vfs.join(f.unencoded_path))
733 # the first yield release the lock on the repository
793 # the first yield release the lock on the repository
734 yield None
794 yield None
735
795
736 yield util.uvarintencode(total_entry_count)
796 yield util.uvarintencode(total_entry_count)
737
797
738 for src, vfs, e in entries:
798 for src, vfs, e in entries:
739 entry_streams = e.get_streams(
799 entry_streams = e.get_streams(
740 repo=repo,
800 repo=repo,
741 vfs=vfs,
801 vfs=vfs,
742 volatiles=volatiles,
802 volatiles=volatiles,
743 max_changeset=max_linkrev,
803 max_changeset=max_linkrev,
744 )
804 )
745 yield util.uvarintencode(len(entry_streams))
805 yield util.uvarintencode(len(entry_streams))
746 for name, stream, size in entry_streams:
806 for name, stream, size in entry_streams:
747 yield src
807 yield src
748 yield util.uvarintencode(len(name))
808 yield util.uvarintencode(len(name))
749 yield util.uvarintencode(size)
809 yield util.uvarintencode(size)
750 yield name
810 yield name
751 yield from stream
811 yield from stream
752 progress.increment()
812 progress.increment()
753
813
754
814
755 def _test_sync_point_walk_1(repo):
815 def _test_sync_point_walk_1(repo):
756 """a function for synchronisation during tests"""
816 """a function for synchronisation during tests"""
757
817
758
818
759 def _test_sync_point_walk_2(repo):
819 def _test_sync_point_walk_2(repo):
760 """a function for synchronisation during tests"""
820 """a function for synchronisation during tests"""
761
821
762
822
763 def _entries_walk(repo, includes, excludes, includeobsmarkers):
823 def _entries_walk(repo, includes, excludes, includeobsmarkers):
764 """emit a seris of files information useful to clone a repo
824 """emit a seris of files information useful to clone a repo
765
825
766 return (vfs-key, entry) iterator
826 return (vfs-key, entry) iterator
767
827
768 Where `entry` is StoreEntry. (used even for cache entries)
828 Where `entry` is StoreEntry. (used even for cache entries)
769 """
829 """
770 assert repo._currentlock(repo._lockref) is not None
830 assert repo._currentlock(repo._lockref) is not None
771
831
772 matcher = None
832 matcher = None
773 if includes or excludes:
833 if includes or excludes:
774 matcher = narrowspec.match(repo.root, includes, excludes)
834 matcher = narrowspec.match(repo.root, includes, excludes)
775
835
776 phase = not repo.publishing()
836 phase = not repo.publishing()
777 # Python's gc goes crazy with all the small containers we create; disabling
837 # Python's gc goes crazy with all the small containers we create; disabling
778 # the gc while we do so helps performance a lot.
838 # the gc while we do so helps performance a lot.
779 with util.nogc():
839 with util.nogc():
780 entries = _walkstreamfiles(
840 entries = _walkstreamfiles(
781 repo,
841 repo,
782 matcher,
842 matcher,
783 phase=phase,
843 phase=phase,
784 obsolescence=includeobsmarkers,
844 obsolescence=includeobsmarkers,
785 )
845 )
786 for entry in entries:
846 for entry in entries:
787 yield (_srcstore, entry)
847 yield (_srcstore, entry)
788
848
789 for name in cacheutil.cachetocopy(repo):
849 for name in cacheutil.cachetocopy(repo):
790 if repo.cachevfs.exists(name):
850 if repo.cachevfs.exists(name):
791 # not really a StoreEntry, but close enough
851 # not really a StoreEntry, but close enough
792 entry = store.SimpleStoreEntry(
852 entry = store.SimpleStoreEntry(
793 entry_path=name,
853 entry_path=name,
794 is_volatile=True,
854 is_volatile=True,
795 )
855 )
796 yield (_srccache, entry)
856 yield (_srccache, entry)
797
857
798
858
799 def generatev2(repo, includes, excludes, includeobsmarkers):
859 def generatev2(repo, includes, excludes, includeobsmarkers):
800 """Emit content for version 2 of a streaming clone.
860 """Emit content for version 2 of a streaming clone.
801
861
802 the data stream consists of the following entries:
862 the data stream consists of the following entries:
803 1) A char representing the file destination (eg: store or cache)
863 1) A char representing the file destination (eg: store or cache)
804 2) A varint containing the length of the filename
864 2) A varint containing the length of the filename
805 3) A varint containing the length of file data
865 3) A varint containing the length of file data
806 4) N bytes containing the filename (the internal, store-agnostic form)
866 4) N bytes containing the filename (the internal, store-agnostic form)
807 5) N bytes containing the file data
867 5) N bytes containing the file data
808
868
809 Returns a 3-tuple of (file count, file size, data iterator).
869 Returns a 3-tuple of (file count, file size, data iterator).
810 """
870 """
811
871
812 with repo.lock():
872 with repo.lock():
813 repo.ui.debug(b'scanning\n')
873 repo.ui.debug(b'scanning\n')
814
874
815 entries = _entries_walk(
875 entries = _entries_walk(
816 repo,
876 repo,
817 includes=includes,
877 includes=includes,
818 excludes=excludes,
878 excludes=excludes,
819 includeobsmarkers=includeobsmarkers,
879 includeobsmarkers=includeobsmarkers,
820 )
880 )
821
881
822 chunks = _emit2(repo, entries)
882 chunks = _emit2(repo, entries)
823 first = next(chunks)
883 first = next(chunks)
824 file_count, total_file_size = first
884 file_count, total_file_size = first
825 _test_sync_point_walk_1(repo)
885 _test_sync_point_walk_1(repo)
826 _test_sync_point_walk_2(repo)
886 _test_sync_point_walk_2(repo)
827
887
828 return file_count, total_file_size, chunks
888 return file_count, total_file_size, chunks
829
889
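Each file in the v2 stream is framed exactly as listed in the docstring above. A compact sketch of one frame, mirroring what ``_emit2`` yields piecewise (``frame_v2_entry`` is a hypothetical helper used only for illustration):

from mercurial import util

def frame_v2_entry(src, name, data):
    """Sketch: build the wire frame for a single v2 stream file."""
    return (
        src                                # one byte: _srcstore (b's') or _srccache (b'c')
        + util.uvarintencode(len(name))    # varint: filename length
        + util.uvarintencode(len(data))    # varint: data length
        + name                             # filename (internal, store-agnostic form)
        + data                             # raw file content
    )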
830
890
831 def generatev3(repo, includes, excludes, includeobsmarkers):
891 def generatev3(repo, includes, excludes, includeobsmarkers):
832 """Emit content for version 3 of a streaming clone.
892 """Emit content for version 3 of a streaming clone.
833
893
834 the data stream consists of the following:
894 the data stream consists of the following:
835 1) A varint E containing the number of entries (can be 0), then E entries follow
895 1) A varint E containing the number of entries (can be 0), then E entries follow
836 2) For each entry:
896 2) For each entry:
837 2.1) The number of files in this entry (can be 0, but typically 1 or 2)
897 2.1) The number of files in this entry (can be 0, but typically 1 or 2)
838 2.2) For each file:
898 2.2) For each file:
839 2.2.1) A char representing the file destination (eg: store or cache)
899 2.2.1) A char representing the file destination (eg: store or cache)
840 2.2.2) A varint N containing the length of the filename
900 2.2.2) A varint N containing the length of the filename
841 2.2.3) A varint M containing the length of file data
901 2.2.3) A varint M containing the length of file data
842 2.2.4) N bytes containing the filename (the internal, store-agnostic form)
902 2.2.4) N bytes containing the filename (the internal, store-agnostic form)
843 2.2.5) M bytes containing the file data
903 2.2.5) M bytes containing the file data
844
904
845 Returns the data iterator.
905 Returns the data iterator.
846
906
847 XXX This format is experimental and subject to change. Here is a
907 XXX This format is experimental and subject to change. Here is a
848 XXX non-exhaustive list of things this format could do or change:
908 XXX non-exhaustive list of things this format could do or change:
849
909
850 - making it easier to write files in parallel
910 - making it easier to write files in parallel
851 - holding the lock for a shorter time
911 - holding the lock for a shorter time
852 - improving progress information
912 - improving progress information
853 - ways to adjust the number of expected entries/files ?
913 - ways to adjust the number of expected entries/files ?
854 """
914 """
855
915
856 # Python's gc goes crazy with all the small containers we create while
916 # Python's gc goes crazy with all the small containers we create while
857 # considering the files to preserve, disabling the gc while we do so helps
917 # considering the files to preserve, disabling the gc while we do so helps
858 # performance a lot.
918 # performance a lot.
859 with repo.lock(), util.nogc():
919 with repo.lock(), util.nogc():
860 repo.ui.debug(b'scanning\n')
920 repo.ui.debug(b'scanning\n')
861
921
862 entries = _entries_walk(
922 entries = _entries_walk(
863 repo,
923 repo,
864 includes=includes,
924 includes=includes,
865 excludes=excludes,
925 excludes=excludes,
866 includeobsmarkers=includeobsmarkers,
926 includeobsmarkers=includeobsmarkers,
867 )
927 )
868 chunks = _emit3(repo, list(entries))
928 chunks = _emit3(repo, list(entries))
869 first = next(chunks)
929 first = next(chunks)
870 assert first is None
930 assert first is None
871 _test_sync_point_walk_1(repo)
931 _test_sync_point_walk_1(repo)
872 _test_sync_point_walk_2(repo)
932 _test_sync_point_walk_2(repo)
873
933
874 return chunks
934 return chunks
875
935
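No v3 consumer appears in this section, so as a reading aid, here is a rough decoder skeleton for the framing described above, assuming the same ``util.uvarintdecodestream``/``util.readexactly`` helpers that ``consumev2`` uses; ``read_v3_entries`` is a hypothetical name and omits the surrounding bundle2 plumbing:

from mercurial import util

def read_v3_entries(fp):
    """Sketch: decode the v3 framing (entry count, then per-entry file frames)."""
    entry_count = util.uvarintdecodestream(fp)
    for _ in range(entry_count):
        stream_count = util.uvarintdecodestream(fp)   # number of files in this entry
        for _ in range(stream_count):
            src = util.readexactly(fp, 1)             # destination: store (b's') or cache (b'c')
            namelen = util.uvarintdecodestream(fp)
            datalen = util.uvarintdecodestream(fp)
            name = util.readexactly(fp, namelen)
            data = util.readexactly(fp, datalen)
            yield src, name, data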
876
936
877 @contextlib.contextmanager
937 @contextlib.contextmanager
878 def nested(*ctxs):
938 def nested(*ctxs):
879 this = ctxs[0]
939 this = ctxs[0]
880 rest = ctxs[1:]
940 rest = ctxs[1:]
881 with this:
941 with this:
882 if rest:
942 if rest:
883 with nested(*rest):
943 with nested(*rest):
884 yield
944 yield
885 else:
945 else:
886 yield
946 yield
887
947
888
948
889 def consumev2(repo, fp, filecount, filesize):
949 def consumev2(repo, fp, filecount, filesize):
890 """Apply the contents from a version 2 streaming clone.
950 """Apply the contents from a version 2 streaming clone.
891
951
892 Data is read from an object that only needs to provide a ``read(size)``
952 Data is read from an object that only needs to provide a ``read(size)``
893 method.
953 method.
894 """
954 """
895 with repo.lock():
955 with repo.lock():
896 repo.ui.status(
956 repo.ui.status(
897 _(b'%d files to transfer, %s of data\n')
957 _(b'%d files to transfer, %s of data\n')
898 % (filecount, util.bytecount(filesize))
958 % (filecount, util.bytecount(filesize))
899 )
959 )
900
960
901 start = util.timer()
961 start = util.timer()
902 progress = repo.ui.makeprogress(
962 progress = repo.ui.makeprogress(
903 _(b'clone'), total=filesize, unit=_(b'bytes')
963 _(b'clone'), total=filesize, unit=_(b'bytes')
904 )
964 )
905 progress.update(0)
965 progress.update(0)
906
966
907 vfsmap = _makemap(repo)
967 vfsmap = _makemap(repo)
908 # we keep repo.vfs out of the map on purpose, there are too many dangers
968 # we keep repo.vfs out of the map on purpose, there are too many dangers
909 # there (eg: .hg/hgrc),
969 # there (eg: .hg/hgrc),
910 #
970 #
911 # this assert is duplicated (from _makemap) as authors might think this
971 # this assert is duplicated (from _makemap) as authors might think this
912 # is fine, while this is really not fine.
972 # is fine, while this is really not fine.
913 if repo.vfs in vfsmap.values():
973 if repo.vfs in vfsmap.values():
914 raise error.ProgrammingError(
974 raise error.ProgrammingError(
915 b'repo.vfs must not be added to vfsmap for security reasons'
975 b'repo.vfs must not be added to vfsmap for security reasons'
916 )
976 )
917
977
918 with repo.transaction(b'clone'):
978 with repo.transaction(b'clone'):
919 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
979 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
920 with nested(*ctxs):
980 with nested(*ctxs):
921 for i in range(filecount):
981 for i in range(filecount):
922 src = util.readexactly(fp, 1)
982 src = util.readexactly(fp, 1)
923 vfs = vfsmap[src]
983 vfs = vfsmap[src]
924 namelen = util.uvarintdecodestream(fp)
984 namelen = util.uvarintdecodestream(fp)
925 datalen = util.uvarintdecodestream(fp)
985 datalen = util.uvarintdecodestream(fp)
926
986
927 name = util.readexactly(fp, namelen)
987 name = util.readexactly(fp, namelen)
928
988
929 if repo.ui.debugflag:
989 if repo.ui.debugflag:
930 repo.ui.debug(
990 repo.ui.debug(
931 b'adding [%s] %s (%s)\n'
991 b'adding [%s] %s (%s)\n'
932 % (src, name, util.bytecount(datalen))
992 % (src, name, util.bytecount(datalen))
933 )
993 )
934
994
935 with vfs(name, b'w') as ofp:
995 with vfs(name, b'w') as ofp:
936 for chunk in util.filechunkiter(fp, limit=datalen):
996 for chunk in util.filechunkiter(fp, limit=datalen):
937 progress.increment(step=len(chunk))
997 progress.increment(step=len(chunk))
938 ofp.write(chunk)
998 ofp.write(chunk)
939
999
940 # force @filecache properties to be reloaded from
1000 # force @filecache properties to be reloaded from
941 # the streamclone-ed files at next access
1001 # the streamclone-ed files at next access
942 repo.invalidate(clearfilecache=True)
1002 repo.invalidate(clearfilecache=True)
943
1003
944 elapsed = util.timer() - start
1004 elapsed = util.timer() - start
945 if elapsed <= 0:
1005 if elapsed <= 0:
946 elapsed = 0.001
1006 elapsed = 0.001
947 repo.ui.status(
1007 repo.ui.status(
948 _(b'transferred %s in %.1f seconds (%s/sec)\n')
1008 _(b'transferred %s in %.1f seconds (%s/sec)\n')
949 % (
1009 % (
950 util.bytecount(progress.pos),
1010 util.bytecount(progress.pos),
951 elapsed,
1011 elapsed,
952 util.bytecount(progress.pos / elapsed),
1012 util.bytecount(progress.pos / elapsed),
953 )
1013 )
954 )
1014 )
955 progress.complete()
1015 progress.complete()
956
1016
957
1017
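# Illustrative sketch (not part of streamclone.py): the stream formats above
# lean on unsigned varints for lengths and counts. `_decode_uvarint` shows
# the LEB128-style decoding that util.uvarintdecodestream is assumed to
# perform; the helper name is hypothetical.
def _decode_uvarint(fp):
    """Read one unsigned varint from a binary stream."""
    result = 0
    shift = 0
    while True:
        byte = fp.read(1)
        if not byte:
            raise EOFError('stream ended while reading a varint')
        value = byte[0]
        result |= (value & 0x7F) << shift
        if not value & 0x80:  # high bit clear: last byte of the varint
            return result
        shift += 7


# e.g. decoding bytes([0xAC, 0x02]) yields 300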
958 def consumev3(repo, fp):
1018 def consumev3(repo, fp):
959 """Apply the contents from a version 3 streaming clone.
1019 """Apply the contents from a version 3 streaming clone.
960
1020
961 Data is read from an object that only needs to provide a ``read(size)``
1021 Data is read from an object that only needs to provide a ``read(size)``
962 method.
1022 method.
963 """
1023 """
964 with repo.lock():
1024 with repo.lock():
965 start = util.timer()
1025 start = util.timer()
966
1026
967 entrycount = util.uvarintdecodestream(fp)
1027 entrycount = util.uvarintdecodestream(fp)
968 repo.ui.status(_(b'%d entries to transfer\n') % (entrycount))
1028 repo.ui.status(_(b'%d entries to transfer\n') % (entrycount))
969
1029
970 progress = repo.ui.makeprogress(
1030 progress = repo.ui.makeprogress(
971 _(b'clone'),
1031 _(b'clone'),
972 total=entrycount,
1032 total=entrycount,
973 unit=_(b'entries'),
1033 unit=_(b'entries'),
974 )
1034 )
975 progress.update(0)
1035 progress.update(0)
976 bytes_transferred = 0
1036 bytes_transferred = 0
977
1037
978 vfsmap = _makemap(repo)
1038 vfsmap = _makemap(repo)
979 # we keep repo.vfs out of the map on purpose, there are too many dangers
1039 # we keep repo.vfs out of the map on purpose, there are too many dangers
980 # there (eg: .hg/hgrc),
1040 # there (eg: .hg/hgrc),
981 #
1041 #
982 # this assert is duplicated (from _makemap) as authors might think this
1042 # this assert is duplicated (from _makemap) as authors might think this
983 # is fine, while this is really not fine.
1043 # is fine, while this is really not fine.
984 if repo.vfs in vfsmap.values():
1044 if repo.vfs in vfsmap.values():
985 raise error.ProgrammingError(
1045 raise error.ProgrammingError(
986 b'repo.vfs must not be added to vfsmap for security reasons'
1046 b'repo.vfs must not be added to vfsmap for security reasons'
987 )
1047 )
988
1048
989 with repo.transaction(b'clone'):
1049 with repo.transaction(b'clone'):
990 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
1050 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
991 with nested(*ctxs):
1051 with nested(*ctxs):
992 for i in range(entrycount):
1052 for i in range(entrycount):
993 filecount = util.uvarintdecodestream(fp)
1053 filecount = util.uvarintdecodestream(fp)
994 if filecount == 0:
1054 if filecount == 0:
995 if repo.ui.debugflag:
1055 if repo.ui.debugflag:
996 repo.ui.debug(b'entry with no files [%d]\n' % (i))
1056 repo.ui.debug(b'entry with no files [%d]\n' % (i))
997 for i in range(filecount):
1057 for i in range(filecount):
998 src = util.readexactly(fp, 1)
1058 src = util.readexactly(fp, 1)
999 vfs = vfsmap[src]
1059 vfs = vfsmap[src]
1000 namelen = util.uvarintdecodestream(fp)
1060 namelen = util.uvarintdecodestream(fp)
1001 datalen = util.uvarintdecodestream(fp)
1061 datalen = util.uvarintdecodestream(fp)
1002
1062
1003 name = util.readexactly(fp, namelen)
1063 name = util.readexactly(fp, namelen)
1004
1064
1005 if repo.ui.debugflag:
1065 if repo.ui.debugflag:
1006 msg = b'adding [%s] %s (%s)\n'
1066 msg = b'adding [%s] %s (%s)\n'
1007 msg %= (src, name, util.bytecount(datalen))
1067 msg %= (src, name, util.bytecount(datalen))
1008 repo.ui.debug(msg)
1068 repo.ui.debug(msg)
1009 bytes_transferred += datalen
1069 bytes_transferred += datalen
1010
1070
1011 with vfs(name, b'w') as ofp:
1071 with vfs(name, b'w') as ofp:
1012 for chunk in util.filechunkiter(fp, limit=datalen):
1072 for chunk in util.filechunkiter(fp, limit=datalen):
1013 ofp.write(chunk)
1073 ofp.write(chunk)
1014 progress.increment(step=1)
1074 progress.increment(step=1)
1015
1075
1016 # force @filecache properties to be reloaded from
1076 # force @filecache properties to be reloaded from
1017 # the streamclone-ed files at next access
1077 # the streamclone-ed files at next access
1018 repo.invalidate(clearfilecache=True)
1078 repo.invalidate(clearfilecache=True)
1019
1079
1020 elapsed = util.timer() - start
1080 elapsed = util.timer() - start
1021 if elapsed <= 0:
1081 if elapsed <= 0:
1022 elapsed = 0.001
1082 elapsed = 0.001
1023 msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
1083 msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
1024 byte_count = util.bytecount(bytes_transferred)
1084 byte_count = util.bytecount(bytes_transferred)
1025 bytes_sec = util.bytecount(bytes_transferred / elapsed)
1085 bytes_sec = util.bytecount(bytes_transferred / elapsed)
1026 msg %= (byte_count, elapsed, bytes_sec)
1086 msg %= (byte_count, elapsed, bytes_sec)
1027 repo.ui.status(msg)
1087 repo.ui.status(msg)
1028 progress.complete()
1088 progress.complete()
1029
1089
1030
1090
1031 def applybundlev2(repo, fp, filecount, filesize, requirements):
1091 def applybundlev2(repo, fp, filecount, filesize, requirements):
1032 from . import localrepo
1092 from . import localrepo
1033
1093
1034 missingreqs = [r for r in requirements if r not in repo.supported]
1094 missingreqs = [r for r in requirements if r not in repo.supported]
1035 if missingreqs:
1095 if missingreqs:
1036 raise error.Abort(
1096 raise error.Abort(
1037 _(b'unable to apply stream clone: unsupported format: %s')
1097 _(b'unable to apply stream clone: unsupported format: %s')
1038 % b', '.join(sorted(missingreqs))
1098 % b', '.join(sorted(missingreqs))
1039 )
1099 )
1040
1100
1041 consumev2(repo, fp, filecount, filesize)
1101 consumev2(repo, fp, filecount, filesize)
1042
1102
1043 repo.requirements = new_stream_clone_requirements(
1103 repo.requirements = new_stream_clone_requirements(
1044 repo.requirements,
1104 repo.requirements,
1045 requirements,
1105 requirements,
1046 )
1106 )
1047 repo.svfs.options = localrepo.resolvestorevfsoptions(
1107 repo.svfs.options = localrepo.resolvestorevfsoptions(
1048 repo.ui, repo.requirements, repo.features
1108 repo.ui, repo.requirements, repo.features
1049 )
1109 )
1050 scmutil.writereporequirements(repo)
1110 scmutil.writereporequirements(repo)
1051 nodemap.post_stream_cleanup(repo)
1111 nodemap.post_stream_cleanup(repo)
1052
1112
1053
1113
1054 def applybundlev3(repo, fp, requirements):
1114 def applybundlev3(repo, fp, requirements):
1055 from . import localrepo
1115 from . import localrepo
1056
1116
1057 missingreqs = [r for r in requirements if r not in repo.supported]
1117 missingreqs = [r for r in requirements if r not in repo.supported]
1058 if missingreqs:
1118 if missingreqs:
1059 msg = _(b'unable to apply stream clone: unsupported format: %s')
1119 msg = _(b'unable to apply stream clone: unsupported format: %s')
1060 msg %= b', '.join(sorted(missingreqs))
1120 msg %= b', '.join(sorted(missingreqs))
1061 raise error.Abort(msg)
1121 raise error.Abort(msg)
1062
1122
1063 consumev3(repo, fp)
1123 consumev3(repo, fp)
1064
1124
1065 repo.requirements = new_stream_clone_requirements(
1125 repo.requirements = new_stream_clone_requirements(
1066 repo.requirements,
1126 repo.requirements,
1067 requirements,
1127 requirements,
1068 )
1128 )
1069 repo.svfs.options = localrepo.resolvestorevfsoptions(
1129 repo.svfs.options = localrepo.resolvestorevfsoptions(
1070 repo.ui, repo.requirements, repo.features
1130 repo.ui, repo.requirements, repo.features
1071 )
1131 )
1072 scmutil.writereporequirements(repo)
1132 scmutil.writereporequirements(repo)
1073 nodemap.post_stream_cleanup(repo)
1133 nodemap.post_stream_cleanup(repo)
1074
1134
1075
1135
1076 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
1136 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
1077 hardlink = [True]
1137 hardlink = [True]
1078
1138
1079 def copy_used():
1139 def copy_used():
1080 hardlink[0] = False
1140 hardlink[0] = False
1081 progress.topic = _(b'copying')
1141 progress.topic = _(b'copying')
1082
1142
1083 for k, path in entries:
1143 for k, path in entries:
1084 src_vfs = src_vfs_map[k]
1144 src_vfs = src_vfs_map[k]
1085 dst_vfs = dst_vfs_map[k]
1145 dst_vfs = dst_vfs_map[k]
1086 src_path = src_vfs.join(path)
1146 src_path = src_vfs.join(path)
1087 dst_path = dst_vfs.join(path)
1147 dst_path = dst_vfs.join(path)
1088 # We cannot use dirname and makedirs of dst_vfs here because the store
1148 # We cannot use dirname and makedirs of dst_vfs here because the store
1089 # encoding confuses them. See issue 6581 for details.
1149 # encoding confuses them. See issue 6581 for details.
1090 dirname = os.path.dirname(dst_path)
1150 dirname = os.path.dirname(dst_path)
1091 if not os.path.exists(dirname):
1151 if not os.path.exists(dirname):
1092 util.makedirs(dirname)
1152 util.makedirs(dirname)
1093 dst_vfs.register_file(path)
1153 dst_vfs.register_file(path)
1094 # XXX we could use the #nb_bytes argument.
1154 # XXX we could use the #nb_bytes argument.
1095 util.copyfile(
1155 util.copyfile(
1096 src_path,
1156 src_path,
1097 dst_path,
1157 dst_path,
1098 hardlink=hardlink[0],
1158 hardlink=hardlink[0],
1099 no_hardlink_cb=copy_used,
1159 no_hardlink_cb=copy_used,
1100 check_fs_hardlink=False,
1160 check_fs_hardlink=False,
1101 )
1161 )
1102 progress.increment()
1162 progress.increment()
1103 return hardlink[0]
1163 return hardlink[0]
1104
1164
1105
1165
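# Illustrative sketch (not part of streamclone.py): the hardlink-or-copy
# fallback that _copy_files above applies per file, expressed with stdlib
# calls. The helper name `_link_or_copy` is hypothetical.
import os
import shutil


def _link_or_copy(src_path, dst_path, use_hardlink):
    """Hardlink when possible, fall back to copying otherwise.

    Returns whether hardlinking is still in effect, mirroring how
    _copy_files flips to copying after the first failed link."""
    if use_hardlink:
        try:
            os.link(src_path, dst_path)
            return True
        except OSError:  # e.g. cross-device link or unsupported filesystem
            use_hardlink = False
    shutil.copy2(src_path, dst_path)
    return use_hardlink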
1106 def local_copy(src_repo, dest_repo):
1166 def local_copy(src_repo, dest_repo):
1107 """copy all content from one local repository to another
1167 """copy all content from one local repository to another
1108
1168
1109 This is useful for local clone"""
1169 This is useful for local clone"""
1110 src_store_requirements = {
1170 src_store_requirements = {
1111 r
1171 r
1112 for r in src_repo.requirements
1172 for r in src_repo.requirements
1113 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
1173 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
1114 }
1174 }
1115 dest_store_requirements = {
1175 dest_store_requirements = {
1116 r
1176 r
1117 for r in dest_repo.requirements
1177 for r in dest_repo.requirements
1118 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
1178 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
1119 }
1179 }
1120 assert src_store_requirements == dest_store_requirements
1180 assert src_store_requirements == dest_store_requirements
1121
1181
1122 with dest_repo.lock():
1182 with dest_repo.lock():
1123 with src_repo.lock():
1183 with src_repo.lock():
1124 # bookmarks are not integrated into the streaming as they might use the
1184 # bookmarks are not integrated into the streaming as they might use the
1125 # `repo.vfs`, and there is too much sensitive data accessible
1185 # `repo.vfs`, and there is too much sensitive data accessible
1126 # through `repo.vfs` to expose it to streaming clone.
1186 # through `repo.vfs` to expose it to streaming clone.
1127 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
1187 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
1128 srcbookmarks = src_book_vfs.join(b'bookmarks')
1188 srcbookmarks = src_book_vfs.join(b'bookmarks')
1129 bm_count = 0
1189 bm_count = 0
1130 if os.path.exists(srcbookmarks):
1190 if os.path.exists(srcbookmarks):
1131 bm_count = 1
1191 bm_count = 1
1132
1192
1133 entries = _entries_walk(
1193 entries = _entries_walk(
1134 src_repo,
1194 src_repo,
1135 includes=None,
1195 includes=None,
1136 excludes=None,
1196 excludes=None,
1137 includeobsmarkers=True,
1197 includeobsmarkers=True,
1138 )
1198 )
1139 entries = list(entries)
1199 entries = list(entries)
1140 src_vfs_map = _makemap(src_repo)
1200 src_vfs_map = _makemap(src_repo)
1141 dest_vfs_map = _makemap(dest_repo)
1201 dest_vfs_map = _makemap(dest_repo)
1142 total_files = sum(len(e[1].files()) for e in entries) + bm_count
1202 total_files = sum(len(e[1].files()) for e in entries) + bm_count
1143 progress = src_repo.ui.makeprogress(
1203 progress = src_repo.ui.makeprogress(
1144 topic=_(b'linking'),
1204 topic=_(b'linking'),
1145 total=total_files,
1205 total=total_files,
1146 unit=_(b'files'),
1206 unit=_(b'files'),
1147 )
1207 )
1148 # copy files
1208 # copy files
1149 #
1209 #
1150 # We could copy the full file while the source repository is locked
1210 # We could copy the full file while the source repository is locked
1151 # and the other one without the lock. However, in the linking case,
1211 # and the other one without the lock. However, in the linking case,
1152 # this would also require checks that nobody is appending any data
1212 # this would also require checks that nobody is appending any data
1153 # to the files while we do the clone, so this is not done yet. We
1213 # to the files while we do the clone, so this is not done yet. We
1154 # could do this blindly when copying files.
1214 # could do this blindly when copying files.
1155 files = [
1215 files = [
1156 (vfs_key, f.unencoded_path)
1216 (vfs_key, f.unencoded_path)
1157 for vfs_key, e in entries
1217 for vfs_key, e in entries
1158 for f in e.files()
1218 for f in e.files()
1159 ]
1219 ]
1160 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
1220 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
1161
1221
1162 # copy bookmarks over
1222 # copy bookmarks over
1163 if bm_count:
1223 if bm_count:
1164 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
1224 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
1165 dstbookmarks = dst_book_vfs.join(b'bookmarks')
1225 dstbookmarks = dst_book_vfs.join(b'bookmarks')
1166 util.copyfile(srcbookmarks, dstbookmarks)
1226 util.copyfile(srcbookmarks, dstbookmarks)
1167 progress.complete()
1227 progress.complete()
1168 if hardlink:
1228 if hardlink:
1169 msg = b'linked %d files\n'
1229 msg = b'linked %d files\n'
1170 else:
1230 else:
1171 msg = b'copied %d files\n'
1231 msg = b'copied %d files\n'
1172 src_repo.ui.debug(msg % total_files)
1232 src_repo.ui.debug(msg % total_files)
1173
1233
1174 with dest_repo.transaction(b"localclone") as tr:
1234 with dest_repo.transaction(b"localclone") as tr:
1175 dest_repo.store.write(tr)
1235 dest_repo.store.write(tr)
1176
1236
1177 # clean up transaction files as they do not make sense here
1237 # clean up transaction files as they do not make sense here
1178 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)
1238 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)