local-clone: perform the hardlink/copy based from _entries_walk returns...
marmoute
r51526:f2ae815a default
@@ -1,993 +1,999 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import contextlib
9 import contextlib
10 import os
10 import os
11 import struct
11 import struct
12
12
13 from .i18n import _
13 from .i18n import _
14 from .pycompat import open
14 from .pycompat import open
15 from .interfaces import repository
15 from .interfaces import repository
16 from . import (
16 from . import (
17 bookmarks,
17 bookmarks,
18 bundle2 as bundle2mod,
18 bundle2 as bundle2mod,
19 cacheutil,
19 cacheutil,
20 error,
20 error,
21 narrowspec,
21 narrowspec,
22 phases,
22 phases,
23 pycompat,
23 pycompat,
24 requirements as requirementsmod,
24 requirements as requirementsmod,
25 scmutil,
25 scmutil,
26 store,
26 store,
27 transaction,
27 transaction,
28 util,
28 util,
29 )
29 )
30 from .revlogutils import (
30 from .revlogutils import (
31 nodemap,
31 nodemap,
32 )
32 )
33
33
34
34
35 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 def new_stream_clone_requirements(default_requirements, streamed_requirements):
36 """determine the final set of requirement for a new stream clone
36 """determine the final set of requirement for a new stream clone
37
37
38 this method combines the "default" requirements that a new repository would
38 this method combines the "default" requirements that a new repository would
39 use with the constraints we get from the stream clone content. We keep local
39 use with the constraints we get from the stream clone content. We keep local
40 configuration choices when possible.
40 configuration choices when possible.
41 """
41 """
42 requirements = set(default_requirements)
42 requirements = set(default_requirements)
43 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
44 requirements.update(streamed_requirements)
44 requirements.update(streamed_requirements)
45 return requirements
45 return requirements
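A small worked example of the requirement arithmetic above, using
hypothetical requirement names (only the set operations matter):

    # assume b'revlogv1' and b'revlogv2' are stream-fixed, while
    # b'dotencode' is a purely local configuration choice
    default_requirements = {b'revlogv2', b'dotencode'}
    streamed = {b'revlogv1'}
    requirements = set(default_requirements)
    requirements -= {b'revlogv1', b'revlogv2'}  # STREAM_FIXED_REQUIREMENTS
    requirements.update(streamed)
    assert requirements == {b'dotencode', b'revlogv1'}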
46
46
47
47
48 def streamed_requirements(repo):
48 def streamed_requirements(repo):
49 """the set of requirement the new clone will have to support
49 """the set of requirement the new clone will have to support
50
50
51 This is used for advertising the stream options and to generate the actual
51 This is used for advertising the stream options and to generate the actual
52 stream content."""
52 stream content."""
53 requiredformats = (
53 requiredformats = (
54 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
55 )
55 )
56 return requiredformats
56 return requiredformats
57
57
58
58
59 def canperformstreamclone(pullop, bundle2=False):
59 def canperformstreamclone(pullop, bundle2=False):
60 """Whether it is possible to perform a streaming clone as part of pull.
60 """Whether it is possible to perform a streaming clone as part of pull.
61
61
62 ``bundle2`` will cause the function to consider stream clone through
62 ``bundle2`` will cause the function to consider stream clone through
63 bundle2 and only through bundle2.
63 bundle2 and only through bundle2.
64
64
65 Returns a tuple of (supported, requirements). ``supported`` is True if
65 Returns a tuple of (supported, requirements). ``supported`` is True if
66 streaming clone is supported and False otherwise. ``requirements`` is
66 streaming clone is supported and False otherwise. ``requirements`` is
67 a set of repo requirements from the remote, or ``None`` if stream clone
67 a set of repo requirements from the remote, or ``None`` if stream clone
68 isn't supported.
68 isn't supported.
69 """
69 """
70 repo = pullop.repo
70 repo = pullop.repo
71 remote = pullop.remote
71 remote = pullop.remote
72
72
73 # should we consider streaming clone at all?
73 # should we consider streaming clone at all?
74 streamrequested = pullop.streamclonerequested
74 streamrequested = pullop.streamclonerequested
75 # If we don't have a preference, let the server decide for us. This
75 # If we don't have a preference, let the server decide for us. This
76 # likely only comes into play in LANs.
76 # likely only comes into play in LANs.
77 if streamrequested is None:
77 if streamrequested is None:
78 # The server can advertise whether to prefer streaming clone.
78 # The server can advertise whether to prefer streaming clone.
79 streamrequested = remote.capable(b'stream-preferred')
79 streamrequested = remote.capable(b'stream-preferred')
80 if not streamrequested:
80 if not streamrequested:
81 return False, None
81 return False, None
82
82
83 # Streaming clone only works on an empty destination repository
83 # Streaming clone only works on an empty destination repository
84 if len(repo):
84 if len(repo):
85 return False, None
85 return False, None
86
86
87 # Streaming clone only works if all data is being requested.
87 # Streaming clone only works if all data is being requested.
88 if pullop.heads:
88 if pullop.heads:
89 return False, None
89 return False, None
90
90
91 bundle2supported = False
91 bundle2supported = False
92 if pullop.canusebundle2:
92 if pullop.canusebundle2:
93 local_caps = bundle2mod.getrepocaps(repo, role=b'client')
93 local_caps = bundle2mod.getrepocaps(repo, role=b'client')
94 local_supported = set(local_caps.get(b'stream', []))
94 local_supported = set(local_caps.get(b'stream', []))
95 remote_supported = set(pullop.remotebundle2caps.get(b'stream', []))
95 remote_supported = set(pullop.remotebundle2caps.get(b'stream', []))
96 bundle2supported = bool(local_supported & remote_supported)
96 bundle2supported = bool(local_supported & remote_supported)
97 # else
97 # else
98 # Server doesn't support bundle2 stream clone or doesn't support
98 # Server doesn't support bundle2 stream clone or doesn't support
99 # the versions we support. Fall back and possibly allow legacy.
99 # the versions we support. Fall back and possibly allow legacy.
100
100
101 # Ensures legacy code path uses available bundle2.
101 # Ensures legacy code path uses available bundle2.
102 if bundle2supported and not bundle2:
102 if bundle2supported and not bundle2:
103 return False, None
103 return False, None
104 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
104 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
105 elif bundle2 and not bundle2supported:
105 elif bundle2 and not bundle2supported:
106 return False, None
106 return False, None
107
107
108 # In order for stream clone to work, the client has to support all the
108 # In order for stream clone to work, the client has to support all the
109 # requirements advertised by the server.
109 # requirements advertised by the server.
110 #
110 #
111 # The server advertises its requirements via the "stream" and "streamreqs"
111 # The server advertises its requirements via the "stream" and "streamreqs"
112 # capability. "stream" (a value-less capability) is advertised if and only
112 # capability. "stream" (a value-less capability) is advertised if and only
113 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
113 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
114 # is advertised and contains a comma-delimited list of requirements.
114 # is advertised and contains a comma-delimited list of requirements.
115 requirements = set()
115 requirements = set()
116 if remote.capable(b'stream'):
116 if remote.capable(b'stream'):
117 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
117 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
118 else:
118 else:
119 streamreqs = remote.capable(b'streamreqs')
119 streamreqs = remote.capable(b'streamreqs')
120 # This is weird and shouldn't happen with modern servers.
120 # This is weird and shouldn't happen with modern servers.
121 if not streamreqs:
121 if not streamreqs:
122 pullop.repo.ui.warn(
122 pullop.repo.ui.warn(
123 _(
123 _(
124 b'warning: stream clone requested but server has them '
124 b'warning: stream clone requested but server has them '
125 b'disabled\n'
125 b'disabled\n'
126 )
126 )
127 )
127 )
128 return False, None
128 return False, None
129
129
130 streamreqs = set(streamreqs.split(b','))
130 streamreqs = set(streamreqs.split(b','))
131 # Server requires something we don't support. Bail.
131 # Server requires something we don't support. Bail.
132 missingreqs = streamreqs - repo.supported
132 missingreqs = streamreqs - repo.supported
133 if missingreqs:
133 if missingreqs:
134 pullop.repo.ui.warn(
134 pullop.repo.ui.warn(
135 _(
135 _(
136 b'warning: stream clone requested but client is missing '
136 b'warning: stream clone requested but client is missing '
137 b'requirements: %s\n'
137 b'requirements: %s\n'
138 )
138 )
139 % b', '.join(sorted(missingreqs))
139 % b', '.join(sorted(missingreqs))
140 )
140 )
141 pullop.repo.ui.warn(
141 pullop.repo.ui.warn(
142 _(
142 _(
143 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
143 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
144 b'for more information)\n'
144 b'for more information)\n'
145 )
145 )
146 )
146 )
147 return False, None
147 return False, None
148 requirements = streamreqs
148 requirements = streamreqs
149
149
150 return True, requirements
150 return True, requirements
151
151
152
152
153 def maybeperformlegacystreamclone(pullop):
153 def maybeperformlegacystreamclone(pullop):
154 """Possibly perform a legacy stream clone operation.
154 """Possibly perform a legacy stream clone operation.
155
155
156 Legacy stream clones are performed as part of pull but before all other
156 Legacy stream clones are performed as part of pull but before all other
157 operations.
157 operations.
158
158
159 A legacy stream clone will not be performed if a bundle2 stream clone is
159 A legacy stream clone will not be performed if a bundle2 stream clone is
160 supported.
160 supported.
161 """
161 """
162 from . import localrepo
162 from . import localrepo
163
163
164 supported, requirements = canperformstreamclone(pullop)
164 supported, requirements = canperformstreamclone(pullop)
165
165
166 if not supported:
166 if not supported:
167 return
167 return
168
168
169 repo = pullop.repo
169 repo = pullop.repo
170 remote = pullop.remote
170 remote = pullop.remote
171
171
172 # Save remote branchmap. We will use it later to speed up branchcache
172 # Save remote branchmap. We will use it later to speed up branchcache
173 # creation.
173 # creation.
174 rbranchmap = None
174 rbranchmap = None
175 if remote.capable(b'branchmap'):
175 if remote.capable(b'branchmap'):
176 with remote.commandexecutor() as e:
176 with remote.commandexecutor() as e:
177 rbranchmap = e.callcommand(b'branchmap', {}).result()
177 rbranchmap = e.callcommand(b'branchmap', {}).result()
178
178
179 repo.ui.status(_(b'streaming all changes\n'))
179 repo.ui.status(_(b'streaming all changes\n'))
180
180
181 with remote.commandexecutor() as e:
181 with remote.commandexecutor() as e:
182 fp = e.callcommand(b'stream_out', {}).result()
182 fp = e.callcommand(b'stream_out', {}).result()
183
183
184 # TODO strictly speaking, this code should all be inside the context
184 # TODO strictly speaking, this code should all be inside the context
185 # manager because the context manager is supposed to ensure all wire state
185 # manager because the context manager is supposed to ensure all wire state
186 # is flushed when exiting. But the legacy peers don't do this, so it
186 # is flushed when exiting. But the legacy peers don't do this, so it
187 # doesn't matter.
187 # doesn't matter.
188 l = fp.readline()
188 l = fp.readline()
189 try:
189 try:
190 resp = int(l)
190 resp = int(l)
191 except ValueError:
191 except ValueError:
192 raise error.ResponseError(
192 raise error.ResponseError(
193 _(b'unexpected response from remote server:'), l
193 _(b'unexpected response from remote server:'), l
194 )
194 )
195 if resp == 1:
195 if resp == 1:
196 raise error.Abort(_(b'operation forbidden by server'))
196 raise error.Abort(_(b'operation forbidden by server'))
197 elif resp == 2:
197 elif resp == 2:
198 raise error.Abort(_(b'locking the remote repository failed'))
198 raise error.Abort(_(b'locking the remote repository failed'))
199 elif resp != 0:
199 elif resp != 0:
200 raise error.Abort(_(b'the server sent an unknown error code'))
200 raise error.Abort(_(b'the server sent an unknown error code'))
201
201
202 l = fp.readline()
202 l = fp.readline()
203 try:
203 try:
204 filecount, bytecount = map(int, l.split(b' ', 1))
204 filecount, bytecount = map(int, l.split(b' ', 1))
205 except (ValueError, TypeError):
205 except (ValueError, TypeError):
206 raise error.ResponseError(
206 raise error.ResponseError(
207 _(b'unexpected response from remote server:'), l
207 _(b'unexpected response from remote server:'), l
208 )
208 )
209
209
210 with repo.lock():
210 with repo.lock():
211 consumev1(repo, fp, filecount, bytecount)
211 consumev1(repo, fp, filecount, bytecount)
212 repo.requirements = new_stream_clone_requirements(
212 repo.requirements = new_stream_clone_requirements(
213 repo.requirements,
213 repo.requirements,
214 requirements,
214 requirements,
215 )
215 )
216 repo.svfs.options = localrepo.resolvestorevfsoptions(
216 repo.svfs.options = localrepo.resolvestorevfsoptions(
217 repo.ui, repo.requirements, repo.features
217 repo.ui, repo.requirements, repo.features
218 )
218 )
219 scmutil.writereporequirements(repo)
219 scmutil.writereporequirements(repo)
220 nodemap.post_stream_cleanup(repo)
220 nodemap.post_stream_cleanup(repo)
221
221
222 if rbranchmap:
222 if rbranchmap:
223 repo._branchcaches.replace(repo, rbranchmap)
223 repo._branchcaches.replace(repo, rbranchmap)
224
224
225 repo.invalidate()
225 repo.invalidate()
226
226
227
227
228 def allowservergeneration(repo):
228 def allowservergeneration(repo):
229 """Whether streaming clones are allowed from the server."""
229 """Whether streaming clones are allowed from the server."""
230 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
230 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
231 return False
231 return False
232
232
233 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
233 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
234 return False
234 return False
235
235
236 # The way stream clone works makes it impossible to hide secret changesets.
236 # The way stream clone works makes it impossible to hide secret changesets.
237 # So don't allow this by default.
237 # So don't allow this by default.
238 secret = phases.hassecret(repo)
238 secret = phases.hassecret(repo)
239 if secret:
239 if secret:
240 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
240 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
241
241
242 return True
242 return True
243
243
244
244
245 # This is its own function so extensions can override it.
245 # This is its own function so extensions can override it.
246 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
246 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
247 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
247 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
248
248
249
249
250 def generatev1(repo):
250 def generatev1(repo):
251 """Emit content for version 1 of a streaming clone.
251 """Emit content for version 1 of a streaming clone.
252
252
253 This returns a 3-tuple of (file count, byte size, data iterator).
253 This returns a 3-tuple of (file count, byte size, data iterator).
254
254
255 The data iterator consists of N entries for each file being transferred.
255 The data iterator consists of N entries for each file being transferred.
256 Each file entry starts as a line with the file name and integer size
256 Each file entry starts as a line with the file name and integer size
257 delimited by a null byte.
257 delimited by a null byte.
258
258
259 The raw file data follows. Following the raw file data is the next file
259 The raw file data follows. Following the raw file data is the next file
260 entry, or EOF.
260 entry, or EOF.
261
261
262 When used on the wire protocol, an additional line indicating protocol
262 When used on the wire protocol, an additional line indicating protocol
263 success will be prepended to the stream. This function is not responsible
263 success will be prepended to the stream. This function is not responsible
264 for adding it.
264 for adding it.
265
265
266 This function will obtain a repository lock to ensure a consistent view of
266 This function will obtain a repository lock to ensure a consistent view of
267 the store is captured. It therefore may raise LockError.
267 the store is captured. It therefore may raise LockError.
268 """
268 """
269 entries = []
269 entries = []
270 total_bytes = 0
270 total_bytes = 0
271 # Get consistent snapshot of repo, lock during scan.
271 # Get consistent snapshot of repo, lock during scan.
272 with repo.lock():
272 with repo.lock():
273 repo.ui.debug(b'scanning\n')
273 repo.ui.debug(b'scanning\n')
274 for entry in _walkstreamfiles(repo):
274 for entry in _walkstreamfiles(repo):
275 for f in entry.files():
275 for f in entry.files():
276 file_size = f.file_size(repo.store.vfs)
276 file_size = f.file_size(repo.store.vfs)
277 if file_size:
277 if file_size:
278 entries.append((f.unencoded_path, file_size))
278 entries.append((f.unencoded_path, file_size))
279 total_bytes += file_size
279 total_bytes += file_size
280 _test_sync_point_walk_1(repo)
280 _test_sync_point_walk_1(repo)
281 _test_sync_point_walk_2(repo)
281 _test_sync_point_walk_2(repo)
282
282
283 repo.ui.debug(
283 repo.ui.debug(
284 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
284 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
285 )
285 )
286
286
287 svfs = repo.svfs
287 svfs = repo.svfs
288 debugflag = repo.ui.debugflag
288 debugflag = repo.ui.debugflag
289
289
290 def emitrevlogdata():
290 def emitrevlogdata():
291 for name, size in entries:
291 for name, size in entries:
292 if debugflag:
292 if debugflag:
293 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
293 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
294 # partially encode name over the wire for backwards compat
294 # partially encode name over the wire for backwards compat
295 yield b'%s\0%d\n' % (store.encodedir(name), size)
295 yield b'%s\0%d\n' % (store.encodedir(name), size)
296 # auditing at this stage is both pointless (paths are already
296 # auditing at this stage is both pointless (paths are already
297 # trusted by the local repo) and expensive
297 # trusted by the local repo) and expensive
298 with svfs(name, b'rb', auditpath=False) as fp:
298 with svfs(name, b'rb', auditpath=False) as fp:
299 if size <= 65536:
299 if size <= 65536:
300 yield fp.read(size)
300 yield fp.read(size)
301 else:
301 else:
302 for chunk in util.filechunkiter(fp, limit=size):
302 for chunk in util.filechunkiter(fp, limit=size):
303 yield chunk
303 yield chunk
304
304
305 return len(entries), total_bytes, emitrevlogdata()
305 return len(entries), total_bytes, emitrevlogdata()
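A minimal sketch of reading one such entry back from a file-like object
`fp`, following the framing described in the docstring (the real consumer
is `consumev1()` further down; this only illustrates the format):

    def read_v1_entry(fp):
        header = fp.readline()               # b'<name>\0<size>\n'
        name, size = header.split(b'\0', 1)
        size = int(size)
        data = fp.read(size)                 # raw file content follows
        return name, data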
306
306
307
307
308 def generatev1wireproto(repo):
308 def generatev1wireproto(repo):
309 """Emit content for version 1 of streaming clone suitable for the wire.
309 """Emit content for version 1 of streaming clone suitable for the wire.
310
310
311 This is the data output from ``generatev1()`` with 2 header lines. The
311 This is the data output from ``generatev1()`` with 2 header lines. The
312 first line indicates overall success. The 2nd contains the file count and
312 first line indicates overall success. The 2nd contains the file count and
313 byte size of payload.
313 byte size of payload.
314
314
315 The success line contains "0" for success, "1" for stream generation not
315 The success line contains "0" for success, "1" for stream generation not
316 allowed, and "2" for error locking the repository (possibly indicating
316 allowed, and "2" for error locking the repository (possibly indicating
317 a permissions error for the server process).
317 a permissions error for the server process).
318 """
318 """
319 if not allowservergeneration(repo):
319 if not allowservergeneration(repo):
320 yield b'1\n'
320 yield b'1\n'
321 return
321 return
322
322
323 try:
323 try:
324 filecount, bytecount, it = generatev1(repo)
324 filecount, bytecount, it = generatev1(repo)
325 except error.LockError:
325 except error.LockError:
326 yield b'2\n'
326 yield b'2\n'
327 return
327 return
328
328
329 # Indicates successful response.
329 # Indicates successful response.
330 yield b'0\n'
330 yield b'0\n'
331 yield b'%d %d\n' % (filecount, bytecount)
331 yield b'%d %d\n' % (filecount, bytecount)
332 for chunk in it:
332 for chunk in it:
333 yield chunk
333 yield chunk
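The client-side counterpart, sketched: read the success line, then the
file/byte counts, exactly as `maybeperformlegacystreamclone()` does above:

    resp = int(fp.readline())     # 0 = ok, 1 = not allowed, 2 = lock error
    filecount, bytecount = map(int, fp.readline().split(b' ', 1))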
334
334
335
335
336 def generatebundlev1(repo, compression=b'UN'):
336 def generatebundlev1(repo, compression=b'UN'):
337 """Emit content for version 1 of a stream clone bundle.
337 """Emit content for version 1 of a stream clone bundle.
338
338
339 The first 4 bytes of the output ("HGS1") denote this as stream clone
339 The first 4 bytes of the output ("HGS1") denote this as stream clone
340 bundle version 1.
340 bundle version 1.
341
341
342 The next 2 bytes indicate the compression type. Only "UN" is currently
342 The next 2 bytes indicate the compression type. Only "UN" is currently
343 supported.
343 supported.
344
344
345 The next 16 bytes are two 64-bit big endian unsigned integers indicating
345 The next 16 bytes are two 64-bit big endian unsigned integers indicating
346 file count and byte count, respectively.
346 file count and byte count, respectively.
347
347
348 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
348 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
349 of the requirements string, including a trailing \0. The following N bytes
349 of the requirements string, including a trailing \0. The following N bytes
350 are the requirements string, which is ASCII containing a comma-delimited
350 are the requirements string, which is ASCII containing a comma-delimited
351 list of repo requirements that are needed to support the data.
351 list of repo requirements that are needed to support the data.
352
352
353 The remaining content is the output of ``generatev1()`` (which may be
353 The remaining content is the output of ``generatev1()`` (which may be
354 compressed in the future).
354 compressed in the future).
355
355
356 Returns a tuple of (requirements, data generator).
356 Returns a tuple of (requirements, data generator).
357 """
357 """
358 if compression != b'UN':
358 if compression != b'UN':
359 raise ValueError(b'we do not support the compression argument yet')
359 raise ValueError(b'we do not support the compression argument yet')
360
360
361 requirements = streamed_requirements(repo)
361 requirements = streamed_requirements(repo)
362 requires = b','.join(sorted(requirements))
362 requires = b','.join(sorted(requirements))
363
363
364 def gen():
364 def gen():
365 yield b'HGS1'
365 yield b'HGS1'
366 yield compression
366 yield compression
367
367
368 filecount, bytecount, it = generatev1(repo)
368 filecount, bytecount, it = generatev1(repo)
369 repo.ui.status(
369 repo.ui.status(
370 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
370 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
371 )
371 )
372
372
373 yield struct.pack(b'>QQ', filecount, bytecount)
373 yield struct.pack(b'>QQ', filecount, bytecount)
374 yield struct.pack(b'>H', len(requires) + 1)
374 yield struct.pack(b'>H', len(requires) + 1)
375 yield requires + b'\0'
375 yield requires + b'\0'
376
376
377 # This is where we'll add compression in the future.
377 # This is where we'll add compression in the future.
378 assert compression == b'UN'
378 assert compression == b'UN'
379
379
380 progress = repo.ui.makeprogress(
380 progress = repo.ui.makeprogress(
381 _(b'bundle'), total=bytecount, unit=_(b'bytes')
381 _(b'bundle'), total=bytecount, unit=_(b'bytes')
382 )
382 )
383 progress.update(0)
383 progress.update(0)
384
384
385 for chunk in it:
385 for chunk in it:
386 progress.increment(step=len(chunk))
386 progress.increment(step=len(chunk))
387 yield chunk
387 yield chunk
388
388
389 progress.complete()
389 progress.complete()
390
390
391 return requirements, gen()
391 return requirements, gen()
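For illustration, the header layout described in the docstring can be
assembled with a few lines of plain `struct` code (a sketch, not part of
this module):

    import struct

    def build_v1_header(filecount, bytecount, requirements):
        requires = b','.join(sorted(requirements))
        header = b'HGS1'                                  # 4-byte magic
        header += b'UN'                                   # compression type
        header += struct.pack(b'>QQ', filecount, bytecount)
        header += struct.pack(b'>H', len(requires) + 1)   # incl. trailing NUL
        header += requires + b'\0'
        return header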
392
392
393
393
394 def consumev1(repo, fp, filecount, bytecount):
394 def consumev1(repo, fp, filecount, bytecount):
395 """Apply the contents from version 1 of a streaming clone file handle.
395 """Apply the contents from version 1 of a streaming clone file handle.
396
396
397 This takes the output from "stream_out" and applies it to the specified
397 This takes the output from "stream_out" and applies it to the specified
398 repository.
398 repository.
399
399
400 Like "stream_out," the status line added by the wire protocol is not
400 Like "stream_out," the status line added by the wire protocol is not
401 handled by this function.
401 handled by this function.
402 """
402 """
403 with repo.lock():
403 with repo.lock():
404 repo.ui.status(
404 repo.ui.status(
405 _(b'%d files to transfer, %s of data\n')
405 _(b'%d files to transfer, %s of data\n')
406 % (filecount, util.bytecount(bytecount))
406 % (filecount, util.bytecount(bytecount))
407 )
407 )
408 progress = repo.ui.makeprogress(
408 progress = repo.ui.makeprogress(
409 _(b'clone'), total=bytecount, unit=_(b'bytes')
409 _(b'clone'), total=bytecount, unit=_(b'bytes')
410 )
410 )
411 progress.update(0)
411 progress.update(0)
412 start = util.timer()
412 start = util.timer()
413
413
414 # TODO: get rid of (potential) inconsistency
414 # TODO: get rid of (potential) inconsistency
415 #
415 #
416 # If transaction is started and any @filecache property is
416 # If transaction is started and any @filecache property is
417 # changed at this point, it causes inconsistency between
417 # changed at this point, it causes inconsistency between
418 # in-memory cached property and streamclone-ed file on the
418 # in-memory cached property and streamclone-ed file on the
419 # disk. Nested transaction prevents transaction scope "clone"
419 # disk. Nested transaction prevents transaction scope "clone"
420 # below from writing in-memory changes out at the end of it,
420 # below from writing in-memory changes out at the end of it,
421 # even though in-memory changes are discarded at the end of it
421 # even though in-memory changes are discarded at the end of it
422 # regardless of transaction nesting.
422 # regardless of transaction nesting.
423 #
423 #
424 # But transaction nesting can't be simply prohibited, because
424 # But transaction nesting can't be simply prohibited, because
425 # nesting occurs also in ordinary case (e.g. enabling
425 # nesting occurs also in ordinary case (e.g. enabling
426 # clonebundles).
426 # clonebundles).
427
427
428 with repo.transaction(b'clone'):
428 with repo.transaction(b'clone'):
429 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
429 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
430 for i in range(filecount):
430 for i in range(filecount):
431 # XXX doesn't support '\n' or '\r' in filenames
431 # XXX doesn't support '\n' or '\r' in filenames
432 l = fp.readline()
432 l = fp.readline()
433 try:
433 try:
434 name, size = l.split(b'\0', 1)
434 name, size = l.split(b'\0', 1)
435 size = int(size)
435 size = int(size)
436 except (ValueError, TypeError):
436 except (ValueError, TypeError):
437 raise error.ResponseError(
437 raise error.ResponseError(
438 _(b'unexpected response from remote server:'), l
438 _(b'unexpected response from remote server:'), l
439 )
439 )
440 if repo.ui.debugflag:
440 if repo.ui.debugflag:
441 repo.ui.debug(
441 repo.ui.debug(
442 b'adding %s (%s)\n' % (name, util.bytecount(size))
442 b'adding %s (%s)\n' % (name, util.bytecount(size))
443 )
443 )
444 # for backwards compat, name was partially encoded
444 # for backwards compat, name was partially encoded
445 path = store.decodedir(name)
445 path = store.decodedir(name)
446 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
446 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
447 for chunk in util.filechunkiter(fp, limit=size):
447 for chunk in util.filechunkiter(fp, limit=size):
448 progress.increment(step=len(chunk))
448 progress.increment(step=len(chunk))
449 ofp.write(chunk)
449 ofp.write(chunk)
450
450
451 # force @filecache properties to be reloaded from
451 # force @filecache properties to be reloaded from
452 # streamclone-ed file at next access
452 # streamclone-ed file at next access
453 repo.invalidate(clearfilecache=True)
453 repo.invalidate(clearfilecache=True)
454
454
455 elapsed = util.timer() - start
455 elapsed = util.timer() - start
456 if elapsed <= 0:
456 if elapsed <= 0:
457 elapsed = 0.001
457 elapsed = 0.001
458 progress.complete()
458 progress.complete()
459 repo.ui.status(
459 repo.ui.status(
460 _(b'transferred %s in %.1f seconds (%s/sec)\n')
460 _(b'transferred %s in %.1f seconds (%s/sec)\n')
461 % (
461 % (
462 util.bytecount(bytecount),
462 util.bytecount(bytecount),
463 elapsed,
463 elapsed,
464 util.bytecount(bytecount / elapsed),
464 util.bytecount(bytecount / elapsed),
465 )
465 )
466 )
466 )
467
467
468
468
469 def readbundle1header(fp):
469 def readbundle1header(fp):
470 compression = fp.read(2)
470 compression = fp.read(2)
471 if compression != b'UN':
471 if compression != b'UN':
472 raise error.Abort(
472 raise error.Abort(
473 _(
473 _(
474 b'only uncompressed stream clone bundles are '
474 b'only uncompressed stream clone bundles are '
475 b'supported; got %s'
475 b'supported; got %s'
476 )
476 )
477 % compression
477 % compression
478 )
478 )
479
479
480 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
480 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
481 requireslen = struct.unpack(b'>H', fp.read(2))[0]
481 requireslen = struct.unpack(b'>H', fp.read(2))[0]
482 requires = fp.read(requireslen)
482 requires = fp.read(requireslen)
483
483
484 if not requires.endswith(b'\0'):
484 if not requires.endswith(b'\0'):
485 raise error.Abort(
485 raise error.Abort(
486 _(
486 _(
487 b'malformed stream clone bundle: '
487 b'malformed stream clone bundle: '
488 b'requirements not properly encoded'
488 b'requirements not properly encoded'
489 )
489 )
490 )
490 )
491
491
492 requirements = set(requires.rstrip(b'\0').split(b','))
492 requirements = set(requires.rstrip(b'\0').split(b','))
493
493
494 return filecount, bytecount, requirements
494 return filecount, bytecount, requirements
495
495
496
496
497 def applybundlev1(repo, fp):
497 def applybundlev1(repo, fp):
498 """Apply the content from a stream clone bundle version 1.
498 """Apply the content from a stream clone bundle version 1.
499
499
500 We assume the 4 byte header has been read and validated and the file handle
500 We assume the 4 byte header has been read and validated and the file handle
501 is at the 2 byte compression identifier.
501 is at the 2 byte compression identifier.
502 """
502 """
503 if len(repo):
503 if len(repo):
504 raise error.Abort(
504 raise error.Abort(
505 _(b'cannot apply stream clone bundle on non-empty repo')
505 _(b'cannot apply stream clone bundle on non-empty repo')
506 )
506 )
507
507
508 filecount, bytecount, requirements = readbundle1header(fp)
508 filecount, bytecount, requirements = readbundle1header(fp)
509 missingreqs = requirements - repo.supported
509 missingreqs = requirements - repo.supported
510 if missingreqs:
510 if missingreqs:
511 raise error.Abort(
511 raise error.Abort(
512 _(b'unable to apply stream clone: unsupported format: %s')
512 _(b'unable to apply stream clone: unsupported format: %s')
513 % b', '.join(sorted(missingreqs))
513 % b', '.join(sorted(missingreqs))
514 )
514 )
515
515
516 consumev1(repo, fp, filecount, bytecount)
516 consumev1(repo, fp, filecount, bytecount)
517 nodemap.post_stream_cleanup(repo)
517 nodemap.post_stream_cleanup(repo)
518
518
519
519
520 class streamcloneapplier:
520 class streamcloneapplier:
521 """Class to manage applying streaming clone bundles.
521 """Class to manage applying streaming clone bundles.
522
522
523 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
523 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
524 readers to perform bundle type-specific functionality.
524 readers to perform bundle type-specific functionality.
525 """
525 """
526
526
527 def __init__(self, fh):
527 def __init__(self, fh):
528 self._fh = fh
528 self._fh = fh
529
529
530 def apply(self, repo):
530 def apply(self, repo):
531 return applybundlev1(repo, self._fh)
531 return applybundlev1(repo, self._fh)
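Usage sketch: wrap the open file handle once the 4-byte b'HGS1' magic has
been read and validated, then hand it a repository:

    applier = streamcloneapplier(fh)
    applier.apply(repo)   # parses the remaining header, then consumev1()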
532
532
533
533
534 # type of file to stream
534 # type of file to stream
535 _fileappend = 0 # append only file
535 _fileappend = 0 # append only file
536 _filefull = 1 # full snapshot file
536 _filefull = 1 # full snapshot file
537
537
538 # Source of the file
538 # Source of the file
539 _srcstore = b's' # store (svfs)
539 _srcstore = b's' # store (svfs)
540 _srccache = b'c' # cache (cache)
540 _srccache = b'c' # cache (cache)
541
541
542 # This is its own function so extensions can override it.
542 # This is its own function so extensions can override it.
543 def _walkstreamfullstorefiles(repo):
543 def _walkstreamfullstorefiles(repo):
544 """list snapshot file from the store"""
544 """list snapshot file from the store"""
545 fnames = []
545 fnames = []
546 if not repo.publishing():
546 if not repo.publishing():
547 fnames.append(b'phaseroots')
547 fnames.append(b'phaseroots')
548 return fnames
548 return fnames
549
549
550
550
551 def _filterfull(entry, copy, vfsmap):
551 def _filterfull(entry, copy, vfsmap):
552 """actually copy the snapshot files"""
552 """actually copy the snapshot files"""
553 src, name, ftype, data = entry
553 src, name, ftype, data = entry
554 if ftype != _filefull:
554 if ftype != _filefull:
555 return entry
555 return entry
556 return (src, name, ftype, copy(vfsmap[src].join(name)))
556 return (src, name, ftype, copy(vfsmap[src].join(name)))
557
557
558
558
559 class TempCopyManager:
559 class TempCopyManager:
560 """Manage temporary backup of volatile file during stream clone
560 """Manage temporary backup of volatile file during stream clone
561
561
562 This should be used as a Python context, the copies will be discarded when
562 This should be used as a Python context, the copies will be discarded when
563 exiting the context.
563 exiting the context.
564
564
565 A copy can be done by calling the object on the real path (encoded full
565 A copy can be done by calling the object on the real path (encoded full
566 path)
566 path)
567
567
568 The backup path can be retrieved using the __getitem__ protocol, obj[path].
568 The backup path can be retrieved using the __getitem__ protocol, obj[path].
569 For a file without a backup, it will return the unmodified path
569 For a file without a backup, it will return the unmodified path
570 (equivalent to `dict.get(x, x)`).
570 (equivalent to `dict.get(x, x)`).
571 """
571 """
572
572
573 def __init__(self):
573 def __init__(self):
574 self._copies = None
574 self._copies = None
575 self._dst_dir = None
575 self._dst_dir = None
576
576
577 def __enter__(self):
577 def __enter__(self):
578 if self._copies is not None:
578 if self._copies is not None:
579 msg = "Copies context already open"
579 msg = "Copies context already open"
580 raise error.ProgrammingError(msg)
580 raise error.ProgrammingError(msg)
581 self._copies = {}
581 self._copies = {}
582 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
582 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
583 return self
583 return self
584
584
585 def __call__(self, src):
585 def __call__(self, src):
586 """create a backup of the file at src"""
586 """create a backup of the file at src"""
587 prefix = os.path.basename(src)
587 prefix = os.path.basename(src)
588 fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
588 fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
589 os.close(fd)
589 os.close(fd)
590 self._copies[src] = dst
590 self._copies[src] = dst
591 util.copyfiles(src, dst, hardlink=True)
591 util.copyfiles(src, dst, hardlink=True)
592 return dst
592 return dst
593
593
594 def __getitem__(self, src):
594 def __getitem__(self, src):
595 """return the path to a valid version of `src`
595 """return the path to a valid version of `src`
596
596
597 If the file has no backup, the path of the file is returned
597 If the file has no backup, the path of the file is returned
598 unmodified."""
598 unmodified."""
599 return self._copies.get(src, src)
599 return self._copies.get(src, src)
600
600
601 def __exit__(self, *args, **kwargs):
601 def __exit__(self, *args, **kwargs):
602 """discard all backups"""
602 """discard all backups"""
603 for tmp in self._copies.values():
603 for tmp in self._copies.values():
604 util.tryunlink(tmp)
604 util.tryunlink(tmp)
605 util.tryrmdir(self._dst_dir)
605 util.tryrmdir(self._dst_dir)
606 self._copies = None
606 self._copies = None
607 self._dst_dir = None
607 self._dst_dir = None
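Usage sketch with a hypothetical `volatile_path`; the backups are discarded
when the context exits:

    with TempCopyManager() as copies:
        copies(volatile_path)              # hardlink/copy into a temp dir
        stable = copies[volatile_path]     # path of the backup copy
        other = copies[other_path]         # no backup -> returned unmodified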
608
608
609
609
610 def _makemap(repo):
610 def _makemap(repo):
611 """make a (src -> vfs) map for the repo"""
611 """make a (src -> vfs) map for the repo"""
612 vfsmap = {
612 vfsmap = {
613 _srcstore: repo.svfs,
613 _srcstore: repo.svfs,
614 _srccache: repo.cachevfs,
614 _srccache: repo.cachevfs,
615 }
615 }
616 # we keep repo.vfs out of the map on purpose; there are too many dangers there
616 # we keep repo.vfs out of the map on purpose; there are too many dangers there
617 # (eg: .hg/hgrc)
617 # (eg: .hg/hgrc)
618 assert repo.vfs not in vfsmap.values()
618 assert repo.vfs not in vfsmap.values()
619
619
620 return vfsmap
620 return vfsmap
621
621
622
622
623 def _emit2(repo, entries, totalfilesize):
623 def _emit2(repo, entries, totalfilesize):
624 """actually emit the stream bundle"""
624 """actually emit the stream bundle"""
625 vfsmap = _makemap(repo)
625 vfsmap = _makemap(repo)
626 # we keep repo.vfs out of the map on purpose; there are too many dangers there
626 # we keep repo.vfs out of the map on purpose; there are too many dangers there
627 # (eg: .hg/hgrc),
627 # (eg: .hg/hgrc),
628 #
628 #
629 # this assert is duplicated (from _makemap) as an author might think this is
629 # this assert is duplicated (from _makemap) as an author might think this is
630 # fine, while this is really not fine.
630 # fine, while this is really not fine.
631 if repo.vfs in vfsmap.values():
631 if repo.vfs in vfsmap.values():
632 raise error.ProgrammingError(
632 raise error.ProgrammingError(
633 b'repo.vfs must not be added to vfsmap for security reasons'
633 b'repo.vfs must not be added to vfsmap for security reasons'
634 )
634 )
635
635
636 progress = repo.ui.makeprogress(
636 progress = repo.ui.makeprogress(
637 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
637 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
638 )
638 )
639 progress.update(0)
639 progress.update(0)
640 with TempCopyManager() as copy, progress:
640 with TempCopyManager() as copy, progress:
641 # copy is delayed until we are in the try
641 # copy is delayed until we are in the try
642 entries = [_filterfull(e, copy, vfsmap) for e in entries]
642 entries = [_filterfull(e, copy, vfsmap) for e in entries]
643 yield None # this releases the lock on the repository
643 yield None # this releases the lock on the repository
644 totalbytecount = 0
644 totalbytecount = 0
645
645
646 for src, name, ftype, data in entries:
646 for src, name, ftype, data in entries:
647 vfs = vfsmap[src]
647 vfs = vfsmap[src]
648 yield src
648 yield src
649 yield util.uvarintencode(len(name))
649 yield util.uvarintencode(len(name))
650 if ftype == _fileappend:
650 if ftype == _fileappend:
651 fp = vfs(name)
651 fp = vfs(name)
652 size = data
652 size = data
653 elif ftype == _filefull:
653 elif ftype == _filefull:
654 fp = open(data, b'rb')
654 fp = open(data, b'rb')
655 size = util.fstat(fp).st_size
655 size = util.fstat(fp).st_size
656 bytecount = 0
656 bytecount = 0
657 try:
657 try:
658 yield util.uvarintencode(size)
658 yield util.uvarintencode(size)
659 yield name
659 yield name
660 if size <= 65536:
660 if size <= 65536:
661 chunks = (fp.read(size),)
661 chunks = (fp.read(size),)
662 else:
662 else:
663 chunks = util.filechunkiter(fp, limit=size)
663 chunks = util.filechunkiter(fp, limit=size)
664 for chunk in chunks:
664 for chunk in chunks:
665 bytecount += len(chunk)
665 bytecount += len(chunk)
666 totalbytecount += len(chunk)
666 totalbytecount += len(chunk)
667 progress.update(totalbytecount)
667 progress.update(totalbytecount)
668 yield chunk
668 yield chunk
669 if bytecount != size:
669 if bytecount != size:
670 # Would most likely be caused by a race due to `hg strip` or
670 # Would most likely be caused by a race due to `hg strip` or
671 # a revlog split
671 # a revlog split
672 raise error.Abort(
672 raise error.Abort(
673 _(
673 _(
674 b'clone could only read %d bytes from %s, but '
674 b'clone could only read %d bytes from %s, but '
675 b'expected %d bytes'
675 b'expected %d bytes'
676 )
676 )
677 % (bytecount, name, size)
677 % (bytecount, name, size)
678 )
678 )
679 finally:
679 finally:
680 fp.close()
680 fp.close()
681
681
682
682
683 def _test_sync_point_walk_1(repo):
683 def _test_sync_point_walk_1(repo):
684 """a function for synchronisation during tests"""
684 """a function for synchronisation during tests"""
685
685
686
686
687 def _test_sync_point_walk_2(repo):
687 def _test_sync_point_walk_2(repo):
688 """a function for synchronisation during tests"""
688 """a function for synchronisation during tests"""
689
689
690
690
691 def _entries_walk(repo, includes, excludes, includeobsmarkers):
691 def _entries_walk(repo, includes, excludes, includeobsmarkers):
692 """emit a seris of files information useful to clone a repo
692 """emit a seris of files information useful to clone a repo
693
693
694 return (vfs-key, entry) iterator
694 return (vfs-key, entry) iterator
695
695
696 Where `entry` is a StoreEntry (used even for cache entries).
696 Where `entry` is a StoreEntry (used even for cache entries).
697 """
697 """
698 assert repo._currentlock(repo._lockref) is not None
698 assert repo._currentlock(repo._lockref) is not None
699
699
700 matcher = None
700 matcher = None
701 if includes or excludes:
701 if includes or excludes:
702 matcher = narrowspec.match(repo.root, includes, excludes)
702 matcher = narrowspec.match(repo.root, includes, excludes)
703
703
704 phase = not repo.publishing()
704 phase = not repo.publishing()
705 entries = _walkstreamfiles(
705 entries = _walkstreamfiles(
706 repo,
706 repo,
707 matcher,
707 matcher,
708 phase=phase,
708 phase=phase,
709 obsolescence=includeobsmarkers,
709 obsolescence=includeobsmarkers,
710 )
710 )
711 for entry in entries:
711 for entry in entries:
712 yield (_srcstore, entry)
712 yield (_srcstore, entry)
713
713
714 for name in cacheutil.cachetocopy(repo):
714 for name in cacheutil.cachetocopy(repo):
715 if repo.cachevfs.exists(name):
715 if repo.cachevfs.exists(name):
716 # not really a StoreEntry, but close enough
716 # not really a StoreEntry, but close enough
717 entry = store.SimpleStoreEntry(
717 entry = store.SimpleStoreEntry(
718 entry_path=name,
718 entry_path=name,
719 is_volatile=True,
719 is_volatile=True,
720 )
720 )
721 yield (_srccache, entry)
721 yield (_srccache, entry)
722
722
723
723
724 def _v2_walk(repo, includes, excludes, includeobsmarkers):
724 def _v2_walk(repo, includes, excludes, includeobsmarkers):
725 """emit a seris of files information useful to clone a repo
725 """emit a seris of files information useful to clone a repo
726
726
727 return (entries, totalfilesize)
727 return (entries, totalfilesize)
728
728
729 entries is a list of tuples (vfs-key, file-path, file-type, size)
729 entries is a list of tuples (vfs-key, file-path, file-type, size)
730
730
731 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
731 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
732 - `name`: file path of the file to copy (to be fed to the vfs)
732 - `name`: file path of the file to copy (to be fed to the vfs)
733 - `file-type`: does this file need to be copied with the source lock?
733 - `file-type`: does this file need to be copied with the source lock?
734 - `size`: the size of the file (or None)
734 - `size`: the size of the file (or None)
735 """
735 """
736 assert repo._currentlock(repo._lockref) is not None
736 assert repo._currentlock(repo._lockref) is not None
737 files = []
737 files = []
738 totalfilesize = 0
738 totalfilesize = 0
739
739
740 vfsmap = _makemap(repo)
740 vfsmap = _makemap(repo)
741 entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
741 entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
742 for vfs_key, entry in entries:
742 for vfs_key, entry in entries:
743 vfs = vfsmap[vfs_key]
743 vfs = vfsmap[vfs_key]
744 for f in entry.files():
744 for f in entry.files():
745 file_size = f.file_size(vfs)
745 file_size = f.file_size(vfs)
746 if file_size:
746 if file_size:
747 ft = _fileappend
747 ft = _fileappend
748 if f.is_volatile:
748 if f.is_volatile:
749 ft = _filefull
749 ft = _filefull
750 files.append((vfs_key, f.unencoded_path, ft, file_size))
750 files.append((vfs_key, f.unencoded_path, ft, file_size))
751 totalfilesize += file_size
751 totalfilesize += file_size
752 return files, totalfilesize
752 return files, totalfilesize
753
753
754
754
755 def generatev2(repo, includes, excludes, includeobsmarkers):
755 def generatev2(repo, includes, excludes, includeobsmarkers):
756 """Emit content for version 2 of a streaming clone.
756 """Emit content for version 2 of a streaming clone.
757
757
758 the data stream consists of the following entries:
758 the data stream consists of the following entries:
759 1) A char representing the file destination (eg: store or cache)
759 1) A char representing the file destination (eg: store or cache)
760 2) A varint containing the length of the filename
760 2) A varint containing the length of the filename
761 3) A varint containing the length of file data
761 3) A varint containing the length of file data
762 4) N bytes containing the filename (the internal, store-agnostic form)
762 4) N bytes containing the filename (the internal, store-agnostic form)
763 5) N bytes containing the file data
763 5) N bytes containing the file data
764
764
765 Returns a 3-tuple of (file count, file size, data iterator).
765 Returns a 3-tuple of (file count, file size, data iterator).
766 """
766 """
767
767
768 with repo.lock():
768 with repo.lock():
769
769
770 repo.ui.debug(b'scanning\n')
770 repo.ui.debug(b'scanning\n')
771
771
772 entries, totalfilesize = _v2_walk(
772 entries, totalfilesize = _v2_walk(
773 repo,
773 repo,
774 includes=includes,
774 includes=includes,
775 excludes=excludes,
775 excludes=excludes,
776 includeobsmarkers=includeobsmarkers,
776 includeobsmarkers=includeobsmarkers,
777 )
777 )
778
778
779 chunks = _emit2(repo, entries, totalfilesize)
779 chunks = _emit2(repo, entries, totalfilesize)
780 first = next(chunks)
780 first = next(chunks)
781 assert first is None
781 assert first is None
782 _test_sync_point_walk_1(repo)
782 _test_sync_point_walk_1(repo)
783 _test_sync_point_walk_2(repo)
783 _test_sync_point_walk_2(repo)
784
784
785 return len(entries), totalfilesize, chunks
785 return len(entries), totalfilesize, chunks
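A sketch of decoding one entry of this stream; `read_uvarint` stands in
for `util.uvarintdecodestream` (assumed here to be an unsigned LEB128-style
varint), and `consumev2()` below is the real consumer:

    def read_uvarint(fp):
        result = shift = 0
        while True:
            byte = fp.read(1)[0]
            result |= (byte & 0x7F) << shift
            if not byte & 0x80:
                return result
            shift += 7

    def read_v2_entry(fp):
        src = fp.read(1)              # 1) destination: b's' store, b'c' cache
        namelen = read_uvarint(fp)    # 2) filename length
        datalen = read_uvarint(fp)    # 3) data length
        name = fp.read(namelen)       # 4) filename
        data = fp.read(datalen)       # 5) file data
        return src, name, data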
786
786
787
787
788 def generatev3(repo, includes, excludes, includeobsmarkers):
788 def generatev3(repo, includes, excludes, includeobsmarkers):
789 return generatev2(repo, includes, excludes, includeobsmarkers)
789 return generatev2(repo, includes, excludes, includeobsmarkers)
790
790
791
791
792 @contextlib.contextmanager
792 @contextlib.contextmanager
793 def nested(*ctxs):
793 def nested(*ctxs):
794 this = ctxs[0]
794 this = ctxs[0]
795 rest = ctxs[1:]
795 rest = ctxs[1:]
796 with this:
796 with this:
797 if rest:
797 if rest:
798 with nested(*rest):
798 with nested(*rest):
799 yield
799 yield
800 else:
800 else:
801 yield
801 yield
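Usage sketch, mirroring the call in `consumev2()` below (the stdlib
`contextlib.ExitStack` offers the same effect):

    ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
    with nested(*ctxs):
        pass  # all contexts active; exited in reverse order afterwards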
802
802
803
803
804 def consumev2(repo, fp, filecount, filesize):
804 def consumev2(repo, fp, filecount, filesize):
805 """Apply the contents from a version 2 streaming clone.
805 """Apply the contents from a version 2 streaming clone.
806
806
807 Data is read from an object that only needs to provide a ``read(size)``
807 Data is read from an object that only needs to provide a ``read(size)``
808 method.
808 method.
809 """
809 """
810 with repo.lock():
810 with repo.lock():
811 repo.ui.status(
811 repo.ui.status(
812 _(b'%d files to transfer, %s of data\n')
812 _(b'%d files to transfer, %s of data\n')
813 % (filecount, util.bytecount(filesize))
813 % (filecount, util.bytecount(filesize))
814 )
814 )
815
815
816 start = util.timer()
816 start = util.timer()
817 progress = repo.ui.makeprogress(
817 progress = repo.ui.makeprogress(
818 _(b'clone'), total=filesize, unit=_(b'bytes')
818 _(b'clone'), total=filesize, unit=_(b'bytes')
819 )
819 )
820 progress.update(0)
820 progress.update(0)
821
821
822 vfsmap = _makemap(repo)
822 vfsmap = _makemap(repo)
823 # we keep repo.vfs out of the map on purpose; there are too many dangers
823 # we keep repo.vfs out of the map on purpose; there are too many dangers
824 # there (eg: .hg/hgrc),
824 # there (eg: .hg/hgrc),
825 #
825 #
826 # this assert is duplicated (from _makemap) as an author might think this
826 # this assert is duplicated (from _makemap) as an author might think this
827 # is fine, while this is really not fine.
827 # is fine, while this is really not fine.
828 if repo.vfs in vfsmap.values():
828 if repo.vfs in vfsmap.values():
829 raise error.ProgrammingError(
829 raise error.ProgrammingError(
830 b'repo.vfs must not be added to vfsmap for security reasons'
830 b'repo.vfs must not be added to vfsmap for security reasons'
831 )
831 )
832
832
833 with repo.transaction(b'clone'):
833 with repo.transaction(b'clone'):
834 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
834 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
835 with nested(*ctxs):
835 with nested(*ctxs):
836 for i in range(filecount):
836 for i in range(filecount):
837 src = util.readexactly(fp, 1)
837 src = util.readexactly(fp, 1)
838 vfs = vfsmap[src]
838 vfs = vfsmap[src]
839 namelen = util.uvarintdecodestream(fp)
839 namelen = util.uvarintdecodestream(fp)
840 datalen = util.uvarintdecodestream(fp)
840 datalen = util.uvarintdecodestream(fp)
841
841
842 name = util.readexactly(fp, namelen)
842 name = util.readexactly(fp, namelen)
843
843
844 if repo.ui.debugflag:
844 if repo.ui.debugflag:
845 repo.ui.debug(
845 repo.ui.debug(
846 b'adding [%s] %s (%s)\n'
846 b'adding [%s] %s (%s)\n'
847 % (src, name, util.bytecount(datalen))
847 % (src, name, util.bytecount(datalen))
848 )
848 )
849
849
850 with vfs(name, b'w') as ofp:
850 with vfs(name, b'w') as ofp:
851 for chunk in util.filechunkiter(fp, limit=datalen):
851 for chunk in util.filechunkiter(fp, limit=datalen):
852 progress.increment(step=len(chunk))
852 progress.increment(step=len(chunk))
853 ofp.write(chunk)
853 ofp.write(chunk)
854
854
855 # force @filecache properties to be reloaded from
855 # force @filecache properties to be reloaded from
856 # streamclone-ed file at next access
856 # streamclone-ed file at next access
857 repo.invalidate(clearfilecache=True)
857 repo.invalidate(clearfilecache=True)
858
858
859 elapsed = util.timer() - start
859 elapsed = util.timer() - start
860 if elapsed <= 0:
860 if elapsed <= 0:
861 elapsed = 0.001
861 elapsed = 0.001
862 repo.ui.status(
862 repo.ui.status(
863 _(b'transferred %s in %.1f seconds (%s/sec)\n')
863 _(b'transferred %s in %.1f seconds (%s/sec)\n')
864 % (
864 % (
865 util.bytecount(progress.pos),
865 util.bytecount(progress.pos),
866 elapsed,
866 elapsed,
867 util.bytecount(progress.pos / elapsed),
867 util.bytecount(progress.pos / elapsed),
868 )
868 )
869 )
869 )
870 progress.complete()
870 progress.complete()
871
871
872
872
873 def applybundlev2(repo, fp, filecount, filesize, requirements):
873 def applybundlev2(repo, fp, filecount, filesize, requirements):
874 from . import localrepo
874 from . import localrepo
875
875
876 missingreqs = [r for r in requirements if r not in repo.supported]
876 missingreqs = [r for r in requirements if r not in repo.supported]
877 if missingreqs:
877 if missingreqs:
878 raise error.Abort(
878 raise error.Abort(
879 _(b'unable to apply stream clone: unsupported format: %s')
879 _(b'unable to apply stream clone: unsupported format: %s')
880 % b', '.join(sorted(missingreqs))
880 % b', '.join(sorted(missingreqs))
881 )
881 )
882
882
883 consumev2(repo, fp, filecount, filesize)
883 consumev2(repo, fp, filecount, filesize)
884
884
885 repo.requirements = new_stream_clone_requirements(
885 repo.requirements = new_stream_clone_requirements(
886 repo.requirements,
886 repo.requirements,
887 requirements,
887 requirements,
888 )
888 )
889 repo.svfs.options = localrepo.resolvestorevfsoptions(
889 repo.svfs.options = localrepo.resolvestorevfsoptions(
890 repo.ui, repo.requirements, repo.features
890 repo.ui, repo.requirements, repo.features
891 )
891 )
892 scmutil.writereporequirements(repo)
892 scmutil.writereporequirements(repo)
893 nodemap.post_stream_cleanup(repo)
893 nodemap.post_stream_cleanup(repo)
894
894
895
895
896 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
896 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
897 hardlink = [True]
897 hardlink = [True]
898
898
899 def copy_used():
899 def copy_used():
900 hardlink[0] = False
900 hardlink[0] = False
901 progress.topic = _(b'copying')
901 progress.topic = _(b'copying')
902
902
903 for k, path, size in entries:
903 for k, path in entries:
904 src_vfs = src_vfs_map[k]
904 src_vfs = src_vfs_map[k]
905 dst_vfs = dst_vfs_map[k]
905 dst_vfs = dst_vfs_map[k]
906 src_path = src_vfs.join(path)
906 src_path = src_vfs.join(path)
907 dst_path = dst_vfs.join(path)
907 dst_path = dst_vfs.join(path)
908 # We cannot use dirname and makedirs of dst_vfs here because the store
908 # We cannot use dirname and makedirs of dst_vfs here because the store
909 # encoding confuses them. See issue 6581 for details.
909 # encoding confuses them. See issue 6581 for details.
910 dirname = os.path.dirname(dst_path)
910 dirname = os.path.dirname(dst_path)
911 if not os.path.exists(dirname):
911 if not os.path.exists(dirname):
912 util.makedirs(dirname)
912 util.makedirs(dirname)
913 dst_vfs.register_file(path)
913 dst_vfs.register_file(path)
914 # XXX we could use the #nb_bytes argument.
914 # XXX we could use the #nb_bytes argument.
915 util.copyfile(
915 util.copyfile(
916 src_path,
916 src_path,
917 dst_path,
917 dst_path,
918 hardlink=hardlink[0],
918 hardlink=hardlink[0],
919 no_hardlink_cb=copy_used,
919 no_hardlink_cb=copy_used,
920 check_fs_hardlink=False,
920 check_fs_hardlink=False,
921 )
921 )
922 progress.increment()
922 progress.increment()
923 return hardlink[0]
923 return hardlink[0]
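The one-element `hardlink` list acts as a mutable cell: `copy_used()` flips
it when the first hardlink attempt fails, so every remaining file is plainly
copied. A hypothetical standalone rendering of the same fallback pattern
(`try_hardlink` and `plain_copy` are made-up callables):

    def copy_all(paths, try_hardlink, plain_copy):
        hardlink = True

        def fallback():
            nonlocal hardlink
            hardlink = False  # first failure disables linking for the rest

        for p in paths:
            if hardlink:
                try_hardlink(p, on_failure=fallback)
            else:
                plain_copy(p)
        return hardlink       # True only if every file was hardlinked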
924
924
925
925
926 def local_copy(src_repo, dest_repo):
926 def local_copy(src_repo, dest_repo):
927 """copy all content from one local repository to another
927 """copy all content from one local repository to another
928
928
929 This is useful for local clone"""
929 This is useful for local clone"""
930 src_store_requirements = {
930 src_store_requirements = {
931 r
931 r
932 for r in src_repo.requirements
932 for r in src_repo.requirements
933 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
933 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
934 }
934 }
935 dest_store_requirements = {
935 dest_store_requirements = {
936 r
936 r
937 for r in dest_repo.requirements
937 for r in dest_repo.requirements
938 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
938 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
939 }
939 }
940 assert src_store_requirements == dest_store_requirements
940 assert src_store_requirements == dest_store_requirements
941
941
942 with dest_repo.lock():
942 with dest_repo.lock():
943 with src_repo.lock():
943 with src_repo.lock():
944
944
945 # bookmarks are not integrated into the streaming as they might use the
945 # bookmarks are not integrated into the streaming as they might use the
946 # `repo.vfs`, and there is too much sensitive data accessible
946 # `repo.vfs`, and there is too much sensitive data accessible
947 # through `repo.vfs` to expose it to streaming clone.
947 # through `repo.vfs` to expose it to streaming clone.
948 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
948 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
949 srcbookmarks = src_book_vfs.join(b'bookmarks')
949 srcbookmarks = src_book_vfs.join(b'bookmarks')
950 bm_count = 0
950 bm_count = 0
951 if os.path.exists(srcbookmarks):
951 if os.path.exists(srcbookmarks):
952 bm_count = 1
952 bm_count = 1
953
953
954 entries, totalfilesize = _v2_walk(
954 entries = _entries_walk(
955 src_repo,
955 src_repo,
956 includes=None,
956 includes=None,
957 excludes=None,
957 excludes=None,
958 includeobsmarkers=True,
958 includeobsmarkers=True,
959 )
959 )
960 entries = list(entries)
960 src_vfs_map = _makemap(src_repo)
961 src_vfs_map = _makemap(src_repo)
961 dest_vfs_map = _makemap(dest_repo)
962 dest_vfs_map = _makemap(dest_repo)
963 total_files = sum(len(e[1].files()) for e in entries) + bm_count
962 progress = src_repo.ui.makeprogress(
964 progress = src_repo.ui.makeprogress(
963 topic=_(b'linking'),
965 topic=_(b'linking'),
964 total=len(entries) + bm_count,
966 total=total_files,
965 unit=_(b'files'),
967 unit=_(b'files'),
966 )
968 )
967 # copy files
969 # copy files
968 #
970 #
969 # We could copy the full file while the source repository is locked
971 # We could copy the full file while the source repository is locked
970 # and the other one without the lock. However, in the linking case,
972 # and the other one without the lock. However, in the linking case,
971 # this would also require checks that nobody is appending any data
973 # this would also require checks that nobody is appending any data
972 # to the files while we do the clone, so this is not done yet. We
974 # to the files while we do the clone, so this is not done yet. We
973 # could do this blindly when copying files.
975 # could do this blindly when copying files.
974 files = ((k, path, size) for k, path, ftype, size in entries)
976 files = [
977 (vfs_key, f.unencoded_path)
978 for vfs_key, e in entries
979 for f in e.files()
980 ]
975 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
981 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
976
982
977 # copy bookmarks over
983 # copy bookmarks over
978 if bm_count:
984 if bm_count:
979 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
985 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
980 dstbookmarks = dst_book_vfs.join(b'bookmarks')
986 dstbookmarks = dst_book_vfs.join(b'bookmarks')
981 util.copyfile(srcbookmarks, dstbookmarks)
987 util.copyfile(srcbookmarks, dstbookmarks)
982 progress.complete()
988 progress.complete()
983 if hardlink:
989 if hardlink:
984 msg = b'linked %d files\n'
990 msg = b'linked %d files\n'
985 else:
991 else:
986 msg = b'copied %d files\n'
992 msg = b'copied %d files\n'
987 src_repo.ui.debug(msg % (len(entries) + bm_count))
993 src_repo.ui.debug(msg % total_files)
988
994
989 with dest_repo.transaction(b"localclone") as tr:
995 with dest_repo.transaction(b"localclone") as tr:
990 dest_repo.store.write(tr)
996 dest_repo.store.write(tr)
991
997
992 # clean up transaction files as they do not make sense
998 # clean up transaction files as they do not make sense
993 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)
999 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)