undo-files: use the cleanup function in streamclone...
marmoute -
r51187:97e91001 stable
@@ -1,949 +1,935 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import contextlib
-import errno
import os
import struct

from .i18n import _
from .pycompat import open
from .interfaces import repository
from . import (
    bookmarks,
    cacheutil,
    error,
    narrowspec,
    phases,
    pycompat,
+    repair,
    requirements as requirementsmod,
    scmutil,
    store,
    util,
)
from .revlogutils import (
    nodemap,
)
-from .utils import (
-    stringutil,
-)


def new_stream_clone_requirements(default_requirements, streamed_requirements):
    """determine the final set of requirements for a new stream clone

    this method combines the "default" requirements that a new repository
    would use with the constraints we get from the stream clone content. We
    keep local configuration choices when possible.
    """
    requirements = set(default_requirements)
    requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
    requirements.update(streamed_requirements)
    return requirements


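As an aside, the set arithmetic above is simple to picture: stream-fixed requirements are taken from the stream, everything else from the local defaults. A small illustration with made-up requirement names, assuming STREAM_FIXED_REQUIREMENTS were {b'revlogv1', b'generaldelta'}:

    # sketch only; the real constant lives in mercurial/requirements.py
    default = {b'revlogv1', b'generaldelta', b'fncache', b'store'}
    streamed = {b'revlogv1'}
    final = (default - {b'revlogv1', b'generaldelta'}) | streamed
    print(sorted(final))  # [b'fncache', b'revlogv1', b'store']
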
def streamed_requirements(repo):
    """the set of requirements the new clone will have to support

    This is used for advertising the stream options and to generate the actual
    stream content."""
    requiredformats = (
        repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
    )
    return requiredformats


def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supported
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements


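The "streamreqs" capability value checked above is just a comma-delimited byte string; a tiny illustration of the client-side compatibility test (capability value and supported set are made up):

    streamreqs = set(b'generaldelta,revlogv1,sparserevlog'.split(b','))
    supported = {b'generaldelta', b'revlogv1', b'sparserevlog', b'store'}
    missingreqs = streamreqs - supported
    print(missingreqs)  # set() -> nothing missing, stream clone can proceed
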
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)
        repo.requirements = new_stream_clone_requirements(
            repo.requirements,
            requirements,
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)
        nodemap.post_stream_cleanup(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()


def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True


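The two knobs consulted by allowservergeneration map to ordinary hgrc settings; a minimal server-side sketch using the option names from the configbool calls above:

    [server]
    # allow clients to request an uncompressed streaming clone
    uncompressed = True
    # opt-in only: streaming cannot hide secret changesets
    uncompressedallowsecret = False
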
# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None):
    return repo.store.walk(matcher)


def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for file_type, name, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()


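To make the v1 framing concrete, here is a minimal standalone consumer sketch for the entry format emitted by emitrevlogdata (a name\0size\n header line followed by size raw bytes). The helper name and sample payload are illustrative only, and real names are additionally store-encoded via store.encodedir:

    import io

    def iter_v1_entries(stream):
        # sketch: no error handling, no store.decodedir() on names
        while True:
            header = stream.readline()  # b'<name>\x00<size>\n', b'' at EOF
            if not header:
                break
            name, size = header.rstrip(b'\n').split(b'\x00', 1)
            yield name, stream.read(int(size))

    payload = b'data/foo.i\x0011\nhello world'
    print(list(iter_v1_entries(io.BytesIO(payload))))
    # [(b'data/foo.i', b'hello world')]
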
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk


def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing \0. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()


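The bundle header described in that docstring is a fixed struct layout; a sketch assembling one by hand (the counts and requirements string are made up for illustration):

    import struct

    requires = b'generaldelta,revlogv1'  # example requirements string
    header = (
        b'HGS1'                                 # magic: stream clone bundle v1
        + b'UN'                                 # compression: uncompressed
        + struct.pack('>QQ', 3, 12345)          # file count, byte count
        + struct.pack('>H', len(requires) + 1)  # requirements length incl. \0
        + requires + b'\x00'
    )
    # the generatev1() payload would follow these bytes
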
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in range(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )


def readbundle1header(fp):
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements


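readbundle1header can be exercised against such hand-built bytes; a round-trip sketch (note that it starts at the 2-byte compression identifier, the 4-byte "HGS1" magic being assumed already consumed):

    import io
    import struct

    requires = b'generaldelta,revlogv1'  # illustrative
    raw = (
        b'UN'
        + struct.pack('>QQ', 3, 12345)
        + struct.pack('>H', len(requires) + 1)
        + requires + b'\x00'
    )
    print(readbundle1header(io.BytesIO(raw)))
    # (3, 12345, {b'generaldelta', b'revlogv1'})
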
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supported
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)
    nodemap.post_stream_cleanup(repo)


class streamcloneapplier:
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)


# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)


# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append(b'phaseroots')
    return fnames


def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))


@contextlib.contextmanager
def maketempcopies():
    """return a function to temporarily copy files"""

    files = []
    dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
    try:

        def copy(src):
            fd, dst = pycompat.mkstemp(
                prefix=os.path.basename(src), dir=dst_dir
            )
            os.close(fd)
            files.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst

        yield copy
    finally:
        for tmp in files:
            util.tryunlink(tmp)
        util.tryrmdir(dst_dir)


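maketempcopies is what lets _emit2 (below) release the repository lock early: volatile files are snapshotted into a temporary directory first. A usage sketch with a throwaway source file (the path handling is illustrative):

    import os
    import tempfile

    fd, src = tempfile.mkstemp()
    os.write(fd, b'volatile content')
    os.close(fd)

    with maketempcopies() as copy:
        snapshot = copy(src.encode('utf-8'))  # hardlinked/copied into a tmp dir
        with open(snapshot, 'rb') as fp:
            print(fp.read())  # b'volatile content', even if src changes now
    # on exit, every snapshot and the temporary directory are removed
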
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose: there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap


def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose: there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as an author might think this
    # is fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copy is delayed until we are in the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None  # this releases the lock on the repository
        totalbytecount = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                fp = open(data, b'rb')
                size = util.fstat(fp).st_size
            bytecount = 0
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    bytecount += len(chunk)
                    totalbytecount += len(chunk)
                    progress.update(totalbytecount)
                    yield chunk
                if bytecount != size:
                    # Would most likely be caused by a race due to `hg strip`
                    # or a revlog split
                    raise error.Abort(
                        _(
                            b'clone could only read %d bytes from %s, but '
                            b'expected %d bytes'
                        )
                        % (bytecount, name, size)
                    )
            finally:
                fp.close()


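Each v2 entry is thus framed as: a 1-byte source, a uvarint name length, a uvarint data length, then the name and data bytes (generatev2's docstring below spells this out). The varint is the usual 7-bits-per-byte encoding with the high bit as continuation flag; a self-contained sketch of the pair that util.uvarintencode / util.uvarintdecodestream implement:

    import io

    def uvarint_encode(value):
        # sketch: low 7 bits first, MSB set on all but the last byte
        out = bytearray()
        while True:
            byte = value & 0x7F
            value >>= 7
            if value:
                out.append(byte | 0x80)
            else:
                out.append(byte)
                return bytes(out)

    def uvarint_decode(fp):
        result = shift = 0
        while True:
            byte = fp.read(1)[0]
            result |= (byte & 0x7F) << shift
            if not byte & 0x80:
                return result
            shift += 7

    assert uvarint_decode(io.BytesIO(uvarint_encode(300))) == 300
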
def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests"""


def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests"""


def _v2_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information useful to clone a repo

    return (entries, totalfilesize)

    entries is a list of tuples (vfs-key, file-path, file-type, size)

    - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
    - `name`: file path of the file to copy (to be fed to the vfs)
    - `file-type`: does this file need to be copied with the source lock?
    - `size`: the size of the file (or None)
    """
    assert repo._currentlock(repo._lockref) is not None
    entries = []
    totalfilesize = 0

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    for rl_type, name, size in _walkstreamfiles(repo, matcher):
        if size:
            ft = _fileappend
            if rl_type & store.FILEFLAGS_VOLATILE:
                ft = _filefull
            entries.append((_srcstore, name, ft, size))
            totalfilesize += size
    for name in _walkstreamfullstorefiles(repo):
        if repo.svfs.exists(name):
            totalfilesize += repo.svfs.lstat(name).st_size
            entries.append((_srcstore, name, _filefull, None))
    if includeobsmarkers and repo.svfs.exists(b'obsstore'):
        totalfilesize += repo.svfs.lstat(b'obsstore').st_size
        entries.append((_srcstore, b'obsstore', _filefull, None))
    for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
            totalfilesize += repo.cachevfs.lstat(name).st_size
            entries.append((_srccache, name, _filefull, None))
    return entries, totalfilesize


def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries, totalfilesize = _v2_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries, totalfilesize)
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    return len(entries), totalfilesize, chunks


@contextlib.contextmanager
def nested(*ctxs):
    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        if rest:
            with nested(*rest):
                yield
        else:
            yield


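nested() chains a runtime-sized sequence of context managers (the role the removed contextlib.nested played in Python 2); consumev2 below uses it to hold one backgroundclosing context per vfs. A trivial usage sketch:

    import contextlib

    @contextlib.contextmanager
    def tag(name):
        print('enter', name)
        yield
        print('exit', name)

    with nested(tag('a'), tag('b')):
        print('body')
    # enter a / enter b / body / exit b / exit a
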
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose: there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as an author might think
        # this is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()


def applybundlev2(repo, fp, filecount, filesize, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev2(repo, fp, filecount, filesize)

    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
    nodemap.post_stream_cleanup(repo)


def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    hardlink = [True]

    def copy_used():
        hardlink[0] = False
        progress.topic = _(b'copying')

    for k, path, size in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        dirname = os.path.dirname(dst_path)
        if not os.path.exists(dirname):
            util.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            src_path,
            dst_path,
            hardlink=hardlink[0],
            no_hardlink_cb=copy_used,
            check_fs_hardlink=False,
        )
        progress.increment()
    return hardlink[0]


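The hardlink = [True] single-element list is a mutable cell: copy_used flips it the first time util.copyfile reports (via no_hardlink_cb) that hardlinking did not work, so all later files are plainly copied and the progress topic switches from 'linking' to 'copying'. A stripped-down illustration of the callback pattern (copy_with_fallback is hypothetical, not Mercurial API):

    import os
    import shutil

    def copy_with_fallback(src, dst, hardlink, no_hardlink_cb):
        # hypothetical stand-in for util.copyfile's hardlink handling
        if hardlink:
            try:
                os.link(src, dst)
                return
            except OSError:
                no_hardlink_cb()  # tell the caller hardlinking stopped working
        shutil.copyfile(src, dst)
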
def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clone"""
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():

            # bookmarks are not integrated into the streaming as they might
            # use `repo.vfs`, and there is too much sensitive data accessible
            # through `repo.vfs` to expose it to streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries, totalfilesize = _v2_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=len(entries) + bm_count,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full files while the source repository is
            # locked and the other ones without the lock. However, in the
            # linking case, this would also require checks that nobody is
            # appending any data to the files while we do the clone, so this
            # is not done yet. We could do this blindly when copying files.
            files = ((k, path, size) for k, path, ftype, size in entries)
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % (len(entries) + bm_count))

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction files as they do not make sense
-        undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
-        undo_files.extend(dest_repo.undofiles())
-        for undovfs, undofile in undo_files:
-            try:
-                undovfs.unlink(undofile)
-            except OSError as e:
-                if e.errno != errno.ENOENT:
-                    msg = _(b'error removing %s: %s\n')
-                    path = undovfs.join(undofile)
-                    e_msg = stringutil.forcebytestr(e)
-                    msg %= (path, e_msg)
-                    dest_repo.ui.warn(msg)
+        repair.cleanup_undo_files(dest_repo)
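This last hunk is the point of the commit: the hand-rolled undo-file removal on the deleted lines is replaced by the shared repair.cleanup_undo_files() helper, which is also why the errno and stringutil imports could be dropped above. For reference, the deleted logic reassembled as one function (cleanup_undo_files itself is not shown in this diff, but presumably centralizes equivalent behaviour):

    def _legacy_undo_cleanup(dest_repo):
        # the removed lines, verbatim in structure
        undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
        undo_files.extend(dest_repo.undofiles())
        for undovfs, undofile in undo_files:
            try:
                undovfs.unlink(undofile)
            except OSError as e:
                if e.errno != errno.ENOENT:  # ignore already-missing files
                    msg = _(b'error removing %s: %s\n')
                    path = undovfs.join(undofile)
                    e_msg = stringutil.forcebytestr(e)
                    msg %= (path, e_msg)
                    dest_repo.ui.warn(msg)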