##// END OF EJS Templates
streamclone: use progress helper...
Martin von Zweigbergk -
r38368:e59eaf51 default
parent child Browse files
Show More
@@ -1,649 +1,644 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import contextlib
10 import contextlib
11 import os
11 import os
12 import struct
12 import struct
13 import warnings
13 import warnings
14
14
15 from .i18n import _
15 from .i18n import _
16 from . import (
16 from . import (
17 branchmap,
17 branchmap,
18 cacheutil,
18 cacheutil,
19 error,
19 error,
20 phases,
20 phases,
21 pycompat,
21 pycompat,
22 store,
22 store,
23 util,
23 util,
24 )
24 )
25
25
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Determine whether the remote advertises the bundle2 "stream" capability
    # in a version we understand (currently only "v2").
    bundle2supported = False
    if pullop.canusebundle2:
        if 'v2' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but server has them '
                'disabled\n'))
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but client is missing '
                'requirements: %s\n') % ', '.join(sorted(missingreqs)))
            pullop.repo.ui.warn(
                _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                  'for more information)\n'))
            return False, None
        requirements = streamreqs

    return True, requirements
107
107
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand('branchmap', {}).result()

    repo.ui.status(_('streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand('stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    # First line of the stream is the status code.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # Second line is "<filecount> <bytecount>".
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
176
176
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    ui = repo.ui
    if not ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret
    # changesets, so serving one is only allowed by explicit opt-in when
    # any secret changesets exist.
    if phases.hassecret(repo):
        return ui.configbool('server', 'uncompressedallowsecret')

    return True
189
189
# This is it's own function so extensions can override it.
def _walkstreamfiles(repo):
    """Walk the store files to include in a v1 stream clone."""
    return repo.store.walk()
193
193
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            # Zero-length files are skipped entirely.
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    # NOTE: the actual file data is read lazily, after the lock above has
    # been released; only the file list was captured under the lock.
    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    # Small files are emitted in a single read.
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
245
245
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield '1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield '2\n'
        return

    # Indicates successful response.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk
272
272
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        # Magic + compression identifier header.
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        # Length includes the trailing NUL appended below.
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        progress = repo.ui.makeprogress(_('bundle'), total=bytecount,
                                        unit=_('bytes'))
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        # Signal completion so the progress bar is cleared.
        progress.update(None)

    return requirements, gen()
328
327
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        progress = repo.ui.makeprogress(_('clone'), total=bytecount,
                                        unit=_('bytes'))
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    # Each entry header is "<name>\0<size>\n".
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        # Guard against a zero/negative clock delta when computing the rate.
        if elapsed <= 0:
            elapsed = 0.001
        progress.update(None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
393
391
def readbundle1header(fp):
    """Parse the header of a stream clone bundle version 1.

    Assumes ``fp`` is positioned at the 2-byte compression identifier (the
    4-byte "HGS1" magic has already been consumed by the caller).

    Returns a tuple of (file count, byte count, requirements set). Raises
    ``error.Abort`` on unsupported compression or a malformed requirements
    string.
    """
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Two 64-bit big-endian unsigned integers: file count and byte count.
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    # 16-bit big-endian length of the requirements string (incl. NUL).
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements
411
409
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.

    Raises ``error.Abort`` if the repository is not empty or if the bundle
    requires repository formats this client does not support.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)
430
428
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle positioned at the 2-byte compression identifier.
        self._fh = fh

    def apply(self, repo):
        """Apply the held bundle data to ``repo``."""
        return applybundlev1(repo, self._fh)
442
440
# type of file to stream
_fileappend = 0 # append only file
_filefull = 1 # full snapshot file

# Source of the file
_srcstore = 's' # store (svfs)
_srccache = 'c' # cache (cachevfs)
450
448
451 # This is it's own function so extensions can override it.
449 # This is it's own function so extensions can override it.
452 def _walkstreamfullstorefiles(repo):
450 def _walkstreamfullstorefiles(repo):
453 """list snapshot file from the store"""
451 """list snapshot file from the store"""
454 fnames = []
452 fnames = []
455 if not repo.publishing():
453 if not repo.publishing():
456 fnames.append('phaseroots')
454 fnames.append('phaseroots')
457 return fnames
455 return fnames
458
456
def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype == _filefull:
        # Snapshot files are materialized via ``copy`` and the resulting
        # temporary path replaces the entry's payload; other file types
        # pass through untouched.
        entry = (src, name, ftype, copy(vfsmap[src].join(name)))
    return entry
465
463
@contextlib.contextmanager
def maketempcopies():
    """return a function to temporary copy file"""
    # Paths of the temporary copies made so far; unlinked on exit.
    files = []
    try:
        def copy(src):
            fd, dst = pycompat.mkstemp()
            # Only the path is needed; close the descriptor immediately.
            os.close(fd)
            files.append(dst)
            # Hardlink when possible to avoid duplicating file content.
            util.copyfiles(src, dst, hardlink=True)
            return dst
        yield copy
    finally:
        # Best-effort cleanup of every temporary copy.
        for tmp in files:
            util.tryunlink(tmp)
481
479
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap
493
491
494 def _emit2(repo, entries, totalfilesize):
492 def _emit2(repo, entries, totalfilesize):
495 """actually emit the stream bundle"""
493 """actually emit the stream bundle"""
496 vfsmap = _makemap(repo)
494 vfsmap = _makemap(repo)
497 progress = repo.ui.progress
495 progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize,
498 progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
496 unit=_('bytes'))
497 progress.update(0)
499 with maketempcopies() as copy:
498 with maketempcopies() as copy:
500 try:
499 try:
501 # copy is delayed until we are in the try
500 # copy is delayed until we are in the try
502 entries = [_filterfull(e, copy, vfsmap) for e in entries]
501 entries = [_filterfull(e, copy, vfsmap) for e in entries]
503 yield None # this release the lock on the repository
502 yield None # this release the lock on the repository
504 seen = 0
503 seen = 0
505
504
506 for src, name, ftype, data in entries:
505 for src, name, ftype, data in entries:
507 vfs = vfsmap[src]
506 vfs = vfsmap[src]
508 yield src
507 yield src
509 yield util.uvarintencode(len(name))
508 yield util.uvarintencode(len(name))
510 if ftype == _fileappend:
509 if ftype == _fileappend:
511 fp = vfs(name)
510 fp = vfs(name)
512 size = data
511 size = data
513 elif ftype == _filefull:
512 elif ftype == _filefull:
514 fp = open(data, 'rb')
513 fp = open(data, 'rb')
515 size = util.fstat(fp).st_size
514 size = util.fstat(fp).st_size
516 try:
515 try:
517 yield util.uvarintencode(size)
516 yield util.uvarintencode(size)
518 yield name
517 yield name
519 if size <= 65536:
518 if size <= 65536:
520 chunks = (fp.read(size),)
519 chunks = (fp.read(size),)
521 else:
520 else:
522 chunks = util.filechunkiter(fp, limit=size)
521 chunks = util.filechunkiter(fp, limit=size)
523 for chunk in chunks:
522 for chunk in chunks:
524 seen += len(chunk)
523 seen += len(chunk)
525 progress(_('bundle'), seen, total=totalfilesize,
524 progress.update(seen)
526 unit=_('bytes'))
527 yield chunk
525 yield chunk
528 finally:
526 finally:
529 fp.close()
527 fp.close()
530 finally:
528 finally:
531 progress(_('bundle'), None)
529 progress.update(None)
532
530
533 def generatev2(repo):
531 def generatev2(repo):
534 """Emit content for version 2 of a streaming clone.
532 """Emit content for version 2 of a streaming clone.
535
533
536 the data stream consists the following entries:
534 the data stream consists the following entries:
537 1) A char representing the file destination (eg: store or cache)
535 1) A char representing the file destination (eg: store or cache)
538 2) A varint containing the length of the filename
536 2) A varint containing the length of the filename
539 3) A varint containing the length of file data
537 3) A varint containing the length of file data
540 4) N bytes containing the filename (the internal, store-agnostic form)
538 4) N bytes containing the filename (the internal, store-agnostic form)
541 5) N bytes containing the file data
539 5) N bytes containing the file data
542
540
543 Returns a 3-tuple of (file count, file size, data iterator).
541 Returns a 3-tuple of (file count, file size, data iterator).
544 """
542 """
545
543
546 with repo.lock():
544 with repo.lock():
547
545
548 entries = []
546 entries = []
549 totalfilesize = 0
547 totalfilesize = 0
550
548
551 repo.ui.debug('scanning\n')
549 repo.ui.debug('scanning\n')
552 for name, ename, size in _walkstreamfiles(repo):
550 for name, ename, size in _walkstreamfiles(repo):
553 if size:
551 if size:
554 entries.append((_srcstore, name, _fileappend, size))
552 entries.append((_srcstore, name, _fileappend, size))
555 totalfilesize += size
553 totalfilesize += size
556 for name in _walkstreamfullstorefiles(repo):
554 for name in _walkstreamfullstorefiles(repo):
557 if repo.svfs.exists(name):
555 if repo.svfs.exists(name):
558 totalfilesize += repo.svfs.lstat(name).st_size
556 totalfilesize += repo.svfs.lstat(name).st_size
559 entries.append((_srcstore, name, _filefull, None))
557 entries.append((_srcstore, name, _filefull, None))
560 for name in cacheutil.cachetocopy(repo):
558 for name in cacheutil.cachetocopy(repo):
561 if repo.cachevfs.exists(name):
559 if repo.cachevfs.exists(name):
562 totalfilesize += repo.cachevfs.lstat(name).st_size
560 totalfilesize += repo.cachevfs.lstat(name).st_size
563 entries.append((_srccache, name, _filefull, None))
561 entries.append((_srccache, name, _filefull, None))
564
562
565 chunks = _emit2(repo, entries, totalfilesize)
563 chunks = _emit2(repo, entries, totalfilesize)
566 first = next(chunks)
564 first = next(chunks)
567 assert first is None
565 assert first is None
568
566
569 return len(entries), totalfilesize, chunks
567 return len(entries), totalfilesize, chunks
570
568
571 @contextlib.contextmanager
569 @contextlib.contextmanager
572 def nested(*ctxs):
570 def nested(*ctxs):
573 with warnings.catch_warnings():
571 with warnings.catch_warnings():
574 # For some reason, Python decided 'nested' was deprecated without
572 # For some reason, Python decided 'nested' was deprecated without
575 # replacement. They officially advertised for filtering the deprecation
573 # replacement. They officially advertised for filtering the deprecation
576 # warning for people who actually need the feature.
574 # warning for people who actually need the feature.
577 warnings.filterwarnings("ignore",category=DeprecationWarning)
575 warnings.filterwarnings("ignore",category=DeprecationWarning)
578 with contextlib.nested(*ctxs):
576 with contextlib.nested(*ctxs):
579 yield
577 yield
580
578
581 def consumev2(repo, fp, filecount, filesize):
579 def consumev2(repo, fp, filecount, filesize):
582 """Apply the contents from a version 2 streaming clone.
580 """Apply the contents from a version 2 streaming clone.
583
581
584 Data is read from an object that only needs to provide a ``read(size)``
582 Data is read from an object that only needs to provide a ``read(size)``
585 method.
583 method.
586 """
584 """
587 with repo.lock():
585 with repo.lock():
588 repo.ui.status(_('%d files to transfer, %s of data\n') %
586 repo.ui.status(_('%d files to transfer, %s of data\n') %
589 (filecount, util.bytecount(filesize)))
587 (filecount, util.bytecount(filesize)))
590
588
591 start = util.timer()
589 start = util.timer()
592 handledbytes = 0
590 progress = repo.ui.makeprogress(_('clone'), total=filesize,
593 progress = repo.ui.progress
591 unit=_('bytes'))
594
592 progress.update(0)
595 progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
596
593
597 vfsmap = _makemap(repo)
594 vfsmap = _makemap(repo)
598
595
599 with repo.transaction('clone'):
596 with repo.transaction('clone'):
600 ctxs = (vfs.backgroundclosing(repo.ui)
597 ctxs = (vfs.backgroundclosing(repo.ui)
601 for vfs in vfsmap.values())
598 for vfs in vfsmap.values())
602 with nested(*ctxs):
599 with nested(*ctxs):
603 for i in range(filecount):
600 for i in range(filecount):
604 src = util.readexactly(fp, 1)
601 src = util.readexactly(fp, 1)
605 vfs = vfsmap[src]
602 vfs = vfsmap[src]
606 namelen = util.uvarintdecodestream(fp)
603 namelen = util.uvarintdecodestream(fp)
607 datalen = util.uvarintdecodestream(fp)
604 datalen = util.uvarintdecodestream(fp)
608
605
609 name = util.readexactly(fp, namelen)
606 name = util.readexactly(fp, namelen)
610
607
611 if repo.ui.debugflag:
608 if repo.ui.debugflag:
612 repo.ui.debug('adding [%s] %s (%s)\n' %
609 repo.ui.debug('adding [%s] %s (%s)\n' %
613 (src, name, util.bytecount(datalen)))
610 (src, name, util.bytecount(datalen)))
614
611
615 with vfs(name, 'w') as ofp:
612 with vfs(name, 'w') as ofp:
616 for chunk in util.filechunkiter(fp, limit=datalen):
613 for chunk in util.filechunkiter(fp, limit=datalen):
617 handledbytes += len(chunk)
614 progress.increment(step=len(chunk))
618 progress(_('clone'), handledbytes, total=filesize,
619 unit=_('bytes'))
620 ofp.write(chunk)
615 ofp.write(chunk)
621
616
622 # force @filecache properties to be reloaded from
617 # force @filecache properties to be reloaded from
623 # streamclone-ed file at next access
618 # streamclone-ed file at next access
624 repo.invalidate(clearfilecache=True)
619 repo.invalidate(clearfilecache=True)
625
620
626 elapsed = util.timer() - start
621 elapsed = util.timer() - start
627 if elapsed <= 0:
622 if elapsed <= 0:
628 elapsed = 0.001
623 elapsed = 0.001
629 progress(_('clone'), None)
624 progress.update(None)
630 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
625 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
631 (util.bytecount(handledbytes), elapsed,
626 (util.bytecount(progress.pos), elapsed,
632 util.bytecount(handledbytes / elapsed)))
627 util.bytecount(progress.pos / elapsed)))
633
628
634 def applybundlev2(repo, fp, filecount, filesize, requirements):
629 def applybundlev2(repo, fp, filecount, filesize, requirements):
635 missingreqs = [r for r in requirements if r not in repo.supported]
630 missingreqs = [r for r in requirements if r not in repo.supported]
636 if missingreqs:
631 if missingreqs:
637 raise error.Abort(_('unable to apply stream clone: '
632 raise error.Abort(_('unable to apply stream clone: '
638 'unsupported format: %s') %
633 'unsupported format: %s') %
639 ', '.join(sorted(missingreqs)))
634 ', '.join(sorted(missingreqs)))
640
635
641 consumev2(repo, fp, filecount, filesize)
636 consumev2(repo, fp, filecount, filesize)
642
637
643 # new requirements = old non-format requirements +
638 # new requirements = old non-format requirements +
644 # new format-related remote requirements
639 # new format-related remote requirements
645 # requirements from the streamed-in repository
640 # requirements from the streamed-in repository
646 repo.requirements = set(requirements) | (
641 repo.requirements = set(requirements) | (
647 repo.requirements - repo.supportedformats)
642 repo.requirements - repo.supportedformats)
648 repo._applyopenerreqs()
643 repo._applyopenerreqs()
649 repo._writerequirements()
644 repo._writerequirements()
General Comments 0
You need to be logged in to leave comments. Login now