streamclone: ensure the server sends the right amount of data...
Valentin Gatien-Baron
r48592:48f07adb stable
@@ -1,906 +1,918 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import contextlib
import errno
import os
import struct

from .i18n import _
from .pycompat import open
from .interfaces import repository
from . import (
    bookmarks,
    cacheutil,
    error,
    narrowspec,
    phases,
    pycompat,
    requirements as requirementsmod,
    scmutil,
    store,
    util,
)
from .utils import (
    stringutil,
)

def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements

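# --- editor's sketch (not part of streamclone.py) ---------------------------
# A minimal, self-contained illustration of the capability negotiation the
# docstring above describes: "stream" alone implies revlogv1, otherwise
# "streamreqs" carries a comma-delimited requirement list. The capability
# dict, helper name, and requirement strings here are assumptions made for
# illustration only.
def _sketch_negotiate(caps, supportedformats):
    if b'stream' in caps:
        return True, {b'revlogv1'}
    streamreqs = set(caps.get(b'streamreqs', b'').split(b',')) - {b''}
    if not streamreqs or streamreqs - supportedformats:
        return False, None  # server disabled, or requires formats we lack
    return True, streamreqs

# e.g. _sketch_negotiate({b'streamreqs': b'generaldelta,revlogv1'},
#                        {b'revlogv1', b'generaldelta'}) -> (True, {...})
# --- end sketch ---------------------------------------------------------------
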
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
            repo.requirements - repo.supportedformats
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()

def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True

# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None):
    return repo.store.walk(matcher)

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for file_type, name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()

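# --- editor's sketch (not part of streamclone.py) ---------------------------
# A minimal reader for the v1 entry framing documented above: each entry is
# "<name>\0<size>\n" followed by exactly <size> bytes of raw data. `fp` is
# any object with read()/readline(); the helper name is hypothetical.
def _sketch_read_v1_entries(fp, filecount):
    for _i in range(filecount):
        header = fp.readline()          # b'path/to/file\x00<size>\n'
        name, size = header.rstrip(b'\n').split(b'\0', 1)
        yield name, fp.read(int(size))  # raw file contents
# --- end sketch ---------------------------------------------------------------
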
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()

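# --- editor's sketch (not part of streamclone.py) ---------------------------
# A stdlib-only round-trip of the bundle v1 header layout documented above.
# The values are illustrative; the real writer/reader are gen() above and
# readbundle1header() further down.
import io, struct as _struct

def _sketch_header_roundtrip():
    requires = b'generaldelta,revlogv1'
    hdr = (b'HGS1' + b'UN'
           + _struct.pack('>QQ', 3, 4096)           # file count, byte count
           + _struct.pack('>H', len(requires) + 1)  # length incl. trailing NUL
           + requires + b'\0')
    fp = io.BytesIO(hdr)
    assert fp.read(4) == b'HGS1' and fp.read(2) == b'UN'
    filecount, bytecount = _struct.unpack('>QQ', fp.read(16))
    reqlen = _struct.unpack('>H', fp.read(2))[0]
    reqs = set(fp.read(reqlen).rstrip(b'\0').split(b','))
    return filecount, bytecount, reqs  # (3, 4096, {b'generaldelta', ...})
# --- end sketch ---------------------------------------------------------------
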
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )

def readbundle1header(fp):
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements

def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)

# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)

# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append(b'phaseroots')
    return fnames

def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))

@contextlib.contextmanager
def maketempcopies():
    """return a function to temporarily copy files"""
    files = []
    try:

        def copy(src):
            fd, dst = pycompat.mkstemp()
            os.close(fd)
            files.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst

        yield copy
    finally:
        for tmp in files:
            util.tryunlink(tmp)

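# --- editor's sketch (not part of streamclone.py) ---------------------------
# How the context manager above is meant to be used: every path returned by
# copy() is a hardlinked temporary snapshot that is unlinked on exit. The
# source path below is a hypothetical stand-in.
def _sketch_tempcopy_usage():
    with maketempcopies() as copy:
        tmp = copy(b'/tmp/some-volatile-file')  # hypothetical source path
        # ... read tmp while the original may keep changing ...
    # leaving the block deletes tmp
# --- end sketch ---------------------------------------------------------------
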
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap

def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as authors might think this is
    # fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copy is delayed until we are in the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None  # this releases the lock on the repository
-       seen = 0
+       totalbytecount = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                fp = open(data, b'rb')
                size = util.fstat(fp).st_size
+           bytecount = 0
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
-                   seen += len(chunk)
-                   progress.update(seen)
+                   bytecount += len(chunk)
+                   totalbytecount += len(chunk)
+                   progress.update(totalbytecount)
                    yield chunk
+               if bytecount != size:
+                   # Would most likely be caused by a race due to `hg strip`
+                   # or a revlog split
+                   raise error.Abort(
+                       _(
+                           b'clone could only read %d bytes from %s, but '
+                           b'expected %d bytes'
+                       )
+                       % (bytecount, name, size)
+                   )
            finally:
                fp.close()

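# --- editor's sketch (not part of streamclone.py) ---------------------------
# The guard added by this changeset: the per-file `bytecount` is compared
# against the size announced in the entry header, so a file truncated
# mid-stream (e.g. by a concurrent `hg strip` or a revlog split) aborts the
# clone instead of silently sending too little data, while `totalbytecount`
# only feeds the progress bar. A stdlib-only illustration of the failure mode:
import io as _io

def _sketch_read_exactly(fp, size):
    data = fp.read(size)
    if len(data) != size:  # short read: source shrank after it was measured
        raise ValueError('expected %d bytes, got %d' % (size, len(data)))
    return data

def _sketch_demo():
    try:
        _sketch_read_exactly(_io.BytesIO(b'abc'), 5)
    except ValueError:
        pass  # the announced size no longer matches what the source provides
# --- end sketch ---------------------------------------------------------------
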
def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests"""


def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests"""

def _v2_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information entries useful to clone a repo

    return (entries, totalfilesize)

    entries is a list of tuples (vfs-key, file-path, file-type, size)

    - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
    - `name`: file path of the file to copy (to be fed to the vfs)
    - `file-type`: does this file need to be copied with the source lock?
    - `size`: the size of the file (or None)
    """
    assert repo._currentlock(repo._lockref) is not None
    entries = []
    totalfilesize = 0

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    for rl_type, name, ename, size in _walkstreamfiles(repo, matcher):
        if size:
            ft = _fileappend
            if rl_type & store.FILEFLAGS_VOLATILE:
                ft = _filefull
            entries.append((_srcstore, name, ft, size))
            totalfilesize += size
    for name in _walkstreamfullstorefiles(repo):
        if repo.svfs.exists(name):
            totalfilesize += repo.svfs.lstat(name).st_size
            entries.append((_srcstore, name, _filefull, None))
    if includeobsmarkers and repo.svfs.exists(b'obsstore'):
        totalfilesize += repo.svfs.lstat(b'obsstore').st_size
        entries.append((_srcstore, b'obsstore', _filefull, None))
    for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
            totalfilesize += repo.cachevfs.lstat(name).st_size
            entries.append((_srccache, name, _filefull, None))
    return entries, totalfilesize

def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries, totalfilesize = _v2_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries, totalfilesize)
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    return len(entries), totalfilesize, chunks

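# --- editor's sketch (not part of streamclone.py) ---------------------------
# The v2 entry framing described in the docstring above, with a minimal
# LEB128-style unsigned varint. This standalone encoder is an assumption
# about what util.uvarintencode produces, written for illustration only.
def _sketch_uvarint(value):
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def _sketch_v2_entry(src, name, data):
    # 1) destination char, 2) name length, 3) data length, 4) name, 5) data
    return (src + _sketch_uvarint(len(name)) + _sketch_uvarint(len(data))
            + name + data)

# e.g. _sketch_v2_entry(b's', b'00changelog.i', b'...') frames one store file
# --- end sketch ---------------------------------------------------------------
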
@contextlib.contextmanager
def nested(*ctxs):
    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        if rest:
            with nested(*rest):
                yield
        else:
            yield

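# --- editor's sketch (not part of streamclone.py) ---------------------------
# nested() above recursively stacks an arbitrary number of context managers,
# which lets consumev2() below background-close every vfs in vfsmap at once.
# It is similar in spirit to contextlib.ExitStack; a tiny usage illustration
# with stand-in managers:
import contextlib as _contextlib

def _sketch_nested_usage():
    cms = [_contextlib.nullcontext(i) for i in range(3)]  # stand-in managers
    with nested(*cms):
        pass  # all three are entered here, and exited in reverse order
# --- end sketch ---------------------------------------------------------------
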
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as authors might think
        # this is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()

def applybundlev2(repo, fp, filecount, filesize, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev2(repo, fp, filecount, filesize)

    # new requirements = old non-format requirements +
    #                    new format-related remote requirements
    # requirements from the streamed-in repository
    repo.requirements = set(requirements) | (
        repo.requirements - repo.supportedformats
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)

def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    hardlink = [True]

    def copy_used():
        hardlink[0] = False
        progress.topic = _(b'copying')

    for k, path, size in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        dirname = dst_vfs.dirname(path)
        if not dst_vfs.exists(dirname):
            dst_vfs.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            src_path,
            dst_path,
            hardlink=hardlink[0],
            no_hardlink_cb=copy_used,
            check_fs_hardlink=False,
        )
        progress.increment()
    return hardlink[0]

def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clone"""
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():

            # bookmarks are not integrated into the streaming as they might
            # use the `repo.vfs`, and there is too much sensitive data
            # accessible through `repo.vfs` to expose it to streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries, totalfilesize = _v2_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=len(entries) + bm_count,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full files while the source repository is
            # locked and the others without the lock. However, in the linking
            # case, this would also require checks that nobody is appending
            # any data to the files while we do the clone, so this is not
            # done yet. We could do this blindly when copying files.
            files = ((k, path, size) for k, path, ftype, size in entries)
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % (len(entries) + bm_count))

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction files as they do not make sense
        undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
        undo_files.extend(dest_repo.undofiles())
        for undovfs, undofile in undo_files:
            try:
                undovfs.unlink(undofile)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    msg = _(b'error removing %s: %s\n')
                    path = undovfs.join(undofile)
                    e_msg = stringutil.forcebytestr(e)
                    msg %= (path, e_msg)
                    dest_repo.ui.warn(msg)