stream: double check that self.vfs is *not* in the vfsmap...
marmoute
r48267:adb9d79a stable draft
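The change itself is small: `_makemap()` already asserts that `repo.vfs` (which reaches files such as `.hg/hgrc`) is absent from the stream-clone vfs map, and this commit duplicates that assertion at the two places that consume the map, `_emit2()` and `consumev2()`. A minimal standalone sketch of the pattern follows; the `FakeVfs`/`FakeRepo` classes are illustrative stand-ins, not Mercurial's real vfs API:

class FakeVfs(object):
    """Illustrative stand-in for a Mercurial vfs (filesystem accessor)."""

    def __init__(self, root):
        self.root = root


class FakeRepo(object):
    """Illustrative stand-in exposing the three vfs attributes used here."""

    def __init__(self):
        self.vfs = FakeVfs(b'.hg')  # main vfs: must never be streamed
        self.svfs = FakeVfs(b'.hg/store')  # store vfs: streamable
        self.cachevfs = FakeVfs(b'.hg/cache')  # cache vfs: streamable


def _makemap(repo):
    """map a one-byte source tag to the vfs owning those files"""
    vfsmap = {b's': repo.svfs, b'c': repo.cachevfs}
    # repo.vfs reaches files such as .hg/hgrc, so it must stay out of
    # the map; streaming it would expose configuration to the peer
    assert repo.vfs not in vfsmap.values()
    return vfsmap


def _emit2_prologue(repo):
    vfsmap = _makemap(repo)
    # duplicated on purpose: an extension overriding _makemap() could
    # silently drop the check, so each consumer re-verifies the invariant
    assert repo.vfs not in vfsmap.values()
    return vfsmap


assert _emit2_prologue(FakeRepo())[b's'].root == b'.hg/store'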
@@ -1,750 +1,762 @@
 # streamclone.py - producing and consuming streaming repository data
 #
 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import contextlib
 import os
 import struct

 from .i18n import _
 from .pycompat import open
 from .interfaces import repository
 from . import (
     cacheutil,
     error,
     narrowspec,
     phases,
     pycompat,
     requirements as requirementsmod,
     scmutil,
     store,
     util,
 )


 def canperformstreamclone(pullop, bundle2=False):
     """Whether it is possible to perform a streaming clone as part of pull.

     ``bundle2`` will cause the function to consider stream clone through
     bundle2 and only through bundle2.

     Returns a tuple of (supported, requirements). ``supported`` is True if
     streaming clone is supported and False otherwise. ``requirements`` is
     a set of repo requirements from the remote, or ``None`` if stream clone
     isn't supported.
     """
     repo = pullop.repo
     remote = pullop.remote

     bundle2supported = False
     if pullop.canusebundle2:
         if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
             bundle2supported = True
         # else
         # Server doesn't support bundle2 stream clone or doesn't support
         # the versions we support. Fall back and possibly allow legacy.

     # Ensures legacy code path uses available bundle2.
     if bundle2supported and not bundle2:
         return False, None
     # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
     elif bundle2 and not bundle2supported:
         return False, None

     # Streaming clone only works on empty repositories.
     if len(repo):
         return False, None

     # Streaming clone only works if all data is being requested.
     if pullop.heads:
         return False, None

     streamrequested = pullop.streamclonerequested

     # If we don't have a preference, let the server decide for us. This
     # likely only comes into play in LANs.
     if streamrequested is None:
         # The server can advertise whether to prefer streaming clone.
         streamrequested = remote.capable(b'stream-preferred')

     if not streamrequested:
         return False, None

     # In order for stream clone to work, the client has to support all the
     # requirements advertised by the server.
     #
     # The server advertises its requirements via the "stream" and "streamreqs"
     # capability. "stream" (a value-less capability) is advertised if and only
     # if the only requirement is "revlogv1." Else, the "streamreqs" capability
     # is advertised and contains a comma-delimited list of requirements.
     requirements = set()
     if remote.capable(b'stream'):
         requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
     else:
         streamreqs = remote.capable(b'streamreqs')
         # This is weird and shouldn't happen with modern servers.
         if not streamreqs:
             pullop.repo.ui.warn(
                 _(
                     b'warning: stream clone requested but server has them '
                     b'disabled\n'
                 )
             )
             return False, None

         streamreqs = set(streamreqs.split(b','))
         # Server requires something we don't support. Bail.
         missingreqs = streamreqs - repo.supportedformats
         if missingreqs:
             pullop.repo.ui.warn(
                 _(
                     b'warning: stream clone requested but client is missing '
                     b'requirements: %s\n'
                 )
                 % b', '.join(sorted(missingreqs))
             )
             pullop.repo.ui.warn(
                 _(
                     b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                     b'for more information)\n'
                 )
             )
             return False, None
         requirements = streamreqs

     return True, requirements


 def maybeperformlegacystreamclone(pullop):
     """Possibly perform a legacy stream clone operation.

     Legacy stream clones are performed as part of pull but before all other
     operations.

     A legacy stream clone will not be performed if a bundle2 stream clone is
     supported.
     """
     from . import localrepo

     supported, requirements = canperformstreamclone(pullop)

     if not supported:
         return

     repo = pullop.repo
     remote = pullop.remote

     # Save remote branchmap. We will use it later to speed up branchcache
     # creation.
     rbranchmap = None
     if remote.capable(b'branchmap'):
         with remote.commandexecutor() as e:
             rbranchmap = e.callcommand(b'branchmap', {}).result()

     repo.ui.status(_(b'streaming all changes\n'))

     with remote.commandexecutor() as e:
         fp = e.callcommand(b'stream_out', {}).result()

     # TODO strictly speaking, this code should all be inside the context
     # manager because the context manager is supposed to ensure all wire state
     # is flushed when exiting. But the legacy peers don't do this, so it
     # doesn't matter.
     l = fp.readline()
     try:
         resp = int(l)
     except ValueError:
         raise error.ResponseError(
             _(b'unexpected response from remote server:'), l
         )
     if resp == 1:
         raise error.Abort(_(b'operation forbidden by server'))
     elif resp == 2:
         raise error.Abort(_(b'locking the remote repository failed'))
     elif resp != 0:
         raise error.Abort(_(b'the server sent an unknown error code'))

     l = fp.readline()
     try:
         filecount, bytecount = map(int, l.split(b' ', 1))
     except (ValueError, TypeError):
         raise error.ResponseError(
             _(b'unexpected response from remote server:'), l
         )

     with repo.lock():
         consumev1(repo, fp, filecount, bytecount)

         # new requirements = old non-format requirements +
         #                    new format-related remote requirements
         # requirements from the streamed-in repository
         repo.requirements = requirements | (
             repo.requirements - repo.supportedformats
         )
         repo.svfs.options = localrepo.resolvestorevfsoptions(
             repo.ui, repo.requirements, repo.features
         )
         scmutil.writereporequirements(repo)

         if rbranchmap:
             repo._branchcaches.replace(repo, rbranchmap)

         repo.invalidate()


 def allowservergeneration(repo):
     """Whether streaming clones are allowed from the server."""
     if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
         return False

     if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
         return False

     # The way stream clone works makes it impossible to hide secret changesets.
     # So don't allow this by default.
     secret = phases.hassecret(repo)
     if secret:
         return repo.ui.configbool(b'server', b'uncompressedallowsecret')

     return True


 # This is its own function so extensions can override it.
 def _walkstreamfiles(repo, matcher=None):
     return repo.store.walk(matcher)


 def generatev1(repo):
     """Emit content for version 1 of a streaming clone.

     This returns a 3-tuple of (file count, byte size, data iterator).

     The data iterator consists of N entries for each file being transferred.
     Each file entry starts as a line with the file name and integer size
     delimited by a null byte.

     The raw file data follows. Following the raw file data is the next file
     entry, or EOF.

     When used on the wire protocol, an additional line indicating protocol
     success will be prepended to the stream. This function is not responsible
     for adding it.

     This function will obtain a repository lock to ensure a consistent view of
     the store is captured. It therefore may raise LockError.
     """
     entries = []
     total_bytes = 0
     # Get consistent snapshot of repo, lock during scan.
     with repo.lock():
         repo.ui.debug(b'scanning\n')
         for file_type, name, ename, size in _walkstreamfiles(repo):
             if size:
                 entries.append((name, size))
                 total_bytes += size
         _test_sync_point_walk_1(repo)
     _test_sync_point_walk_2(repo)

     repo.ui.debug(
         b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
     )

     svfs = repo.svfs
     debugflag = repo.ui.debugflag

     def emitrevlogdata():
         for name, size in entries:
             if debugflag:
                 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
             # partially encode name over the wire for backwards compat
             yield b'%s\0%d\n' % (store.encodedir(name), size)
             # auditing at this stage is both pointless (paths are already
             # trusted by the local repo) and expensive
             with svfs(name, b'rb', auditpath=False) as fp:
                 if size <= 65536:
                     yield fp.read(size)
                 else:
                     for chunk in util.filechunkiter(fp, limit=size):
                         yield chunk

     return len(entries), total_bytes, emitrevlogdata()


 def generatev1wireproto(repo):
     """Emit content for version 1 of streaming clone suitable for the wire.

     This is the data output from ``generatev1()`` with 2 header lines. The
     first line indicates overall success. The 2nd contains the file count and
     byte size of payload.

     The success line contains "0" for success, "1" for stream generation not
     allowed, and "2" for error locking the repository (possibly indicating
     a permissions error for the server process).
     """
     if not allowservergeneration(repo):
         yield b'1\n'
         return

     try:
         filecount, bytecount, it = generatev1(repo)
     except error.LockError:
         yield b'2\n'
         return

     # Indicates successful response.
     yield b'0\n'
     yield b'%d %d\n' % (filecount, bytecount)
     for chunk in it:
         yield chunk


 def generatebundlev1(repo, compression=b'UN'):
     """Emit content for version 1 of a stream clone bundle.

     The first 4 bytes of the output ("HGS1") denote this as stream clone
     bundle version 1.

     The next 2 bytes indicate the compression type. Only "UN" is currently
     supported.

     The next 16 bytes are two 64-bit big endian unsigned integers indicating
     file count and byte count, respectively.

     The next 2 bytes are a 16-bit big endian unsigned short declaring the length
     of the requirements string, including a trailing \0. The following N bytes
     are the requirements string, which is ASCII containing a comma-delimited
     list of repo requirements that are needed to support the data.

     The remaining content is the output of ``generatev1()`` (which may be
     compressed in the future).

     Returns a tuple of (requirements, data generator).
     """
     if compression != b'UN':
         raise ValueError(b'we do not support the compression argument yet')

     requirements = repo.requirements & repo.supportedformats
     requires = b','.join(sorted(requirements))

     def gen():
         yield b'HGS1'
         yield compression

         filecount, bytecount, it = generatev1(repo)
         repo.ui.status(
             _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
         )

         yield struct.pack(b'>QQ', filecount, bytecount)
         yield struct.pack(b'>H', len(requires) + 1)
         yield requires + b'\0'

         # This is where we'll add compression in the future.
         assert compression == b'UN'

         progress = repo.ui.makeprogress(
             _(b'bundle'), total=bytecount, unit=_(b'bytes')
         )
         progress.update(0)

         for chunk in it:
             progress.increment(step=len(chunk))
             yield chunk

         progress.complete()

     return requirements, gen()


 def consumev1(repo, fp, filecount, bytecount):
     """Apply the contents from version 1 of a streaming clone file handle.

     This takes the output from "stream_out" and applies it to the specified
     repository.

     Like "stream_out," the status line added by the wire protocol is not
     handled by this function.
     """
     with repo.lock():
         repo.ui.status(
             _(b'%d files to transfer, %s of data\n')
             % (filecount, util.bytecount(bytecount))
         )
         progress = repo.ui.makeprogress(
             _(b'clone'), total=bytecount, unit=_(b'bytes')
         )
         progress.update(0)
         start = util.timer()

         # TODO: get rid of (potential) inconsistency
         #
         # If transaction is started and any @filecache property is
         # changed at this point, it causes inconsistency between
         # in-memory cached property and streamclone-ed file on the
         # disk. Nested transaction prevents transaction scope "clone"
         # below from writing in-memory changes out at the end of it,
         # even though in-memory changes are discarded at the end of it
         # regardless of transaction nesting.
         #
         # But transaction nesting can't be simply prohibited, because
         # nesting occurs also in ordinary case (e.g. enabling
         # clonebundles).

         with repo.transaction(b'clone'):
             with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                 for i in pycompat.xrange(filecount):
                     # XXX doesn't support '\n' or '\r' in filenames
                     l = fp.readline()
                     try:
                         name, size = l.split(b'\0', 1)
                         size = int(size)
                     except (ValueError, TypeError):
                         raise error.ResponseError(
                             _(b'unexpected response from remote server:'), l
                         )
                     if repo.ui.debugflag:
                         repo.ui.debug(
                             b'adding %s (%s)\n' % (name, util.bytecount(size))
                         )
                     # for backwards compat, name was partially encoded
                     path = store.decodedir(name)
                     with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                         for chunk in util.filechunkiter(fp, limit=size):
                             progress.increment(step=len(chunk))
                             ofp.write(chunk)

             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
             repo.invalidate(clearfilecache=True)

         elapsed = util.timer() - start
         if elapsed <= 0:
             elapsed = 0.001
         progress.complete()
         repo.ui.status(
             _(b'transferred %s in %.1f seconds (%s/sec)\n')
             % (
                 util.bytecount(bytecount),
                 elapsed,
                 util.bytecount(bytecount / elapsed),
             )
         )


 def readbundle1header(fp):
     compression = fp.read(2)
     if compression != b'UN':
         raise error.Abort(
             _(
                 b'only uncompressed stream clone bundles are '
                 b'supported; got %s'
             )
             % compression
         )

     filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
     requireslen = struct.unpack(b'>H', fp.read(2))[0]
     requires = fp.read(requireslen)

     if not requires.endswith(b'\0'):
         raise error.Abort(
             _(
                 b'malformed stream clone bundle: '
                 b'requirements not properly encoded'
             )
         )

     requirements = set(requires.rstrip(b'\0').split(b','))

     return filecount, bytecount, requirements


 def applybundlev1(repo, fp):
     """Apply the content from a stream clone bundle version 1.

     We assume the 4 byte header has been read and validated and the file handle
     is at the 2 byte compression identifier.
     """
     if len(repo):
         raise error.Abort(
             _(b'cannot apply stream clone bundle on non-empty repo')
         )

     filecount, bytecount, requirements = readbundle1header(fp)
     missingreqs = requirements - repo.supportedformats
     if missingreqs:
         raise error.Abort(
             _(b'unable to apply stream clone: unsupported format: %s')
             % b', '.join(sorted(missingreqs))
         )

     consumev1(repo, fp, filecount, bytecount)


 class streamcloneapplier(object):
     """Class to manage applying streaming clone bundles.

     We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
     readers to perform bundle type-specific functionality.
     """

     def __init__(self, fh):
         self._fh = fh

     def apply(self, repo):
         return applybundlev1(repo, self._fh)


 # type of file to stream
 _fileappend = 0  # append only file
 _filefull = 1  # full snapshot file

 # Source of the file
 _srcstore = b's'  # store (svfs)
 _srccache = b'c'  # cache (cache)

 # This is its own function so extensions can override it.
 def _walkstreamfullstorefiles(repo):
     """list snapshot files from the store"""
     fnames = []
     if not repo.publishing():
         fnames.append(b'phaseroots')
     return fnames


 def _filterfull(entry, copy, vfsmap):
     """actually copy the snapshot files"""
     src, name, ftype, data = entry
     if ftype != _filefull:
         return entry
     return (src, name, ftype, copy(vfsmap[src].join(name)))


 @contextlib.contextmanager
 def maketempcopies():
     """return a function to temporarily copy files"""
     files = []
     try:

         def copy(src):
             fd, dst = pycompat.mkstemp()
             os.close(fd)
             files.append(dst)
             util.copyfiles(src, dst, hardlink=True)
             return dst

         yield copy
     finally:
         for tmp in files:
             util.tryunlink(tmp)


 def _makemap(repo):
     """make a (src -> vfs) map for the repo"""
     vfsmap = {
         _srcstore: repo.svfs,
         _srccache: repo.cachevfs,
     }
     # we keep repo.vfs out of the map on purpose; there are too many dangers
     # there (eg: .hg/hgrc)
     assert repo.vfs not in vfsmap.values()

     return vfsmap


 def _emit2(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
     vfsmap = _makemap(repo)
+    # we keep repo.vfs out of the map on purpose; there are too many dangers
+    # there (eg: .hg/hgrc),
+    #
+    # this assert is duplicated (from _makemap) as an author might think this
+    # is fine, while this is really not fine.
+    assert repo.vfs not in vfsmap.values()
     progress = repo.ui.makeprogress(
         _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
     )
     progress.update(0)
     with maketempcopies() as copy, progress:
         # copy is delayed until we are in the try
         entries = [_filterfull(e, copy, vfsmap) for e in entries]
         yield None  # this releases the lock on the repository
         seen = 0

         for src, name, ftype, data in entries:
             vfs = vfsmap[src]
             yield src
             yield util.uvarintencode(len(name))
             if ftype == _fileappend:
                 fp = vfs(name)
                 size = data
             elif ftype == _filefull:
                 fp = open(data, b'rb')
                 size = util.fstat(fp).st_size
             try:
                 yield util.uvarintencode(size)
                 yield name
                 if size <= 65536:
                     chunks = (fp.read(size),)
                 else:
                     chunks = util.filechunkiter(fp, limit=size)
                 for chunk in chunks:
                     seen += len(chunk)
                     progress.update(seen)
                     yield chunk
             finally:
                 fp.close()


 def _test_sync_point_walk_1(repo):
     """a function for synchronisation during tests"""


 def _test_sync_point_walk_2(repo):
     """a function for synchronisation during tests"""


 def generatev2(repo, includes, excludes, includeobsmarkers):
     """Emit content for version 2 of a streaming clone.

     The data stream consists of the following entries:
     1) A char representing the file destination (eg: store or cache)
     2) A varint containing the length of the filename
     3) A varint containing the length of file data
     4) N bytes containing the filename (the internal, store-agnostic form)
     5) N bytes containing the file data

     Returns a 3-tuple of (file count, file size, data iterator).
     """

     with repo.lock():

         entries = []
         totalfilesize = 0

         matcher = None
         if includes or excludes:
             matcher = narrowspec.match(repo.root, includes, excludes)

         repo.ui.debug(b'scanning\n')
         for rl_type, name, ename, size in _walkstreamfiles(repo, matcher):
             if size:
                 ft = _fileappend
                 if rl_type & store.FILEFLAGS_VOLATILE:
                     ft = _filefull
                 entries.append((_srcstore, name, ft, size))
                 totalfilesize += size
         for name in _walkstreamfullstorefiles(repo):
             if repo.svfs.exists(name):
                 totalfilesize += repo.svfs.lstat(name).st_size
                 entries.append((_srcstore, name, _filefull, None))
         if includeobsmarkers and repo.svfs.exists(b'obsstore'):
             totalfilesize += repo.svfs.lstat(b'obsstore').st_size
             entries.append((_srcstore, b'obsstore', _filefull, None))
         for name in cacheutil.cachetocopy(repo):
             if repo.cachevfs.exists(name):
                 totalfilesize += repo.cachevfs.lstat(name).st_size
                 entries.append((_srccache, name, _filefull, None))

         chunks = _emit2(repo, entries, totalfilesize)
         first = next(chunks)
         assert first is None
         _test_sync_point_walk_1(repo)
     _test_sync_point_walk_2(repo)

     return len(entries), totalfilesize, chunks


 @contextlib.contextmanager
 def nested(*ctxs):
     this = ctxs[0]
     rest = ctxs[1:]
     with this:
         if rest:
             with nested(*rest):
                 yield
         else:
             yield


 def consumev2(repo, fp, filecount, filesize):
     """Apply the contents from a version 2 streaming clone.

     Data is read from an object that only needs to provide a ``read(size)``
     method.
     """
     with repo.lock():
         repo.ui.status(
             _(b'%d files to transfer, %s of data\n')
             % (filecount, util.bytecount(filesize))
         )

         start = util.timer()
         progress = repo.ui.makeprogress(
             _(b'clone'), total=filesize, unit=_(b'bytes')
         )
         progress.update(0)

         vfsmap = _makemap(repo)
+        # we keep repo.vfs out of the map on purpose; there are too many
+        # dangers there (eg: .hg/hgrc),
+        #
+        # this assert is duplicated (from _makemap) as an author might think
+        # this is fine, while this is really not fine.
+        assert repo.vfs not in vfsmap.values()

         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
                 for i in range(filecount):
                     src = util.readexactly(fp, 1)
                     vfs = vfsmap[src]
                     namelen = util.uvarintdecodestream(fp)
                     datalen = util.uvarintdecodestream(fp)

                     name = util.readexactly(fp, namelen)

                     if repo.ui.debugflag:
                         repo.ui.debug(
                             b'adding [%s] %s (%s)\n'
                             % (src, name, util.bytecount(datalen))
                         )

                     with vfs(name, b'w') as ofp:
                         for chunk in util.filechunkiter(fp, limit=datalen):
                             progress.increment(step=len(chunk))
                             ofp.write(chunk)

             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
             repo.invalidate(clearfilecache=True)

         elapsed = util.timer() - start
         if elapsed <= 0:
             elapsed = 0.001
         repo.ui.status(
             _(b'transferred %s in %.1f seconds (%s/sec)\n')
             % (
                 util.bytecount(progress.pos),
                 elapsed,
                 util.bytecount(progress.pos / elapsed),
             )
         )
         progress.complete()


 def applybundlev2(repo, fp, filecount, filesize, requirements):
     from . import localrepo

     missingreqs = [r for r in requirements if r not in repo.supported]
     if missingreqs:
         raise error.Abort(
             _(b'unable to apply stream clone: unsupported format: %s')
             % b', '.join(sorted(missingreqs))
         )

     consumev2(repo, fp, filecount, filesize)

     # new requirements = old non-format requirements +
     #                    new format-related remote requirements
     # requirements from the streamed-in repository
     repo.requirements = set(requirements) | (
         repo.requirements - repo.supportedformats
     )
     repo.svfs.options = localrepo.resolvestorevfsoptions(
         repo.ui, repo.requirements, repo.features
     )
     scmutil.writereporequirements(repo)
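For reference, the stream clone bundle v1 header that generatebundlev1() emits and readbundle1header() parses can be exercised with nothing more than struct and an in-memory buffer. Below is a condensed sketch of the framing documented in the docstrings above (4-byte 'HGS1' magic, 2-byte compression type, two big-endian uint64 counts, a uint16 length, then the NUL-terminated requirements string); the helper names and sample values are made up for illustration:

import io
import struct


def packheader(filecount, bytecount, requirements):
    """build a stream clone bundle v1 header per the documented layout"""
    requires = b','.join(sorted(requirements)) + b'\0'
    return (
        b'HGS1'  # 4-byte magic
        + b'UN'  # 2-byte compression type (uncompressed)
        + struct.pack(b'>QQ', filecount, bytecount)  # two big-endian uint64s
        + struct.pack(b'>H', len(requires))  # uint16 length, incl. trailing NUL
        + requires
    )


def parseheader(fp):
    """parse it back; unlike readbundle1header(), this also eats the magic"""
    assert fp.read(4) == b'HGS1'
    assert fp.read(2) == b'UN'
    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    (requireslen,) = struct.unpack(b'>H', fp.read(2))
    requires = fp.read(requireslen)
    assert requires.endswith(b'\0')
    return filecount, bytecount, set(requires.rstrip(b'\0').split(b','))


hdr = packheader(3, 4096, {b'revlogv1', b'store'})
assert parseheader(io.BytesIO(hdr)) == (3, 4096, {b'revlogv1', b'store'})

Note that the uint16 length field counts the trailing NUL byte, which is why generatebundlev1() packs len(requires) + 1.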