##// END OF EJS Templates
streamclone: rework canperformstreamclone...
Boris Feld -
r35775:bbf7abd0 default
parent child Browse files
Show More
@@ -1,542 +1,541 b''
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import struct

from .i18n import _
from . import (
    branchmap,
    error,
    phases,
    store,
    util,
)
20
20
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v2' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
            # Server doesn't support bundle2 stream clone or doesn't support
            # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but server has them '
                'disabled\n'))
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but client is missing '
                'requirements: %s\n') % ', '.join(sorted(missingreqs)))
            pullop.repo.ui.warn(
                _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                  'for more information)\n'))
            return False, None
        requirements = streamreqs

    return True, requirements
103
102
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
165
164
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True
178
177
# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    """Walk the store files to stream (name, encoded name, size) entries."""
    return repo.store.walk()
182
181
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
234
233
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield '1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield '2\n'
        return

    # Indicates successful response.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk
261
260
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            seen += len(chunk)
            repo.ui.progress(_('bundle'), seen, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()
317
316
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handled_bytes += len(chunk)
                            repo.ui.progress(_('clone'), handled_bytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
382
381
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    Assumes the 4 byte magic ("HGS1") has already been consumed; ``fp`` is
    positioned at the 2 byte compression identifier.

    Returns a tuple of (file count, byte count, requirements set). Raises
    ``error.Abort`` on unsupported compression or a malformed requirements
    string.
    """
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements
400
399
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)
419
418
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # file handle positioned at the 2 byte compression identifier
        # (see applybundlev1)
        self._fh = fh

    def apply(self, repo):
        """Consume the wrapped bundle into ``repo``."""
        return applybundlev1(repo, self._fh)
431
430
def _emit(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    progress = repo.ui.progress
    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
    vfs = repo.svfs
    try:
        seen = 0
        for name, size in entries:
            yield util.uvarintencode(len(name))
            fp = vfs(name)
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    seen += len(chunk)
                    progress(_('bundle'), seen, total=totalfilesize,
                             unit=_('bytes'))
                    yield chunk
            finally:
                fp.close()
    finally:
        progress(_('bundle'), None)
458
457
def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    the data stream consists the following entries:
    1) A varint containing the length of the filename
    2) A varint containing the length of file data
    3) N bytes containing the filename (the internal, store-agnostic form)
    4) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        entries = []
        totalfilesize = 0

        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                totalfilesize += size

        chunks = _emit(repo, entries, totalfilesize)

    return len(entries), totalfilesize, chunks
485
484
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        handledbytes = 0
        progress = repo.ui.progress

        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))

        vfs = repo.svfs

        with repo.transaction('clone'):
            with vfs.backgroundclosing(repo.ui):
                for i in range(filecount):
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = fp.read(namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            handledbytes += len(chunk)
                            progress(_('clone'), handledbytes, total=filesize,
                                     unit=_('bytes'))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(handledbytes), elapsed,
                        util.bytecount(handledbytes / elapsed)))
534
533
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Apply the content from a version 2 stream clone bundle.

    Aborts if ``requirements`` contains anything the local repo does not
    support; otherwise consumes the stream into the repo.
    """
    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev2(repo, fp, filecount, filesize)
General Comments 0
You need to be logged in to leave comments. Login now