streamclone: use readexactly when reading stream v2...
Boris Feld
r35821:3ad3aaeb stable
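
The change replaces two bare fp.read(n) calls in consumev2() with util.readexactly(fp, n). On a truncated or interrupted stream, read(n) can silently return fewer than n bytes (possibly an empty string), which would desynchronize the decoding of every entry that follows; readexactly aborts with an explicit error instead. A minimal sketch of the helper's semantics (the real implementation lives in mercurial.util and raises error.Abort):

    def readexactly(stream, n):
        '''read n bytes from stream.read and abort if less was available'''
        s = stream.read(n)
        if len(s) < n:
            raise IOError('stream ended unexpectedly '
                          '(got %d bytes, expected %d)' % (len(s), n))
        return s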
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -1,634 +1,634 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import contextlib
import os
import struct
import tempfile
import warnings

from .i18n import _
from . import (
    branchmap,
    cacheutil,
    error,
    phases,
    store,
    util,
)

def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v2' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but server has them '
                'disabled\n'))
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but client is missing '
                'requirements: %s\n') % ', '.join(sorted(missingreqs)))
            pullop.repo.ui.warn(
                _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                  'for more information)\n'))
            return False, None
        requirements = streamreqs

    return True, requirements
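
# Illustrative sketch (not part of streamclone.py): the capability
# negotiation rule from the docstring above, reduced to its core. The
# remote.capable() calls follow Mercurial's peer API; the helper name and
# the standalone form are hypothetical.
def _negotiatestreamreqs(remote, supportedformats):
    if remote.capable('stream'):
        return {'revlogv1'}   # value-less "stream" implies only revlogv1
    streamreqs = remote.capable('streamreqs')
    if not streamreqs:
        return None           # server has stream clones disabled
    reqs = set(streamreqs.split(','))
    if reqs - supportedformats:
        return None           # server needs a format the client lacks
    return reqs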

def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
            repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()

def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True
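
# For reference, the server-side configuration read above, as it would
# appear in an hgrc. The option names are real; the values shown are the
# defaults implied by this module:
#
#   [server]
#   uncompressed = True
#   # stream clones cannot hide secret changesets, so serving a repo that
#   # contains them requires an explicit opt-in:
#   uncompressedallowsecret = False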

# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
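
# Illustrative sketch (not part of streamclone.py): consuming the entry
# framing that generatev1() emits above, i.e. "<name>\0<size>\n" followed
# by exactly <size> bytes of file data. The function name is hypothetical;
# Python 2 byte-string semantics are assumed, matching this module.
def _iterv1entries(fp):
    while True:
        header = fp.readline()
        if not header:
            break                       # clean EOF: no more entries
        name, size = header.rstrip('\n').split('\0', 1)
        yield name, fp.read(int(size))  # the real consumer reads in chunks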

def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield '1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield '2\n'
        return

    # Indicates successful response.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing \0. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            seen += len(chunk)
            repo.ui.progress(_('bundle'), seen, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()
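
# Illustrative sketch (not part of streamclone.py): the fixed bundle v1
# header laid out in the docstring above, built in one place. The helper
# name is hypothetical; the layout mirrors gen() and readbundle1header().
def _bundlev1header(filecount, bytecount, requirements):
    requires = ','.join(sorted(requirements)) + '\0'
    return ('HGS1' + 'UN'                       # magic + compression id
            + struct.pack('>QQ', filecount, bytecount)
            + struct.pack('>H', len(requires))  # length includes the '\0'
            + requires)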

def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handled_bytes += len(chunk)
                            repo.ui.progress(_('clone'), handled_bytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))

def readbundle1header(fp):
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements

def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)

# type of file to stream
_fileappend = 0 # append only file
_filefull = 1   # full snapshot file

# Source of the file
_srcstore = 's' # store (svfs)
_srccache = 'c' # cache (cache)

# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append('phaseroots')
    return fnames

def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))

@contextlib.contextmanager
def maketempcopies():
    """return a function to temporarily copy files"""
    files = []
    try:
        def copy(src):
            fd, dst = tempfile.mkstemp()
            os.close(fd)
            files.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst
        yield copy
    finally:
        for tmp in files:
            util.tryunlink(tmp)

def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap

def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    progress = repo.ui.progress
    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
    with maketempcopies() as copy:
        try:
            # copying is delayed until we are inside the try block
            entries = [_filterfull(e, copy, vfsmap) for e in entries]
            yield None # this releases the lock on the repository
            seen = 0

            for src, name, ftype, data in entries:
                vfs = vfsmap[src]
                yield src
                yield util.uvarintencode(len(name))
                if ftype == _fileappend:
                    fp = vfs(name)
                    size = data
                elif ftype == _filefull:
                    fp = open(data, 'rb')
                    size = util.fstat(fp).st_size
                try:
                    yield util.uvarintencode(size)
                    yield name
                    if size <= 65536:
                        chunks = (fp.read(size),)
                    else:
                        chunks = util.filechunkiter(fp, limit=size)
                    for chunk in chunks:
                        seen += len(chunk)
                        progress(_('bundle'), seen, total=totalfilesize,
                                 unit=_('bytes'))
                        yield chunk
                finally:
                    fp.close()
        finally:
            progress(_('bundle'), None)

def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    The data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        entries = []
        totalfilesize = 0

        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((_srcstore, name, _fileappend, size))
                totalfilesize += size
        for name in _walkstreamfullstorefiles(repo):
            if repo.svfs.exists(name):
                totalfilesize += repo.svfs.lstat(name).st_size
                entries.append((_srcstore, name, _filefull, None))
        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                totalfilesize += repo.cachevfs.lstat(name).st_size
                entries.append((_srccache, name, _filefull, None))

        chunks = _emit2(repo, entries, totalfilesize)
        first = next(chunks)
        assert first is None

    return len(entries), totalfilesize, chunks
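
# Illustrative sketch (not part of streamclone.py): the varint used for the
# two length fields above, assuming util.uvarintencode is the usual
# LEB128-style encoding (7 data bits per byte, high bit set while more
# bytes follow). This standalone pair is hypothetical but matches that
# assumption; e.g. _uvarintencode(300) == '\xac\x02'.
def _uvarintencode(value):
    out = ''
    while value >= 0x80:
        out += chr((value & 0x7f) | 0x80)
        value >>= 7
    return out + chr(value)

def _uvarintdecodestream(fp):
    result = shift = 0
    while True:
        # a short read must be fatal here too, hence readexactly
        byte = ord(util.readexactly(fp, 1))
        result |= (byte & 0x7f) << shift
        if not byte & 0x80:
            return result
        shift += 7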

@contextlib.contextmanager
def nested(*ctxs):
    with warnings.catch_warnings():
        # For some reason, Python decided 'nested' was deprecated without
        # replacement. The official advice is to filter out the deprecation
        # warning for people who actually need the feature.
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        with contextlib.nested(*ctxs):
            yield
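
# For reference: Python 3 removed contextlib.nested() outright, so the
# workaround above is Python 2 only. An equivalent sketch on Python 3
# would lean on contextlib.ExitStack:
#
#     @contextlib.contextmanager
#     def nested(*ctxs):
#         with contextlib.ExitStack() as stack:
#             for ctx in ctxs:
#                 stack.enter_context(ctx)
#             yield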

def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        handledbytes = 0
        progress = repo.ui.progress

        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))

        vfsmap = _makemap(repo)

        with repo.transaction('clone'):
            ctxs = (vfs.backgroundclosing(repo.ui)
                    for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
-                    src = fp.read(1)
+                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

-                    name = fp.read(namelen)
+                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding [%s] %s (%s)\n' %
                                      (src, name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            handledbytes += len(chunk)
                            progress(_('clone'), handledbytes, total=filesize,
                                     unit=_('bytes'))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(handledbytes), elapsed,
                        util.bytecount(handledbytes / elapsed)))

def applybundlev2(repo, fp, filecount, filesize, requirements):
    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev2(repo, fp, filecount, filesize)