##// END OF EJS Templates
streamingclone: extract the scanning part from the generation part...
marmoute -
r48237:2f4ca480 default
parent child Browse files
Show More
@@ -1,750 +1,773
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import contextlib
10 import contextlib
11 import os
11 import os
12 import struct
12 import struct
13
13
14 from .i18n import _
14 from .i18n import _
15 from .pycompat import open
15 from .pycompat import open
16 from .interfaces import repository
16 from .interfaces import repository
17 from . import (
17 from . import (
18 cacheutil,
18 cacheutil,
19 error,
19 error,
20 narrowspec,
20 narrowspec,
21 phases,
21 phases,
22 pycompat,
22 pycompat,
23 requirements as requirementsmod,
23 requirements as requirementsmod,
24 scmutil,
24 scmutil,
25 store,
25 store,
26 util,
26 util,
27 )
27 )
28
28
29
29
def canperformstreamclone(pullop, bundle2=False):
    """Decide whether a pull operation may be served by a streaming clone.

    When ``bundle2`` is true, only the bundle2 flavour of stream clone is
    considered; when it is false, only the legacy flavour is.

    The result is a ``(supported, requirements)`` pair. ``supported`` is a
    boolean. ``requirements`` is the set of repository requirements
    advertised by the remote, or ``None`` whenever ``supported`` is False.
    """
    unsupported = (False, None)
    repo = pullop.repo
    remote = pullop.remote

    # The remote bundle2 capabilities are only consulted when bundle2 is
    # usable at all. A server that does not advertise a version we know
    # leaves the legacy path as the only candidate.
    bundle2supported = bool(
        pullop.canusebundle2
        and b'v2' in pullop.remotebundle2caps.get(b'stream', [])
    )

    # The two flavours are mutually exclusive: the legacy caller must stand
    # down when bundle2 streaming is available, and the bundle2 caller must
    # not proceed when it is not.
    if bundle2supported != bool(bundle2):
        return unsupported

    # A streaming clone needs an empty local repository and a request for
    # all of the remote data (no specific heads).
    if len(repo) or pullop.heads:
        return unsupported

    wanted = pullop.streamclonerequested
    if wanted is None:
        # No local preference: defer to what the server advertises. This
        # likely only comes into play in LANs.
        wanted = remote.capable(b'stream-preferred')
    if not wanted:
        return unsupported

    # The client must support every requirement advertised by the server.
    #
    # The value-less "stream" capability is advertised if and only if the
    # only requirement is "revlogv1". Otherwise "streamreqs" carries a
    # comma-delimited list of requirements.
    if remote.capable(b'stream'):
        return True, {requirementsmod.REVLOGV1_REQUIREMENT}

    advertised = remote.capable(b'streamreqs')
    if not advertised:
        # This is weird and shouldn't happen with modern servers.
        repo.ui.warn(
            _(
                b'warning: stream clone requested but server has them '
                b'disabled\n'
            )
        )
        return unsupported

    streamreqs = set(advertised.split(b','))
    missingreqs = streamreqs - repo.supportedformats
    if missingreqs:
        # The server requires something we don't support. Bail.
        repo.ui.warn(
            _(
                b'warning: stream clone requested but client is missing '
                b'requirements: %s\n'
            )
            % b', '.join(sorted(missingreqs))
        )
        repo.ui.warn(
            _(
                b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                b'for more information)\n'
            )
        )
        return unsupported

    return True, streamreqs
121
121
122
122
def maybeperformlegacystreamclone(pullop):
    """Run a legacy (non-bundle2) stream clone if one is applicable.

    Legacy stream clones happen as part of pull, ahead of every other pull
    step. Nothing is done when ``canperformstreamclone()`` reports that the
    legacy path is not supported (which includes the case where a bundle2
    stream clone is supported instead).
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)
    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Fetch the remote branchmap up front when available; it is used at the
    # end to speed up branchcache creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as executor:
            rbranchmap = executor.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as executor:
        fp = executor.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.

    # First line: numeric status code, 0 meaning success.
    statusline = fp.readline()
    try:
        resp = int(statusline)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), statusline
        )
    if resp != 0:
        if resp == 1:
            reason = _(b'operation forbidden by server')
        elif resp == 2:
            reason = _(b'locking the remote repository failed')
        else:
            reason = _(b'the server sent an unknown error code')
        raise error.Abort(reason)

    # Second line: "<file count> <byte count>".
    countsline = fp.readline()
    try:
        filecount, bytecount = [int(f) for f in countsline.split(b' ', 1)]
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), countsline
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        nonformat = repo.requirements - repo.supportedformats
        repo.requirements = requirements | nonformat
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()
198
198
199
199
def allowservergeneration(repo):
    """Report whether this repository may serve streaming clones.

    Serving requires the repository to expose the stream clone feature and
    ``server.uncompressed`` to be enabled. If the repository has secret
    changesets, ``server.uncompressedallowsecret`` decides the outcome.
    """
    streamable = repository.REPO_FEATURE_STREAM_CLONE in repo.features
    if not (
        streamable
        and repo.ui.configbool(b'server', b'uncompressed', untrusted=True)
    ):
        return False

    if not phases.hassecret(repo):
        return True

    # The way stream clone works makes it impossible to hide secret
    # changesets, so serving them has to be explicitly allowed.
    return repo.ui.configbool(b'server', b'uncompressedallowsecret')
215
215
216
216
# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None):
    """Return the result of walking the repository store with ``matcher``."""
    repostore = repo.store
    return repostore.walk(matcher)
220
220
221
221
def generatev1(repo):
    """Produce the payload of a version 1 streaming clone.

    Returns a 3-tuple ``(file count, byte size, data iterator)``.

    For every transferred file the data iterator first yields a header line
    made of the file name and its integer size separated by a null byte,
    then the raw file data. The next file entry (or EOF) follows directly.

    On the wire protocol an extra line signalling protocol success precedes
    this stream; adding it is not this function's job.

    The repository lock is taken while the store is scanned so that a
    consistent view is captured, hence ``LockError`` may be raised.
    """
    # Get a consistent snapshot of the repo: hold the lock during the scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        # Only non-empty files are transferred.
        entries = [
            (name, size)
            for _ftype, name, _ename, size in _walkstreamfiles(repo)
            if size
        ]
        total_bytes = sum(size for _name, size in entries)
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    filecount = len(entries)
    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (filecount, total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for fname, fsize in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (fname, fsize))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(fname), fsize)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(fname, b'rb', auditpath=False) as fp:
                if fsize > 65536:
                    for chunk in util.filechunkiter(fp, limit=fsize):
                        yield chunk
                else:
                    # small file: a single read is enough
                    yield fp.read(fsize)

    return filecount, total_bytes, emitrevlogdata()
276
276
277
277
def generatev1wireproto(repo):
    """Produce a version 1 streaming clone in its wire protocol form.

    The output is that of ``generatev1()`` preceded by two header lines: a
    status line, then a line holding the file count and the payload byte
    size.

    The status line is "0" on success, "1" when stream generation is not
    allowed, and "2" when locking the repository failed (possibly a
    permissions problem for the server process). Nothing follows a non-zero
    status.
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        stream = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    filecount, bytecount, payload = stream

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in payload:
        yield chunk
304
304
305
305
def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \\0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator). The generator is
    lazy: the repository is only scanned (via ``generatev1()``) once the
    generator has been advanced past the magic and compression bytes.

    Raises ``ValueError`` if ``compression`` is anything other than b'UN'.
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    # Only format requirements the repository actually uses are recorded.
    requirements = repo.requirements & repo.supportedformats
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        # the declared length accounts for the trailing NUL
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        # Always close the progress topic, including when reading a store
        # file fails or when the consumer abandons the generator early.
        try:
            progress.update(0)
            for chunk in it:
                progress.increment(step=len(chunk))
                yield chunk
        finally:
            progress.complete()

    return requirements, gen()
362
362
363
363
def consumev1(repo, fp, filecount, bytecount):
    """Write the files of a version 1 streaming clone into ``repo``.

    ``fp`` is a file handle carrying "stream_out"-style data: ``filecount``
    entries, each a "<name>\\0<size>" line followed by ``size`` bytes of file
    content. ``bytecount`` is the announced total size and is used for
    progress and status reporting.

    As with "stream_out", the status line added by the wire protocol must
    already have been consumed by the caller.

    Raises ``error.ResponseError`` when an entry header cannot be parsed.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for _unused in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    header = fp.readline()
                    try:
                        name, rawsize = header.split(b'\0', 1)
                        size = int(rawsize)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'),
                            header,
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # avoid dividing by zero when computing the transfer rate
            elapsed = 0.001
        progress.complete()
        rate = util.bytecount(bytecount / elapsed)
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (util.bytecount(bytecount), elapsed, rate)
        )
437
437
438
438
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    ``fp`` must be positioned at the 2 byte compression identifier, i.e.
    right after the 4 byte magic. The header read here is:

    * 2 bytes: compression type (only b'UN' is accepted)
    * 16 bytes: file count and byte count, two big endian unsigned 64-bit
      integers
    * 2 bytes: big endian unsigned 16-bit length of the requirements string
    * N bytes: comma-delimited requirements string, terminated by \\0

    Returns ``(filecount, bytecount, requirements)`` where ``requirements``
    is a set of bytes. An empty requirements string yields an empty set.

    Raises ``error.Abort`` on an unsupported compression type or when the
    requirements string is not NUL terminated.
    """
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    # Splitting an empty string yields [b''], which would otherwise be
    # reported as a bogus (and unsupported) requirement named b''. Drop
    # empty entries so "no requirements" parses as an empty set.
    requirements = set(
        req for req in requires.rstrip(b'\0').split(b',') if req
    )

    return filecount, bytecount, requirements
465
465
466
466
def applybundlev1(repo, fp):
    """Apply a version 1 stream clone bundle to ``repo``.

    The 4 byte magic is expected to have been read and validated already,
    leaving ``fp`` at the 2 byte compression identifier.

    Raises ``error.Abort`` if the repository is not empty or if the bundle
    requires a format this repository does not support.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    header = readbundle1header(fp)
    filecount, bytecount, requirements = header

    unsupported = requirements - repo.supportedformats
    if unsupported:
        names = b', '.join(sorted(unsupported))
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % names
        )

    consumev1(repo, fp, filecount, bytecount)
487
487
488
488
class streamcloneapplier(object):
    """Wrapper object that applies a streaming clone bundle.

    ``applybundlev1()`` is wrapped in a dedicated type so that bundle readers
    can perform functionality specific to this bundle type.
    """

    def __init__(self, fh):
        # file handle the bundle content is read from when applying
        self._fh = fh

    def apply(self, repo):
        """Apply the wrapped bundle to ``repo`` via ``applybundlev1()``."""
        handle = self._fh
        return applybundlev1(repo, handle)
501
501
502
502
# Kind of file being streamed
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Where a streamed file comes from
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)
510
510
# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """List the store files that must be streamed as full snapshots.

    ``phaseroots`` is included only when the repository is not publishing.
    """
    return [] if repo.publishing() else [b'phaseroots']
518
518
519
519
520 def _filterfull(entry, copy, vfsmap):
520 def _filterfull(entry, copy, vfsmap):
521 """actually copy the snapshot files"""
521 """actually copy the snapshot files"""
522 src, name, ftype, data = entry
522 src, name, ftype, data = entry
523 if ftype != _filefull:
523 if ftype != _filefull:
524 return entry
524 return entry
525 return (src, name, ftype, copy(vfsmap[src].join(name)))
525 return (src, name, ftype, copy(vfsmap[src].join(name)))
526
526
527
527
528 @contextlib.contextmanager
528 @contextlib.contextmanager
529 def maketempcopies():
529 def maketempcopies():
530 """return a function to temporarily copy files"""
530 """return a function to temporarily copy files"""
531 files = []
531 files = []
532 try:
532 try:
533
533
534 def copy(src):
534 def copy(src):
535 fd, dst = pycompat.mkstemp()
535 fd, dst = pycompat.mkstemp()
536 os.close(fd)
536 os.close(fd)
537 files.append(dst)
537 files.append(dst)
538 util.copyfiles(src, dst, hardlink=True)
538 util.copyfiles(src, dst, hardlink=True)
539 return dst
539 return dst
540
540
541 yield copy
541 yield copy
542 finally:
542 finally:
543 for tmp in files:
543 for tmp in files:
544 util.tryunlink(tmp)
544 util.tryunlink(tmp)
545
545
546
546
547 def _makemap(repo):
547 def _makemap(repo):
548 """make a (src -> vfs) map for the repo"""
548 """make a (src -> vfs) map for the repo"""
549 vfsmap = {
549 vfsmap = {
550 _srcstore: repo.svfs,
550 _srcstore: repo.svfs,
551 _srccache: repo.cachevfs,
551 _srccache: repo.cachevfs,
552 }
552 }
553 # we keep repo.vfs out of the map on purpose, there are too many dangers there
553 # we keep repo.vfs out of the map on purpose, there are too many dangers there
554 # (eg: .hg/hgrc)
554 # (eg: .hg/hgrc)
555 assert repo.vfs not in vfsmap.values()
555 assert repo.vfs not in vfsmap.values()
556
556
557 return vfsmap
557 return vfsmap
558
558
559
559
560 def _emit2(repo, entries, totalfilesize):
560 def _emit2(repo, entries, totalfilesize):
561 """actually emit the stream bundle"""
561 """actually emit the stream bundle"""
562 vfsmap = _makemap(repo)
562 vfsmap = _makemap(repo)
563 progress = repo.ui.makeprogress(
563 progress = repo.ui.makeprogress(
564 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
564 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
565 )
565 )
566 progress.update(0)
566 progress.update(0)
567 with maketempcopies() as copy, progress:
567 with maketempcopies() as copy, progress:
568 # copy is delayed until we are in the try
568 # copy is delayed until we are in the try
569 entries = [_filterfull(e, copy, vfsmap) for e in entries]
569 entries = [_filterfull(e, copy, vfsmap) for e in entries]
570 yield None # this releases the lock on the repository
570 yield None # this releases the lock on the repository
571 seen = 0
571 seen = 0
572
572
573 for src, name, ftype, data in entries:
573 for src, name, ftype, data in entries:
574 vfs = vfsmap[src]
574 vfs = vfsmap[src]
575 yield src
575 yield src
576 yield util.uvarintencode(len(name))
576 yield util.uvarintencode(len(name))
577 if ftype == _fileappend:
577 if ftype == _fileappend:
578 fp = vfs(name)
578 fp = vfs(name)
579 size = data
579 size = data
580 elif ftype == _filefull:
580 elif ftype == _filefull:
581 fp = open(data, b'rb')
581 fp = open(data, b'rb')
582 size = util.fstat(fp).st_size
582 size = util.fstat(fp).st_size
583 try:
583 try:
584 yield util.uvarintencode(size)
584 yield util.uvarintencode(size)
585 yield name
585 yield name
586 if size <= 65536:
586 if size <= 65536:
587 chunks = (fp.read(size),)
587 chunks = (fp.read(size),)
588 else:
588 else:
589 chunks = util.filechunkiter(fp, limit=size)
589 chunks = util.filechunkiter(fp, limit=size)
590 for chunk in chunks:
590 for chunk in chunks:
591 seen += len(chunk)
591 seen += len(chunk)
592 progress.update(seen)
592 progress.update(seen)
593 yield chunk
593 yield chunk
594 finally:
594 finally:
595 fp.close()
595 fp.close()
596
596
597
597
598 def _test_sync_point_walk_1(repo):
598 def _test_sync_point_walk_1(repo):
599 """a function for synchronisation during tests"""
599 """a function for synchronisation during tests"""
600
600
601
601
602 def _test_sync_point_walk_2(repo):
602 def _test_sync_point_walk_2(repo):
603 """a function for synchronisation during tests"""
603 """a function for synchronisation during tests"""
604
604
605
605
606 def generatev2(repo, includes, excludes, includeobsmarkers):
606 def _v2_walk(repo, includes, excludes, includeobsmarkers):
607 """Emit content for version 2 of a streaming clone.
607 """emit a series of file information entries useful to clone a repo
608
609 return (entries, totalfilesize)
610
611 entries is a list of tuples (vfs-key, file-path, file-type, size)
608
612
609 the data stream consists of the following entries:
613 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
610 1) A char representing the file destination (eg: store or cache)
614 - `name`: file path of the file to copy (to be fed to the vfs)
611 2) A varint containing the length of the filename
615 - `file-type`: does this file need to be copied with the source lock?
612 3) A varint containing the length of file data
616 - `size`: the size of the file (or None)
613 4) N bytes containing the filename (the internal, store-agnostic form)
614 5) N bytes containing the file data
615
616 Returns a 3-tuple of (file count, file size, data iterator).
617 """
617 """
618
618 assert repo._currentlock(repo._lockref) is not None
619 with repo.lock():
620
621 entries = []
619 entries = []
622 totalfilesize = 0
620 totalfilesize = 0
623
621
624 matcher = None
622 matcher = None
625 if includes or excludes:
623 if includes or excludes:
626 matcher = narrowspec.match(repo.root, includes, excludes)
624 matcher = narrowspec.match(repo.root, includes, excludes)
627
625
628 repo.ui.debug(b'scanning\n')
629 for rl_type, name, ename, size in _walkstreamfiles(repo, matcher):
626 for rl_type, name, ename, size in _walkstreamfiles(repo, matcher):
630 if size:
627 if size:
631 ft = _fileappend
628 ft = _fileappend
632 if rl_type & store.FILEFLAGS_VOLATILE:
629 if rl_type & store.FILEFLAGS_VOLATILE:
633 ft = _filefull
630 ft = _filefull
634 entries.append((_srcstore, name, ft, size))
631 entries.append((_srcstore, name, ft, size))
635 totalfilesize += size
632 totalfilesize += size
636 for name in _walkstreamfullstorefiles(repo):
633 for name in _walkstreamfullstorefiles(repo):
637 if repo.svfs.exists(name):
634 if repo.svfs.exists(name):
638 totalfilesize += repo.svfs.lstat(name).st_size
635 totalfilesize += repo.svfs.lstat(name).st_size
639 entries.append((_srcstore, name, _filefull, None))
636 entries.append((_srcstore, name, _filefull, None))
640 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
637 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
641 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
638 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
642 entries.append((_srcstore, b'obsstore', _filefull, None))
639 entries.append((_srcstore, b'obsstore', _filefull, None))
643 for name in cacheutil.cachetocopy(repo):
640 for name in cacheutil.cachetocopy(repo):
644 if repo.cachevfs.exists(name):
641 if repo.cachevfs.exists(name):
645 totalfilesize += repo.cachevfs.lstat(name).st_size
642 totalfilesize += repo.cachevfs.lstat(name).st_size
646 entries.append((_srccache, name, _filefull, None))
643 entries.append((_srccache, name, _filefull, None))
644 return entries, totalfilesize
645
646
647 def generatev2(repo, includes, excludes, includeobsmarkers):
648 """Emit content for version 2 of a streaming clone.
649
650 the data stream consists of the following entries:
651 1) A char representing the file destination (eg: store or cache)
652 2) A varint containing the length of the filename
653 3) A varint containing the length of file data
654 4) N bytes containing the filename (the internal, store-agnostic form)
655 5) N bytes containing the file data
656
657 Returns a 3-tuple of (file count, file size, data iterator).
658 """
659
660 with repo.lock():
661
662 repo.ui.debug(b'scanning\n')
663
664 entries, totalfilesize = _v2_walk(
665 repo,
666 includes=includes,
667 excludes=excludes,
668 includeobsmarkers=includeobsmarkers,
669 )
647
670
648 chunks = _emit2(repo, entries, totalfilesize)
671 chunks = _emit2(repo, entries, totalfilesize)
649 first = next(chunks)
672 first = next(chunks)
650 assert first is None
673 assert first is None
651 _test_sync_point_walk_1(repo)
674 _test_sync_point_walk_1(repo)
652 _test_sync_point_walk_2(repo)
675 _test_sync_point_walk_2(repo)
653
676
654 return len(entries), totalfilesize, chunks
677 return len(entries), totalfilesize, chunks
655
678
656
679
657 @contextlib.contextmanager
680 @contextlib.contextmanager
658 def nested(*ctxs):
681 def nested(*ctxs):
659 this = ctxs[0]
682 this = ctxs[0]
660 rest = ctxs[1:]
683 rest = ctxs[1:]
661 with this:
684 with this:
662 if rest:
685 if rest:
663 with nested(*rest):
686 with nested(*rest):
664 yield
687 yield
665 else:
688 else:
666 yield
689 yield
667
690
668
691
669 def consumev2(repo, fp, filecount, filesize):
692 def consumev2(repo, fp, filecount, filesize):
670 """Apply the contents from a version 2 streaming clone.
693 """Apply the contents from a version 2 streaming clone.
671
694
672 Data is read from an object that only needs to provide a ``read(size)``
695 Data is read from an object that only needs to provide a ``read(size)``
673 method.
696 method.
674 """
697 """
675 with repo.lock():
698 with repo.lock():
676 repo.ui.status(
699 repo.ui.status(
677 _(b'%d files to transfer, %s of data\n')
700 _(b'%d files to transfer, %s of data\n')
678 % (filecount, util.bytecount(filesize))
701 % (filecount, util.bytecount(filesize))
679 )
702 )
680
703
681 start = util.timer()
704 start = util.timer()
682 progress = repo.ui.makeprogress(
705 progress = repo.ui.makeprogress(
683 _(b'clone'), total=filesize, unit=_(b'bytes')
706 _(b'clone'), total=filesize, unit=_(b'bytes')
684 )
707 )
685 progress.update(0)
708 progress.update(0)
686
709
687 vfsmap = _makemap(repo)
710 vfsmap = _makemap(repo)
688
711
689 with repo.transaction(b'clone'):
712 with repo.transaction(b'clone'):
690 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
713 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
691 with nested(*ctxs):
714 with nested(*ctxs):
692 for i in range(filecount):
715 for i in range(filecount):
693 src = util.readexactly(fp, 1)
716 src = util.readexactly(fp, 1)
694 vfs = vfsmap[src]
717 vfs = vfsmap[src]
695 namelen = util.uvarintdecodestream(fp)
718 namelen = util.uvarintdecodestream(fp)
696 datalen = util.uvarintdecodestream(fp)
719 datalen = util.uvarintdecodestream(fp)
697
720
698 name = util.readexactly(fp, namelen)
721 name = util.readexactly(fp, namelen)
699
722
700 if repo.ui.debugflag:
723 if repo.ui.debugflag:
701 repo.ui.debug(
724 repo.ui.debug(
702 b'adding [%s] %s (%s)\n'
725 b'adding [%s] %s (%s)\n'
703 % (src, name, util.bytecount(datalen))
726 % (src, name, util.bytecount(datalen))
704 )
727 )
705
728
706 with vfs(name, b'w') as ofp:
729 with vfs(name, b'w') as ofp:
707 for chunk in util.filechunkiter(fp, limit=datalen):
730 for chunk in util.filechunkiter(fp, limit=datalen):
708 progress.increment(step=len(chunk))
731 progress.increment(step=len(chunk))
709 ofp.write(chunk)
732 ofp.write(chunk)
710
733
711 # force @filecache properties to be reloaded from
734 # force @filecache properties to be reloaded from
712 # streamclone-ed file at next access
735 # streamclone-ed file at next access
713 repo.invalidate(clearfilecache=True)
736 repo.invalidate(clearfilecache=True)
714
737
715 elapsed = util.timer() - start
738 elapsed = util.timer() - start
716 if elapsed <= 0:
739 if elapsed <= 0:
717 elapsed = 0.001
740 elapsed = 0.001
718 repo.ui.status(
741 repo.ui.status(
719 _(b'transferred %s in %.1f seconds (%s/sec)\n')
742 _(b'transferred %s in %.1f seconds (%s/sec)\n')
720 % (
743 % (
721 util.bytecount(progress.pos),
744 util.bytecount(progress.pos),
722 elapsed,
745 elapsed,
723 util.bytecount(progress.pos / elapsed),
746 util.bytecount(progress.pos / elapsed),
724 )
747 )
725 )
748 )
726 progress.complete()
749 progress.complete()
727
750
728
751
729 def applybundlev2(repo, fp, filecount, filesize, requirements):
752 def applybundlev2(repo, fp, filecount, filesize, requirements):
730 from . import localrepo
753 from . import localrepo
731
754
732 missingreqs = [r for r in requirements if r not in repo.supported]
755 missingreqs = [r for r in requirements if r not in repo.supported]
733 if missingreqs:
756 if missingreqs:
734 raise error.Abort(
757 raise error.Abort(
735 _(b'unable to apply stream clone: unsupported format: %s')
758 _(b'unable to apply stream clone: unsupported format: %s')
736 % b', '.join(sorted(missingreqs))
759 % b', '.join(sorted(missingreqs))
737 )
760 )
738
761
739 consumev2(repo, fp, filecount, filesize)
762 consumev2(repo, fp, filecount, filesize)
740
763
741 # new requirements = old non-format requirements +
764 # new requirements = old non-format requirements +
742 # new format-related remote requirements
765 # new format-related remote requirements
743 # requirements from the streamed-in repository
766 # requirements from the streamed-in repository
744 repo.requirements = set(requirements) | (
767 repo.requirements = set(requirements) | (
745 repo.requirements - repo.supportedformats
768 repo.requirements - repo.supportedformats
746 )
769 )
747 repo.svfs.options = localrepo.resolvestorevfsoptions(
770 repo.svfs.options = localrepo.resolvestorevfsoptions(
748 repo.ui, repo.requirements, repo.features
771 repo.ui, repo.requirements, repo.features
749 )
772 )
750 scmutil.writereporequirements(repo)
773 scmutil.writereporequirements(repo)
General Comments 0
You need to be logged in to leave comments. Login now