stream-clone: factor computation of new clone requirement out...
marmoute
r49442:739f2ca3 default
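This commit moves the requirement computation that maybeperformlegacystreamclone() and applybundlev2() previously performed inline into a shared helper, new_stream_clone_requirements(). As a minimal sketch of the set arithmetic the helper performs — the requirement names below are hypothetical examples, not values from this patch; the real inputs are repo.supportedformats, repo.requirements, and the requirements advertised by the streamed source:

    # Minimal sketch of new_stream_clone_requirements(); hypothetical names.
    supported_formats = {b'generaldelta', b'revlogv1', b'sparserevlog'}
    default_requirements = {b'generaldelta', b'revlogv1', b'store', b'fncache'}
    streamed_requirements = {b'revlogv1', b'sparserevlog'}

    requirements = set(default_requirements)
    requirements -= supported_formats           # drop local format choices...
    requirements.update(streamed_requirements)  # ...and adopt the streamed ones

    assert requirements == {b'revlogv1', b'sparserevlog', b'store', b'fncache'}

Local, non-format requirements (here b'store' and b'fncache') survive, while format-related ones are replaced by those of the streamed repository.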
@@ -1,920 +1,932 @@
 # streamclone.py - producing and consuming streaming repository data
 #
 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import contextlib
 import errno
 import os
 import struct

 from .i18n import _
 from .pycompat import open
 from .interfaces import repository
 from . import (
     bookmarks,
     cacheutil,
     error,
     narrowspec,
     phases,
     pycompat,
     requirements as requirementsmod,
     scmutil,
     store,
     util,
 )
 from .utils import (
     stringutil,
 )

+def new_stream_clone_requirements(
+    supported_formats, default_requirements, streamed_requirements
+):
+    """determine the final set of requirements for a new stream clone
+
+    This method combines the "default" requirements that a new repository
+    would use with the constraints we get from the stream clone content. We
+    keep local configuration choices when possible.
+    """
+    requirements = set(default_requirements)
+    requirements -= supported_formats
+    requirements.update(streamed_requirements)
+    return requirements
+
+
 def canperformstreamclone(pullop, bundle2=False):
     """Whether it is possible to perform a streaming clone as part of pull.

     ``bundle2`` will cause the function to consider stream clone through
     bundle2 and only through bundle2.

     Returns a tuple of (supported, requirements). ``supported`` is True if
     streaming clone is supported and False otherwise. ``requirements`` is
     a set of repo requirements from the remote, or ``None`` if stream clone
     isn't supported.
     """
     repo = pullop.repo
     remote = pullop.remote

     bundle2supported = False
     if pullop.canusebundle2:
         if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
             bundle2supported = True
         # else
         # Server doesn't support bundle2 stream clone or doesn't support
         # the versions we support. Fall back and possibly allow legacy.

     # Ensures legacy code path uses available bundle2.
     if bundle2supported and not bundle2:
         return False, None
     # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
     elif bundle2 and not bundle2supported:
         return False, None

     # Streaming clone only works on empty repositories.
     if len(repo):
         return False, None

     # Streaming clone only works if all data is being requested.
     if pullop.heads:
         return False, None

     streamrequested = pullop.streamclonerequested

     # If we don't have a preference, let the server decide for us. This
     # likely only comes into play in LANs.
     if streamrequested is None:
         # The server can advertise whether to prefer streaming clone.
         streamrequested = remote.capable(b'stream-preferred')

     if not streamrequested:
         return False, None

     # In order for stream clone to work, the client has to support all the
     # requirements advertised by the server.
     #
     # The server advertises its requirements via the "stream" and "streamreqs"
     # capability. "stream" (a value-less capability) is advertised if and only
     # if the only requirement is "revlogv1." Else, the "streamreqs" capability
     # is advertised and contains a comma-delimited list of requirements.
     requirements = set()
     if remote.capable(b'stream'):
         requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
     else:
         streamreqs = remote.capable(b'streamreqs')
         # This is weird and shouldn't happen with modern servers.
         if not streamreqs:
             pullop.repo.ui.warn(
                 _(
                     b'warning: stream clone requested but server has them '
                     b'disabled\n'
                 )
             )
             return False, None

         streamreqs = set(streamreqs.split(b','))
         # Server requires something we don't support. Bail.
         missingreqs = streamreqs - repo.supportedformats
         if missingreqs:
             pullop.repo.ui.warn(
                 _(
                     b'warning: stream clone requested but client is missing '
                     b'requirements: %s\n'
                 )
                 % b', '.join(sorted(missingreqs))
             )
             pullop.repo.ui.warn(
                 _(
                     b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                     b'for more information)\n'
                 )
             )
             return False, None
         requirements = streamreqs

     return True, requirements


 def maybeperformlegacystreamclone(pullop):
     """Possibly perform a legacy stream clone operation.

     Legacy stream clones are performed as part of pull but before all other
     operations.

     A legacy stream clone will not be performed if a bundle2 stream clone is
     supported.
     """
     from . import localrepo

     supported, requirements = canperformstreamclone(pullop)

     if not supported:
         return

     repo = pullop.repo
     remote = pullop.remote

     # Save remote branchmap. We will use it later to speed up branchcache
     # creation.
     rbranchmap = None
     if remote.capable(b'branchmap'):
         with remote.commandexecutor() as e:
             rbranchmap = e.callcommand(b'branchmap', {}).result()

     repo.ui.status(_(b'streaming all changes\n'))

     with remote.commandexecutor() as e:
         fp = e.callcommand(b'stream_out', {}).result()

     # TODO strictly speaking, this code should all be inside the context
     # manager because the context manager is supposed to ensure all wire state
     # is flushed when exiting. But the legacy peers don't do this, so it
     # doesn't matter.
     l = fp.readline()
     try:
         resp = int(l)
     except ValueError:
         raise error.ResponseError(
             _(b'unexpected response from remote server:'), l
         )
     if resp == 1:
         raise error.Abort(_(b'operation forbidden by server'))
     elif resp == 2:
         raise error.Abort(_(b'locking the remote repository failed'))
     elif resp != 0:
         raise error.Abort(_(b'the server sent an unknown error code'))

     l = fp.readline()
     try:
         filecount, bytecount = map(int, l.split(b' ', 1))
     except (ValueError, TypeError):
         raise error.ResponseError(
             _(b'unexpected response from remote server:'), l
         )

     with repo.lock():
         consumev1(repo, fp, filecount, bytecount)
-
-        # new requirements = old non-format requirements +
-        #                    new format-related remote requirements
-        # requirements from the streamed-in repository
-        repo.requirements = requirements | (
-            repo.requirements - repo.supportedformats
-        )
+        repo.requirements = new_stream_clone_requirements(
+            repo.supportedformats,
+            repo.requirements,
+            requirements,
+        )
         repo.svfs.options = localrepo.resolvestorevfsoptions(
             repo.ui, repo.requirements, repo.features
         )
         scmutil.writereporequirements(repo)

         if rbranchmap:
             repo._branchcaches.replace(repo, rbranchmap)

         repo.invalidate()


 def allowservergeneration(repo):
     """Whether streaming clones are allowed from the server."""
     if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
         return False

     if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
         return False

     # The way stream clone works makes it impossible to hide secret changesets.
     # So don't allow this by default.
     secret = phases.hassecret(repo)
     if secret:
         return repo.ui.configbool(b'server', b'uncompressedallowsecret')

     return True


 # This is its own function so extensions can override it.
 def _walkstreamfiles(repo, matcher=None):
     return repo.store.walk(matcher)


 def generatev1(repo):
     """Emit content for version 1 of a streaming clone.

     This returns a 3-tuple of (file count, byte size, data iterator).

     The data iterator consists of N entries for each file being transferred.
     Each file entry starts as a line with the file name and integer size
     delimited by a null byte.

     The raw file data follows. Following the raw file data is the next file
     entry, or EOF.

     When used on the wire protocol, an additional line indicating protocol
     success will be prepended to the stream. This function is not responsible
     for adding it.

     This function will obtain a repository lock to ensure a consistent view of
     the store is captured. It therefore may raise LockError.
     """
     entries = []
     total_bytes = 0
     # Get consistent snapshot of repo, lock during scan.
     with repo.lock():
         repo.ui.debug(b'scanning\n')
         for file_type, name, size in _walkstreamfiles(repo):
             if size:
                 entries.append((name, size))
                 total_bytes += size
         _test_sync_point_walk_1(repo)
     _test_sync_point_walk_2(repo)

     repo.ui.debug(
         b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
     )

     svfs = repo.svfs
     debugflag = repo.ui.debugflag

     def emitrevlogdata():
         for name, size in entries:
             if debugflag:
                 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
             # partially encode name over the wire for backwards compat
             yield b'%s\0%d\n' % (store.encodedir(name), size)
             # auditing at this stage is both pointless (paths are already
             # trusted by the local repo) and expensive
             with svfs(name, b'rb', auditpath=False) as fp:
                 if size <= 65536:
                     yield fp.read(size)
                 else:
                     for chunk in util.filechunkiter(fp, limit=size):
                         yield chunk

     return len(entries), total_bytes, emitrevlogdata()


 def generatev1wireproto(repo):
     """Emit content for version 1 of streaming clone suitable for the wire.

     This is the data output from ``generatev1()`` with 2 header lines. The
     first line indicates overall success. The 2nd contains the file count and
     byte size of payload.

     The success line contains "0" for success, "1" for stream generation not
     allowed, and "2" for error locking the repository (possibly indicating
     a permissions error for the server process).
     """
     if not allowservergeneration(repo):
         yield b'1\n'
         return

     try:
         filecount, bytecount, it = generatev1(repo)
     except error.LockError:
         yield b'2\n'
         return

     # Indicates successful response.
     yield b'0\n'
     yield b'%d %d\n' % (filecount, bytecount)
     for chunk in it:
         yield chunk


 def generatebundlev1(repo, compression=b'UN'):
     """Emit content for version 1 of a stream clone bundle.

     The first 4 bytes of the output ("HGS1") denote this as stream clone
     bundle version 1.

     The next 2 bytes indicate the compression type. Only "UN" is currently
     supported.

     The next 16 bytes are two 64-bit big endian unsigned integers indicating
     file count and byte count, respectively.

     The next 2 bytes is a 16-bit big endian unsigned short declaring the length
     of the requirements string, including a trailing \0. The following N bytes
     are the requirements string, which is ASCII containing a comma-delimited
     list of repo requirements that are needed to support the data.

     The remaining content is the output of ``generatev1()`` (which may be
     compressed in the future).

     Returns a tuple of (requirements, data generator).
     """
     if compression != b'UN':
         raise ValueError(b'we do not support the compression argument yet')

     requirements = repo.requirements & repo.supportedformats
     requires = b','.join(sorted(requirements))

     def gen():
         yield b'HGS1'
         yield compression

         filecount, bytecount, it = generatev1(repo)
         repo.ui.status(
             _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
         )

         yield struct.pack(b'>QQ', filecount, bytecount)
         yield struct.pack(b'>H', len(requires) + 1)
         yield requires + b'\0'

         # This is where we'll add compression in the future.
         assert compression == b'UN'

         progress = repo.ui.makeprogress(
             _(b'bundle'), total=bytecount, unit=_(b'bytes')
         )
         progress.update(0)

         for chunk in it:
             progress.increment(step=len(chunk))
             yield chunk

         progress.complete()

     return requirements, gen()


 def consumev1(repo, fp, filecount, bytecount):
     """Apply the contents from version 1 of a streaming clone file handle.

     This takes the output from "stream_out" and applies it to the specified
     repository.

     Like "stream_out," the status line added by the wire protocol is not
     handled by this function.
     """
     with repo.lock():
         repo.ui.status(
             _(b'%d files to transfer, %s of data\n')
             % (filecount, util.bytecount(bytecount))
         )
         progress = repo.ui.makeprogress(
             _(b'clone'), total=bytecount, unit=_(b'bytes')
         )
         progress.update(0)
         start = util.timer()

         # TODO: get rid of (potential) inconsistency
         #
         # If transaction is started and any @filecache property is
         # changed at this point, it causes inconsistency between
         # in-memory cached property and streamclone-ed file on the
         # disk. Nested transaction prevents transaction scope "clone"
         # below from writing in-memory changes out at the end of it,
         # even though in-memory changes are discarded at the end of it
         # regardless of transaction nesting.
         #
         # But transaction nesting can't be simply prohibited, because
         # nesting occurs also in ordinary case (e.g. enabling
         # clonebundles).

         with repo.transaction(b'clone'):
             with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                 for i in pycompat.xrange(filecount):
                     # XXX doesn't support '\n' or '\r' in filenames
                     l = fp.readline()
                     try:
                         name, size = l.split(b'\0', 1)
                         size = int(size)
                     except (ValueError, TypeError):
                         raise error.ResponseError(
                             _(b'unexpected response from remote server:'), l
                         )
                     if repo.ui.debugflag:
                         repo.ui.debug(
                             b'adding %s (%s)\n' % (name, util.bytecount(size))
                         )
                     # for backwards compat, name was partially encoded
                     path = store.decodedir(name)
                     with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                         for chunk in util.filechunkiter(fp, limit=size):
                             progress.increment(step=len(chunk))
                             ofp.write(chunk)

         # force @filecache properties to be reloaded from
         # streamclone-ed file at next access
         repo.invalidate(clearfilecache=True)

         elapsed = util.timer() - start
         if elapsed <= 0:
             elapsed = 0.001
         progress.complete()
         repo.ui.status(
             _(b'transferred %s in %.1f seconds (%s/sec)\n')
             % (
                 util.bytecount(bytecount),
                 elapsed,
                 util.bytecount(bytecount / elapsed),
             )
         )


 def readbundle1header(fp):
     compression = fp.read(2)
     if compression != b'UN':
         raise error.Abort(
             _(
                 b'only uncompressed stream clone bundles are '
                 b'supported; got %s'
             )
             % compression
         )

     filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
     requireslen = struct.unpack(b'>H', fp.read(2))[0]
     requires = fp.read(requireslen)

     if not requires.endswith(b'\0'):
         raise error.Abort(
             _(
                 b'malformed stream clone bundle: '
                 b'requirements not properly encoded'
             )
         )

     requirements = set(requires.rstrip(b'\0').split(b','))

     return filecount, bytecount, requirements


 def applybundlev1(repo, fp):
     """Apply the content from a stream clone bundle version 1.

     We assume the 4 byte header has been read and validated and the file handle
     is at the 2 byte compression identifier.
     """
     if len(repo):
         raise error.Abort(
             _(b'cannot apply stream clone bundle on non-empty repo')
         )

     filecount, bytecount, requirements = readbundle1header(fp)
     missingreqs = requirements - repo.supportedformats
     if missingreqs:
         raise error.Abort(
             _(b'unable to apply stream clone: unsupported format: %s')
             % b', '.join(sorted(missingreqs))
         )

     consumev1(repo, fp, filecount, bytecount)


 class streamcloneapplier(object):
     """Class to manage applying streaming clone bundles.

     We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
     readers to perform bundle type-specific functionality.
     """

     def __init__(self, fh):
         self._fh = fh

     def apply(self, repo):
         return applybundlev1(repo, self._fh)


 # type of file to stream
 _fileappend = 0  # append only file
 _filefull = 1  # full snapshot file

 # Source of the file
 _srcstore = b's'  # store (svfs)
 _srccache = b'c'  # cache (cache)


 # This is its own function so extensions can override it.
 def _walkstreamfullstorefiles(repo):
     """list snapshot files from the store"""
     fnames = []
     if not repo.publishing():
         fnames.append(b'phaseroots')
     return fnames


 def _filterfull(entry, copy, vfsmap):
     """actually copy the snapshot files"""
     src, name, ftype, data = entry
     if ftype != _filefull:
         return entry
     return (src, name, ftype, copy(vfsmap[src].join(name)))


 @contextlib.contextmanager
 def maketempcopies():
     """return a function to temporarily copy files"""
     files = []
     try:

         def copy(src):
             fd, dst = pycompat.mkstemp()
             os.close(fd)
             files.append(dst)
             util.copyfiles(src, dst, hardlink=True)
             return dst

         yield copy
     finally:
         for tmp in files:
             util.tryunlink(tmp)


 def _makemap(repo):
     """make a (src -> vfs) map for the repo"""
     vfsmap = {
         _srcstore: repo.svfs,
         _srccache: repo.cachevfs,
     }
     # we keep repo.vfs out of the map on purpose, there are too many dangers
     # there (eg: .hg/hgrc)
     assert repo.vfs not in vfsmap.values()

     return vfsmap


 def _emit2(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
     vfsmap = _makemap(repo)
     # we keep repo.vfs out of the map on purpose, there are too many dangers
     # there (eg: .hg/hgrc),
     #
     # this assert is duplicated (from _makemap) as an author might think this
     # is fine, while this is really not fine.
     if repo.vfs in vfsmap.values():
         raise error.ProgrammingError(
             b'repo.vfs must not be added to vfsmap for security reasons'
         )

     progress = repo.ui.makeprogress(
         _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
     )
     progress.update(0)
     with maketempcopies() as copy, progress:
         # copy is delayed until we are in the try
         entries = [_filterfull(e, copy, vfsmap) for e in entries]
         yield None  # this releases the lock on the repository
         totalbytecount = 0

         for src, name, ftype, data in entries:
             vfs = vfsmap[src]
             yield src
             yield util.uvarintencode(len(name))
             if ftype == _fileappend:
                 fp = vfs(name)
                 size = data
             elif ftype == _filefull:
                 fp = open(data, b'rb')
                 size = util.fstat(fp).st_size
             bytecount = 0
             try:
                 yield util.uvarintencode(size)
                 yield name
                 if size <= 65536:
                     chunks = (fp.read(size),)
                 else:
                     chunks = util.filechunkiter(fp, limit=size)
                 for chunk in chunks:
                     bytecount += len(chunk)
                     totalbytecount += len(chunk)
                     progress.update(totalbytecount)
                     yield chunk
                 if bytecount != size:
                     # Would most likely be caused by a race due to `hg strip` or
                     # a revlog split
                     raise error.Abort(
                         _(
                             b'clone could only read %d bytes from %s, but '
                             b'expected %d bytes'
                         )
                         % (bytecount, name, size)
                     )
             finally:
                 fp.close()


 def _test_sync_point_walk_1(repo):
     """a function for synchronisation during tests"""


 def _test_sync_point_walk_2(repo):
     """a function for synchronisation during tests"""


 def _v2_walk(repo, includes, excludes, includeobsmarkers):
     """emit a series of file information useful to clone a repo

     return (entries, totalfilesize)

     entries is a list of tuple (vfs-key, file-path, file-type, size)

     - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
     - `name`: file path of the file to copy (to be fed to the vfss)
     - `file-type`: does this file need to be copied with the source lock?
     - `size`: the size of the file (or None)
     """
     assert repo._currentlock(repo._lockref) is not None
     entries = []
     totalfilesize = 0

     matcher = None
     if includes or excludes:
         matcher = narrowspec.match(repo.root, includes, excludes)

     for rl_type, name, size in _walkstreamfiles(repo, matcher):
         if size:
             ft = _fileappend
             if rl_type & store.FILEFLAGS_VOLATILE:
                 ft = _filefull
             entries.append((_srcstore, name, ft, size))
             totalfilesize += size
     for name in _walkstreamfullstorefiles(repo):
         if repo.svfs.exists(name):
             totalfilesize += repo.svfs.lstat(name).st_size
             entries.append((_srcstore, name, _filefull, None))
     if includeobsmarkers and repo.svfs.exists(b'obsstore'):
         totalfilesize += repo.svfs.lstat(b'obsstore').st_size
         entries.append((_srcstore, b'obsstore', _filefull, None))
     for name in cacheutil.cachetocopy(repo):
         if repo.cachevfs.exists(name):
             totalfilesize += repo.cachevfs.lstat(name).st_size
             entries.append((_srccache, name, _filefull, None))
     return entries, totalfilesize


 def generatev2(repo, includes, excludes, includeobsmarkers):
     """Emit content for version 2 of a streaming clone.

     the data stream consists of the following entries:
     1) A char representing the file destination (eg: store or cache)
     2) A varint containing the length of the filename
     3) A varint containing the length of file data
     4) N bytes containing the filename (the internal, store-agnostic form)
     5) N bytes containing the file data

     Returns a 3-tuple of (file count, file size, data iterator).
     """

     with repo.lock():

         repo.ui.debug(b'scanning\n')

         entries, totalfilesize = _v2_walk(
             repo,
             includes=includes,
             excludes=excludes,
             includeobsmarkers=includeobsmarkers,
         )

         chunks = _emit2(repo, entries, totalfilesize)
         first = next(chunks)
         assert first is None
         _test_sync_point_walk_1(repo)
     _test_sync_point_walk_2(repo)

     return len(entries), totalfilesize, chunks


 @contextlib.contextmanager
 def nested(*ctxs):
     this = ctxs[0]
     rest = ctxs[1:]
     with this:
         if rest:
             with nested(*rest):
                 yield
         else:
             yield


 def consumev2(repo, fp, filecount, filesize):
     """Apply the contents from a version 2 streaming clone.

     Data is read from an object that only needs to provide a ``read(size)``
     method.
     """
     with repo.lock():
         repo.ui.status(
             _(b'%d files to transfer, %s of data\n')
             % (filecount, util.bytecount(filesize))
         )

         start = util.timer()
         progress = repo.ui.makeprogress(
             _(b'clone'), total=filesize, unit=_(b'bytes')
         )
         progress.update(0)

         vfsmap = _makemap(repo)
         # we keep repo.vfs out of the map on purpose, there are too many
         # dangers there (eg: .hg/hgrc),
         #
         # this assert is duplicated (from _makemap) as an author might think
         # this is fine, while this is really not fine.
         if repo.vfs in vfsmap.values():
             raise error.ProgrammingError(
                 b'repo.vfs must not be added to vfsmap for security reasons'
             )

         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
                 for i in range(filecount):
                     src = util.readexactly(fp, 1)
                     vfs = vfsmap[src]
                     namelen = util.uvarintdecodestream(fp)
                     datalen = util.uvarintdecodestream(fp)

                     name = util.readexactly(fp, namelen)

                     if repo.ui.debugflag:
                         repo.ui.debug(
                             b'adding [%s] %s (%s)\n'
                             % (src, name, util.bytecount(datalen))
                         )

                     with vfs(name, b'w') as ofp:
                         for chunk in util.filechunkiter(fp, limit=datalen):
                             progress.increment(step=len(chunk))
                             ofp.write(chunk)

         # force @filecache properties to be reloaded from
         # streamclone-ed file at next access
         repo.invalidate(clearfilecache=True)

         elapsed = util.timer() - start
         if elapsed <= 0:
             elapsed = 0.001
         repo.ui.status(
             _(b'transferred %s in %.1f seconds (%s/sec)\n')
             % (
                 util.bytecount(progress.pos),
                 elapsed,
                 util.bytecount(progress.pos / elapsed),
             )
         )
         progress.complete()


 def applybundlev2(repo, fp, filecount, filesize, requirements):
     from . import localrepo

     missingreqs = [r for r in requirements if r not in repo.supported]
     if missingreqs:
         raise error.Abort(
             _(b'unable to apply stream clone: unsupported format: %s')
             % b', '.join(sorted(missingreqs))
         )

     consumev2(repo, fp, filecount, filesize)

-    # new requirements = old non-format requirements +
-    #                    new format-related remote requirements
-    # requirements from the streamed-in repository
-    repo.requirements = set(requirements) | (
-        repo.requirements - repo.supportedformats
-    )
+    repo.requirements = new_stream_clone_requirements(
+        repo.supportedformats,
+        repo.requirements,
+        requirements,
+    )
     repo.svfs.options = localrepo.resolvestorevfsoptions(
         repo.ui, repo.requirements, repo.features
     )
     scmutil.writereporequirements(repo)


 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
     hardlink = [True]

     def copy_used():
         hardlink[0] = False
         progress.topic = _(b'copying')

     for k, path, size in entries:
         src_vfs = src_vfs_map[k]
         dst_vfs = dst_vfs_map[k]
         src_path = src_vfs.join(path)
         dst_path = dst_vfs.join(path)
         # We cannot use dirname and makedirs of dst_vfs here because the store
         # encoding confuses them. See issue 6581 for details.
         dirname = os.path.dirname(dst_path)
         if not os.path.exists(dirname):
             util.makedirs(dirname)
         dst_vfs.register_file(path)
         # XXX we could use the #nb_bytes argument.
         util.copyfile(
             src_path,
             dst_path,
             hardlink=hardlink[0],
             no_hardlink_cb=copy_used,
             check_fs_hardlink=False,
         )
         progress.increment()
     return hardlink[0]


 def local_copy(src_repo, dest_repo):
     """copy all content from one local repository to another

     This is useful for local clone"""
     src_store_requirements = {
         r
         for r in src_repo.requirements
         if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
     }
     dest_store_requirements = {
         r
         for r in dest_repo.requirements
         if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
     }
     assert src_store_requirements == dest_store_requirements

     with dest_repo.lock():
         with src_repo.lock():

             # bookmarks are not integrated in the streaming as they might use
             # the `repo.vfs` and there is too much sensitive data accessible
             # through `repo.vfs` to expose it to streaming clone.
             src_book_vfs = bookmarks.bookmarksvfs(src_repo)
             srcbookmarks = src_book_vfs.join(b'bookmarks')
             bm_count = 0
             if os.path.exists(srcbookmarks):
                 bm_count = 1

             entries, totalfilesize = _v2_walk(
                 src_repo,
                 includes=None,
                 excludes=None,
                 includeobsmarkers=True,
             )
             src_vfs_map = _makemap(src_repo)
             dest_vfs_map = _makemap(dest_repo)
             progress = src_repo.ui.makeprogress(
                 topic=_(b'linking'),
                 total=len(entries) + bm_count,
                 unit=_(b'files'),
             )
             # copy files
             #
             # We could copy the full file while the source repository is locked
             # and the other one without the lock. However, in the linking case,
             # this would also require checks that nobody is appending any data
             # to the files while we do the clone, so this is not done yet. We
             # could do this blindly when copying files.
             files = ((k, path, size) for k, path, ftype, size in entries)
             hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

             # copy bookmarks over
             if bm_count:
                 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                 dstbookmarks = dst_book_vfs.join(b'bookmarks')
                 util.copyfile(srcbookmarks, dstbookmarks)
         progress.complete()
         if hardlink:
             msg = b'linked %d files\n'
         else:
             msg = b'copied %d files\n'
         src_repo.ui.debug(msg % (len(entries) + bm_count))

         with dest_repo.transaction(b"localclone") as tr:
             dest_repo.store.write(tr)

         # clean up transaction files as they do not make sense
         undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
         undo_files.extend(dest_repo.undofiles())
         for undovfs, undofile in undo_files:
             try:
                 undovfs.unlink(undofile)
             except OSError as e:
                 if e.errno != errno.ENOENT:
                     msg = _(b'error removing %s: %s\n')
                     path = undovfs.join(undofile)
                     e_msg = stringutil.forcebytestr(e)
                     msg %= (path, e_msg)
                     dest_repo.ui.warn(msg)
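For reference, the refactoring is behavior-preserving: the old inline form `requirements | (repo.requirements - repo.supportedformats)` computes the same set as the new helper, since set union commutes. A quick standalone check with hypothetical requirement values (not part of the patch):

    # Equivalence of the old inline expression and the helper's logic,
    # demonstrated on hypothetical requirement sets.
    supported = {b'generaldelta', b'revlogv1'}
    current = {b'generaldelta', b'revlogv1', b'store'}
    streamed = {b'revlogv2'}

    old_style = set(streamed) | (current - supported)

    new_style = set(current)
    new_style -= supported
    new_style.update(streamed)

    assert old_style == new_style == {b'store', b'revlogv2'}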