streamclone: don't support stream clone unless repo feature present...
Gregory Szorc
r40064:51f10e6d default
@@ -1,647 +1,651 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import contextlib
import os
import struct

from .i18n import _
from . import (
    branchmap,
    cacheutil,
    error,
    phases,
    pycompat,
+    repository,
    store,
    util,
)

def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v2' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but server has them '
                'disabled\n'))
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but client is missing '
                'requirements: %s\n') % ', '.join(sorted(missingreqs)))
            pullop.repo.ui.warn(
                _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                  'for more information)\n'))
            return False, None
        requirements = streamreqs

    return True, requirements

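The requirement negotiation above reduces to set arithmetic over the server's capability string. A minimal sketch, assuming a hypothetical server advertising "streamreqs=revlogv1,generaldelta":

    # Hypothetical capability payload; real values come from remote.capable().
    caps = 'revlogv1,generaldelta'
    streamreqs = set(caps.split(','))
    supportedformats = {'revlogv1', 'generaldelta', 'fncache', 'dotencode'}
    missingreqs = streamreqs - supportedformats
    assert not missingreqs  # an empty set means the stream clone can proceed
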
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand('branchmap', {}).result()

    repo.ui.status(_('streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand('stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
            repo.requirements - repo.supportedformats)
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features)
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()

def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
+    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
+        return False
+
    if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True

# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()

def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield '1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield '2\n'
        return

    # Indicates successful response.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

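A client-side sketch of consuming the two header lines emitted above (this is essentially what maybeperformlegacystreamclone() does), assuming ``fp`` is a response file object:

    def readv1wireheader(fp):
        # illustrative counterpart to the header emitted by generatev1wireproto()
        resp = int(fp.readline())
        if resp == 1:
            raise Exception('stream generation not allowed')
        elif resp == 2:
            raise Exception('error locking the repository')
        elif resp != 0:
            raise Exception('unknown error code %d' % resp)
        filecount, bytecount = map(int, fp.readline().split(' ', 1))
        return filecount, bytecount
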
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        progress = repo.ui.makeprogress(_('bundle'), total=bytecount,
                                        unit=_('bytes'))
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()

def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        progress = repo.ui.makeprogress(_('clone'), total=bytecount,
                                        unit=_('bytes'))
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))

def readbundle1header(fp):
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements

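An illustrative round trip through the header layout parsed above, hand-building the bytes with ``struct`` (Python 2 string semantics, as in the rest of this file):

    import io

    requires = 'revlogv1,generaldelta' + '\0'
    header = ('UN' +                        # compression identifier
              struct.pack('>QQ', 3, 1024) + # file count, byte count
              struct.pack('>H', len(requires)) +
              requires)
    filecount, bytecount, reqs = readbundle1header(io.BytesIO(header))
    assert (filecount, bytecount) == (3, 1024)
    assert reqs == {'revlogv1', 'generaldelta'}
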
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)

446
443 # type of file to stream
447 # type of file to stream
444 _fileappend = 0 # append only file
448 _fileappend = 0 # append only file
445 _filefull = 1 # full snapshot file
449 _filefull = 1 # full snapshot file
446
450
447 # Source of the file
451 # Source of the file
448 _srcstore = 's' # store (svfs)
452 _srcstore = 's' # store (svfs)
449 _srccache = 'c' # cache (cache)
453 _srccache = 'c' # cache (cache)
450
454
451 # This is it's own function so extensions can override it.
455 # This is it's own function so extensions can override it.
452 def _walkstreamfullstorefiles(repo):
456 def _walkstreamfullstorefiles(repo):
453 """list snapshot file from the store"""
457 """list snapshot file from the store"""
454 fnames = []
458 fnames = []
455 if not repo.publishing():
459 if not repo.publishing():
456 fnames.append('phaseroots')
460 fnames.append('phaseroots')
457 return fnames
461 return fnames
458
462
459 def _filterfull(entry, copy, vfsmap):
463 def _filterfull(entry, copy, vfsmap):
460 """actually copy the snapshot files"""
464 """actually copy the snapshot files"""
461 src, name, ftype, data = entry
465 src, name, ftype, data = entry
462 if ftype != _filefull:
466 if ftype != _filefull:
463 return entry
467 return entry
464 return (src, name, ftype, copy(vfsmap[src].join(name)))
468 return (src, name, ftype, copy(vfsmap[src].join(name)))
465
469
466 @contextlib.contextmanager
470 @contextlib.contextmanager
467 def maketempcopies():
471 def maketempcopies():
468 """return a function to temporary copy file"""
472 """return a function to temporary copy file"""
469 files = []
473 files = []
470 try:
474 try:
471 def copy(src):
475 def copy(src):
472 fd, dst = pycompat.mkstemp()
476 fd, dst = pycompat.mkstemp()
473 os.close(fd)
477 os.close(fd)
474 files.append(dst)
478 files.append(dst)
475 util.copyfiles(src, dst, hardlink=True)
479 util.copyfiles(src, dst, hardlink=True)
476 return dst
480 return dst
477 yield copy
481 yield copy
478 finally:
482 finally:
479 for tmp in files:
483 for tmp in files:
480 util.tryunlink(tmp)
484 util.tryunlink(tmp)
481
485
482 def _makemap(repo):
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose: there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap

def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize,
                                    unit=_('bytes'))
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copy is delayed until we are in the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None # this releases the lock on the repository
        seen = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                fp = open(data, 'rb')
                size = util.fstat(fp).st_size
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    seen += len(chunk)
                    progress.update(seen)
                    yield chunk
            finally:
                fp.close()

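For reference, a single v2 entry emitted above is framed as: source byte, uvarint name length, uvarint data length, name, data. A sketch that builds one entry in memory (using the real ``util.uvarintencode``):

    def _framev2entry(src, name, data):
        # illustrative only; mirrors the yields in _emit2()
        return ''.join([src,
                        util.uvarintencode(len(name)),
                        util.uvarintencode(len(data)),
                        name,
                        data])

    # 's' + '\x0a' + '\x05' + 'data/foo.i' + 'hello' for a 5-byte store file
    entry = _framev2entry(_srcstore, 'data/foo.i', 'hello')
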
def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    the data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        entries = []
        totalfilesize = 0

        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((_srcstore, name, _fileappend, size))
                totalfilesize += size
        for name in _walkstreamfullstorefiles(repo):
            if repo.svfs.exists(name):
                totalfilesize += repo.svfs.lstat(name).st_size
                entries.append((_srcstore, name, _filefull, None))
        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                totalfilesize += repo.cachevfs.lstat(name).st_size
                entries.append((_srccache, name, _filefull, None))

        chunks = _emit2(repo, entries, totalfilesize)
        first = next(chunks)
        assert first is None

    return len(entries), totalfilesize, chunks

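The varints mentioned above are unsigned LEB128 as produced by ``util.uvarintencode``: seven payload bits per byte, least significant group first, high bit set on every byte except the last. A self-contained decoder sketch, assuming that layout:

    import io

    def uvarintdecode(fh):
        # illustrative stand-in for util.uvarintdecodestream()
        result = 0
        shift = 0
        while True:
            byteval = ord(fh.read(1))
            result |= (byteval & 0x7f) << shift
            if not byteval & 0x80:
                return result
            shift += 7

    assert uvarintdecode(io.BytesIO('\xac\x02')) == 300  # 0b100101100
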
@contextlib.contextmanager
def nested(*ctxs):
    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        if rest:
            with nested(*rest):
                yield
        else:
            yield

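``nested`` is a recursive replacement for Python 2's ``contextlib.nested`` (removed in Python 3): it enters a variable number of context managers, as ``consumev2`` below does for every vfs. A trivial usage sketch:

    import tempfile
    fa, fb = tempfile.TemporaryFile(), tempfile.TemporaryFile()
    with nested(fa, fb):
        pass  # both files are usable here; both are closed on exit
    assert fa.closed and fb.closed
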
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        progress = repo.ui.makeprogress(_('clone'), total=filesize,
                                        unit=_('bytes'))
        progress.update(0)

        vfsmap = _makemap(repo)

        with repo.transaction('clone'):
            ctxs = (vfs.backgroundclosing(repo.ui)
                    for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding [%s] %s (%s)\n' %
                                      (src, name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(progress.pos), elapsed,
                        util.bytecount(progress.pos / elapsed)))
        progress.complete()

def applybundlev2(repo, fp, filecount, filesize, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev2(repo, fp, filecount, filesize)

    # new requirements = old non-format requirements +
    #                    new format-related remote requirements
    # requirements from the streamed-in repository
    repo.requirements = set(requirements) | (
        repo.requirements - repo.supportedformats)
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features)
    repo._writerequirements()