##// END OF EJS Templates
stream-clone: directly use `_entries_walk` to generate stream-v2...
marmoute -
r51528:3416b463 default
parent child Browse files
Show More
@@ -1,998 +1,1009
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import contextlib
9 import contextlib
10 import os
10 import os
11 import struct
11 import struct
12
12
13 from .i18n import _
13 from .i18n import _
14 from .pycompat import open
14 from .pycompat import open
15 from .interfaces import repository
15 from .interfaces import repository
16 from . import (
16 from . import (
17 bookmarks,
17 bookmarks,
18 bundle2 as bundle2mod,
18 bundle2 as bundle2mod,
19 cacheutil,
19 cacheutil,
20 error,
20 error,
21 narrowspec,
21 narrowspec,
22 phases,
22 phases,
23 pycompat,
23 pycompat,
24 requirements as requirementsmod,
24 requirements as requirementsmod,
25 scmutil,
25 scmutil,
26 store,
26 store,
27 transaction,
27 transaction,
28 util,
28 util,
29 )
29 )
30 from .revlogutils import (
30 from .revlogutils import (
31 nodemap,
31 nodemap,
32 )
32 )
33
33
34
34
def new_stream_clone_requirements(default_requirements, streamed_requirements):
    """Determine the final set of requirements for a new stream clone.

    Combines the "default" requirements that a new repository would use with
    the constraints carried by the stream clone content.  Local configuration
    choices are preserved whenever possible.
    """
    requirements = set(default_requirements)
    requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
    requirements.update(streamed_requirements)
    return requirements
46
46
47
47
def streamed_requirements(repo):
    """The set of requirements the new clone will have to support.

    This is used for advertising the stream options and to generate the actual
    stream content."""
    requiredformats = (
        repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
    )
    return requiredformats
57
57
58
58
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Should we consider streaming clone at all?
    streamrequested = pullop.streamclonerequested
    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')
    if not streamrequested:
        return False, None

    # Streaming clone only works on an empty destination repository
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    bundle2supported = False
    if pullop.canusebundle2:
        local_caps = bundle2mod.getrepocaps(repo, role=b'client')
        local_supported = set(local_caps.get(b'stream', []))
        remote_supported = set(pullop.remotebundle2caps.get(b'stream', []))
        bundle2supported = bool(local_supported & remote_supported)
    # else: the server doesn't support bundle2 stream clone or doesn't
    # support the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supported
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements
151
151
152
152
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    # imported here to avoid an import cycle with localrepo
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    # Second header line: file count and total byte count.
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)
        repo.requirements = new_stream_clone_requirements(
            repo.requirements,
            requirements,
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)
        nodemap.post_stream_cleanup(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()
226
226
227
227
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True
243
243
244
244
# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
    return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
248
248
249
249
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for entry in _walkstreamfiles(repo):
            for f in entry.files():
                file_size = f.file_size(repo.store.vfs)
                if file_size:
                    entries.append((f.unencoded_path, file_size))
                    total_bytes += file_size
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        # generator yielding "<name>\0<size>\n" headers followed by raw data
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
306
306
307
307
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk
334
334
335
335
def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        # magic + compression marker
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()
392
392
393
393
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in range(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )
467
467
468
468
def readbundle1header(fp):
    """Read and parse the header of a v1 stream clone bundle.

    ``fp`` must be positioned at the 2-byte compression identifier (i.e.
    after the 4-byte "HGS1" magic).  Returns a 3-tuple of
    (filecount, bytecount, requirements-set).  Raises ``error.Abort`` for
    unsupported compression or a malformed requirements string.
    """
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    # two 64-bit big-endian unsigned ints: file count and byte count
    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    # 16-bit big-endian length of the requirements string (incl. trailing NUL)
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements
495
495
496
496
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    # Bail out before touching the store if the clone needs a format we
    # cannot read.
    missingreqs = requirements - repo.supported
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)
    nodemap.post_stream_cleanup(repo)
518
518
519
519
class streamcloneapplier:
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        # file-like object positioned at the bundle's compression identifier
        self._fh = fh

    def apply(self, repo):
        """apply the bundle content to ``repo`` (must be empty)"""
        return applybundlev1(repo, self._fh)
532
532
533
533
# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file (volatile: copied aside before streaming)

# Source of the file (single-byte markers, also emitted on the wire by the
# v2 stream to tell the receiver which vfs a file belongs to)
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)
541
541
542 # This is it's own function so extensions can override it.
542 # This is it's own function so extensions can override it.
543 def _walkstreamfullstorefiles(repo):
543 def _walkstreamfullstorefiles(repo):
544 """list snapshot file from the store"""
544 """list snapshot file from the store"""
545 fnames = []
545 fnames = []
546 if not repo.publishing():
546 if not repo.publishing():
547 fnames.append(b'phaseroots')
547 fnames.append(b'phaseroots')
548 return fnames
548 return fnames
549
549
550
550
def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files

    Full-snapshot entries have their on-disk path replaced by the path of a
    temporary backup made through ``copy``; append-only entries pass through
    untouched.
    """
    src, name, ftype, data = entry
    if ftype == _filefull:
        backup = copy(vfsmap[src].join(name))
        return (src, name, ftype, backup)
    return entry
557
557
558
558
class TempCopyManager:
    """Manage temporary backup of volatile file during stream clone

    This should be used as a Python context, the copies will be discarded when
    exiting the context.

    A copy can be done by calling the object on the real path (encoded full
    path)

    The backup path can be retrieved using the __getitem__ protocol, obj[path].
    On file without backup, it will return the unmodified path. (equivalent to
    `dict.get(x, x)`)
    """

    def __init__(self):
        # mapping of original path -> backup path; None while the context
        # is not entered
        self._copies = None
        self._dst_dir = None

    def __enter__(self):
        if self._copies is not None:
            msg = "Copies context already open"
            raise error.ProgrammingError(msg)
        self._copies = {}
        # all backups live in a single throw-away directory
        self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
        return self

    def __call__(self, src):
        """create a backup of the file at src"""
        fd, backup = pycompat.mkstemp(
            prefix=os.path.basename(src),
            dir=self._dst_dir,
        )
        os.close(fd)
        self._copies[src] = backup
        # hardlink when possible to keep the copy cheap
        util.copyfiles(src, backup, hardlink=True)
        return backup

    def __getitem__(self, src):
        """return the path to a valid version of `src`

        If the file has no backup, the path of the file is returned
        unmodified."""
        return self._copies.get(src, src)

    def __exit__(self, *args, **kwars):
        """discard all backups"""
        for backup in self._copies.values():
            util.tryunlink(backup)
        util.tryrmdir(self._dst_dir)
        # reset so the manager can be entered again
        self._copies = None
        self._dst_dir = None
608
608
609
609
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {}
    vfsmap[_srcstore] = repo.svfs
    vfsmap[_srccache] = repo.cachevfs
    # we keep repo.vfs out on purpose, there are too many dangers there
    # (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()
    return vfsmap
621
621
622
622
def _emit2(repo, entries):
    """actually emit the stream bundle

    ``entries`` is an iterable of ``(vfs-key, StoreEntry)`` pairs, as
    produced by ``_entries_walk``.

    This is a generator with a two-phase protocol: the FIRST value yielded is
    a ``(file_count, total_byte_size)`` pair, produced after volatile files
    have been copied aside — consuming it is what allows the caller to release
    the repository lock.  Every subsequent value is a raw byte chunk of the
    stream itself.
    """
    vfsmap = _makemap(repo)
    # we keep repo.vfs out on purpose, there are too many dangers there
    # (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as author might think this is
    # fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    # translate the vfs key into an actual vfs object
    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]

    file_count = totalfilesize = 0
    # record the expected size of every file
    for k, vfs, e in entries:
        for f in e.files():
            file_count += 1
            totalfilesize += f.file_size(vfs)

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with TempCopyManager() as copy, progress:
        # create a copy of volatile files
        for k, vfs, e in entries:
            for f in e.files():
                if f.is_volatile:
                    copy(vfs.join(f.unencoded_path))
        # the first yield release the lock on the repository
        yield file_count, totalfilesize
        totalbytecount = 0

        for src, vfs, e in entries:
            for f in e.files():
                yield src
                name = f.unencoded_path
                yield util.uvarintencode(len(name))
                # volatile files are read from their temporary backup;
                # stable files are read in place (copy[x] defaults to x)
                actual_path = copy[vfs.join(name)]
                fp = open(actual_path, b'rb')
                size = f.file_size(vfs)
                bytecount = 0
                try:
                    yield util.uvarintencode(size)
                    yield name
                    if size <= 65536:
                        # small file: one read is enough
                        chunks = (fp.read(size),)
                    else:
                        chunks = util.filechunkiter(fp, limit=size)
                    for chunk in chunks:
                        bytecount += len(chunk)
                        totalbytecount += len(chunk)
                        progress.update(totalbytecount)
                        yield chunk
                    if bytecount != size:
                        # Would most likely be caused by a race due to `hg
                        # strip` or a revlog split
                        msg = _(
                            b'clone could only read %d bytes from %s, but '
                            b'expected %d bytes'
                        )
                        raise error.Abort(msg % (bytecount, name, size))
                finally:
                    fp.close()
680
691
681
692
def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests

    Intentionally empty; presumably replaced by the test suite to coordinate
    with concurrent repository mutation — confirm against the tests.
    """
684
695
685
696
def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests

    Intentionally empty; presumably replaced by the test suite to coordinate
    with concurrent repository mutation — confirm against the tests.
    """
688
699
689
700
def _entries_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of files information useful to clone a repo

    return (vfs-key, entry) iterator

    Where `entry` is StoreEntry. (used even for cache entries)
    """
    # walking the store requires the caller to hold the repository lock
    assert repo._currentlock(repo._lockref) is not None

    matcher = (
        narrowspec.match(repo.root, includes, excludes)
        if includes or excludes
        else None
    )

    store_entries = _walkstreamfiles(
        repo,
        matcher,
        # phase data only needs streaming for non-publishing repositories
        phase=not repo.publishing(),
        obsolescence=includeobsmarkers,
    )
    for entry in store_entries:
        yield (_srcstore, entry)

    for cache_name in cacheutil.cachetocopy(repo):
        if not repo.cachevfs.exists(cache_name):
            continue
        # not really a StoreEntry, but close enough
        yield (
            _srccache,
            store.SimpleStoreEntry(
                entry_path=cache_name,
                is_volatile=True,
            ),
        )
731 yield (_srccache, entry)
721
732
722
733
def _v2_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of files information useful to clone a repo

    return (entries, totalfilesize)

    entries is a list of tuple (vfs-key, file-path, file-type, size)

    - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
    - `name`: file path of the file to copy (to be feed to the vfss)
    - `file-type`: do this file need to be copied with the source lock ?
    - `size`: the size of the file (or None)
    """
    assert repo._currentlock(repo._lockref) is not None
    files = []
    totalfilesize = 0

    vfsmap = _makemap(repo)
    walked = _entries_walk(repo, includes, excludes, includeobsmarkers)
    for vfs_key, entry in walked:
        vfs = vfsmap[vfs_key]
        for f in entry.files():
            file_size = f.file_size(vfs)
            if not file_size:
                # empty files carry no data worth streaming
                continue
            # volatile files must be snapshotted while the lock is held
            ftype = _filefull if f.is_volatile else _fileappend
            files.append((vfs_key, f.unencoded_path, ftype, file_size))
            totalfilesize += file_size
    return files, totalfilesize
762 return files, totalfilesize
752
763
753
764
def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries = _entries_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries)
        # the first value produced by the `_emit2` generator is a
        # (file count, total size) pair; consuming it here — while the lock
        # is still held — lets `_emit2` snapshot volatile files before the
        # lock is released
        first = next(chunks)
        file_count, total_file_size = first
        _test_sync_point_walk_1(repo)
        _test_sync_point_walk_2(repo)

        return file_count, total_file_size, chunks
785
796
786
797
def generatev3(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 3 of a streaming clone.

    Currently identical to the version 2 stream.
    """
    return generatev2(
        repo,
        includes,
        excludes,
        includeobsmarkers,
    )
789
800
790
801
@contextlib.contextmanager
def nested(*ctxs):
    """combine multiple context managers into a single one

    Managers are entered left to right and exited in the reverse order when
    the combined context terminates.
    """
    first = ctxs[0]
    remainder = ctxs[1:]
    with first:
        if not remainder:
            yield
        else:
            # recurse to stack the remaining managers inside this one
            with nested(*remainder):
                yield
801
812
802
813
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out on purpose, there are too many dangers
        # there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as author might think this
        # is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    # per-file frame: destination marker, two varint
                    # lengths, the name, then the raw data
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # avoid division by zero in the rate computation below
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()
880 progress.complete()
870
881
871
882
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Apply a version 2 stream clone bundle to an existing repository.

    The stream requirements are merged into the repository requirements and
    the store options are refreshed accordingly.
    """
    # local import to avoid a cycle with localrepo
    from . import localrepo

    unsupported = [r for r in requirements if r not in repo.supported]
    if unsupported:
        msg = _(b'unable to apply stream clone: unsupported format: %s')
        raise error.Abort(msg % b', '.join(sorted(unsupported)))

    consumev2(repo, fp, filecount, filesize)

    # the streamed store may use requirements the repository did not have yet
    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
    nodemap.post_stream_cleanup(repo)
903 nodemap.post_stream_cleanup(repo)
893
904
894
905
def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    """Copy (or hardlink) ``entries`` from one vfs map to the other.

    ``entries`` is an iterable of ``(vfs-key, path)`` pairs; each file is
    looked up in ``src_vfs_map``/``dst_vfs_map`` under its key.  Hardlinking
    is attempted first; the first failure switches the whole run (and the
    progress topic) to plain copying.

    Returns True when every file could be hardlinked, False otherwise.
    """
    # `nonlocal` replaces the historical one-element-list closure hack
    hardlink = True

    def copy_used():
        # called back by util.copyfile the first time hardlinking fails;
        # from then on report "copying" instead of "linking"
        nonlocal hardlink
        hardlink = False
        progress.topic = _(b'copying')

    for k, path in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        dirname = os.path.dirname(dst_path)
        if not os.path.exists(dirname):
            util.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            src_path,
            dst_path,
            hardlink=hardlink,
            no_hardlink_cb=copy_used,
            check_fs_hardlink=False,
        )
        progress.increment()
    return hardlink
933 return hardlink[0]
923
934
924
935
925 def local_copy(src_repo, dest_repo):
936 def local_copy(src_repo, dest_repo):
926 """copy all content from one local repository to another
937 """copy all content from one local repository to another
927
938
928 This is useful for local clone"""
939 This is useful for local clone"""
929 src_store_requirements = {
940 src_store_requirements = {
930 r
941 r
931 for r in src_repo.requirements
942 for r in src_repo.requirements
932 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
943 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
933 }
944 }
934 dest_store_requirements = {
945 dest_store_requirements = {
935 r
946 r
936 for r in dest_repo.requirements
947 for r in dest_repo.requirements
937 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
948 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
938 }
949 }
939 assert src_store_requirements == dest_store_requirements
950 assert src_store_requirements == dest_store_requirements
940
951
941 with dest_repo.lock():
952 with dest_repo.lock():
942 with src_repo.lock():
953 with src_repo.lock():
943
954
944 # bookmark is not integrated to the streaming as it might use the
955 # bookmark is not integrated to the streaming as it might use the
945 # `repo.vfs` and they are too many sentitive data accessible
956 # `repo.vfs` and they are too many sentitive data accessible
946 # through `repo.vfs` to expose it to streaming clone.
957 # through `repo.vfs` to expose it to streaming clone.
947 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
958 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
948 srcbookmarks = src_book_vfs.join(b'bookmarks')
959 srcbookmarks = src_book_vfs.join(b'bookmarks')
949 bm_count = 0
960 bm_count = 0
950 if os.path.exists(srcbookmarks):
961 if os.path.exists(srcbookmarks):
951 bm_count = 1
962 bm_count = 1
952
963
953 entries = _entries_walk(
964 entries = _entries_walk(
954 src_repo,
965 src_repo,
955 includes=None,
966 includes=None,
956 excludes=None,
967 excludes=None,
957 includeobsmarkers=True,
968 includeobsmarkers=True,
958 )
969 )
959 entries = list(entries)
970 entries = list(entries)
960 src_vfs_map = _makemap(src_repo)
971 src_vfs_map = _makemap(src_repo)
961 dest_vfs_map = _makemap(dest_repo)
972 dest_vfs_map = _makemap(dest_repo)
962 total_files = sum(len(e[1].files()) for e in entries) + bm_count
973 total_files = sum(len(e[1].files()) for e in entries) + bm_count
963 progress = src_repo.ui.makeprogress(
974 progress = src_repo.ui.makeprogress(
964 topic=_(b'linking'),
975 topic=_(b'linking'),
965 total=total_files,
976 total=total_files,
966 unit=_(b'files'),
977 unit=_(b'files'),
967 )
978 )
968 # copy files
979 # copy files
969 #
980 #
970 # We could copy the full file while the source repository is locked
981 # We could copy the full file while the source repository is locked
971 # and the other one without the lock. However, in the linking case,
982 # and the other one without the lock. However, in the linking case,
972 # this would also requires checks that nobody is appending any data
983 # this would also requires checks that nobody is appending any data
973 # to the files while we do the clone, so this is not done yet. We
984 # to the files while we do the clone, so this is not done yet. We
974 # could do this blindly when copying files.
985 # could do this blindly when copying files.
975 files = [
986 files = [
976 (vfs_key, f.unencoded_path)
987 (vfs_key, f.unencoded_path)
977 for vfs_key, e in entries
988 for vfs_key, e in entries
978 for f in e.files()
989 for f in e.files()
979 ]
990 ]
980 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
991 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
981
992
982 # copy bookmarks over
993 # copy bookmarks over
983 if bm_count:
994 if bm_count:
984 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
995 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
985 dstbookmarks = dst_book_vfs.join(b'bookmarks')
996 dstbookmarks = dst_book_vfs.join(b'bookmarks')
986 util.copyfile(srcbookmarks, dstbookmarks)
997 util.copyfile(srcbookmarks, dstbookmarks)
987 progress.complete()
998 progress.complete()
988 if hardlink:
999 if hardlink:
989 msg = b'linked %d files\n'
1000 msg = b'linked %d files\n'
990 else:
1001 else:
991 msg = b'copied %d files\n'
1002 msg = b'copied %d files\n'
992 src_repo.ui.debug(msg % total_files)
1003 src_repo.ui.debug(msg % total_files)
993
1004
994 with dest_repo.transaction(b"localclone") as tr:
1005 with dest_repo.transaction(b"localclone") as tr:
995 dest_repo.store.write(tr)
1006 dest_repo.store.write(tr)
996
1007
997 # clean up transaction file as they do not make sense
1008 # clean up transaction file as they do not make sense
998 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)
1009 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)
General Comments 0
You need to be logged in to leave comments. Login now