streamclone: close large revlog files explicitly in generatev1()
Yuya Nishihara
r33410:c7843083 default
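
The gist of the change: in ``emitrevlogdata()``, files larger than 64 KiB were previously opened with ``svfs(name, auditpath=False)`` and never closed explicitly, so the handle lingered until the garbage collector reclaimed it. The new code puts both the small-file and large-file paths inside a single ``with svfs(...)`` block, so each store file is closed as soon as its data has been yielded. Below is a minimal, illustrative sketch of that pattern only; it uses plain ``open()`` and invented names (``emitfiledata``, ``CHUNK``, ``paths``) rather than Mercurial's ``svfs`` API.

    import os

    CHUNK = 65536  # same 64 KiB threshold generatev1() uses

    def emitfiledata(paths):
        for path in paths:
            size = os.path.getsize(path)
            with open(path, 'rb') as fp:    # handle released on block exit,
                if size <= CHUNK:           # not when the GC eventually runs
                    yield fp.read(size)     # small file: a single read
                else:
                    remaining = size
                    while remaining:        # large file: bounded-memory chunks
                        data = fp.read(min(CHUNK, remaining))
                        if not data:
                            break
                        yield data
                        remaining -= len(data)

Streaming large files in 64 KiB chunks keeps memory bounded, while small files are still read in one call; the only behavioral change in the commit is that the large-file handle is now closed deterministically.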
@@ -1,413 +1,412 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import struct

from .i18n import _
from . import (
    branchmap,
    error,
    phases,
    store,
    util,
)

def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be called by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v1' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
            # Server doesn't support bundle2 stream clone or doesn't support
            # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but server has them '
                'disabled\n'))
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but client is missing '
                'requirements: %s\n') % ', '.join(sorted(missingreqs)))
            pullop.repo.ui.warn(
                _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                  'for more information)\n'))
            return False, None
        requirements = streamreqs

    return True, requirements

def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()

def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', True, untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True

# This is it's own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
201 """
201 """
202 entries = []
202 entries = []
203 total_bytes = 0
203 total_bytes = 0
204 # Get consistent snapshot of repo, lock during scan.
204 # Get consistent snapshot of repo, lock during scan.
205 with repo.lock():
205 with repo.lock():
206 repo.ui.debug('scanning\n')
206 repo.ui.debug('scanning\n')
207 for name, ename, size in _walkstreamfiles(repo):
207 for name, ename, size in _walkstreamfiles(repo):
208 if size:
208 if size:
209 entries.append((name, size))
209 entries.append((name, size))
210 total_bytes += size
210 total_bytes += size
211
211
212 repo.ui.debug('%d files, %d bytes to transfer\n' %
212 repo.ui.debug('%d files, %d bytes to transfer\n' %
213 (len(entries), total_bytes))
213 (len(entries), total_bytes))
214
214
215 svfs = repo.svfs
215 svfs = repo.svfs
216 debugflag = repo.ui.debugflag
216 debugflag = repo.ui.debugflag
217
217
218 def emitrevlogdata():
218 def emitrevlogdata():
219 for name, size in entries:
219 for name, size in entries:
220 if debugflag:
220 if debugflag:
221 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
221 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
222 # partially encode name over the wire for backwards compat
222 # partially encode name over the wire for backwards compat
223 yield '%s\0%d\n' % (store.encodedir(name), size)
223 yield '%s\0%d\n' % (store.encodedir(name), size)
-            if size <= 65536:
-                with svfs(name, 'rb', auditpath=False) as fp:
-                    yield fp.read(size)
-            else:
-                data = svfs(name, auditpath=False)
-                for chunk in util.filechunkiter(data, limit=size):
-                    yield chunk
+            with svfs(name, 'rb', auditpath=False) as fp:
+                if size <= 65536:
+                    yield fp.read(size)
+                else:
+                    for chunk in util.filechunkiter(fp, limit=size):
+                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()

def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with a header line
    indicating file count and byte size.
    """
    filecount, bytecount, it = generatev1(repo)
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            seen += len(chunk)
            repo.ui.progress(_('bundle'), seen, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()

def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handled_bytes += len(chunk)
                            repo.ui.progress(_('clone'), handled_bytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))

def readbundle1header(fp):
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements

def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)
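
Two hedged sketches for context; neither is part of the change above. First, why an explicit ``with`` matters inside a generator: the cleanup runs when the block is exited during iteration, and also when the generator itself is closed, instead of whenever the interpreter happens to collect the file object. ``tracedfile`` and ``emit`` below are invented stand-ins, not Mercurial code.

    class tracedfile(object):
        """Stand-in for a store file; records whether it was closed."""
        def __init__(self, name):
            self.name = name
            self.closed = False
        def __enter__(self):
            return self
        def __exit__(self, *exc):
            self.closed = True

    def emit(tf):
        with tf:
            yield 'chunk'

    tf = tracedfile('data.i')
    gen = emit(tf)
    next(gen)           # first chunk produced, file still "open"
    gen.close()         # GeneratorExit unwinds the with block...
    assert tf.closed    # ...so the handle is released deterministically

Second, the stream clone bundle v1 header described in ``generatebundlev1()``'s docstring can be exercised directly with ``struct``; the counts and requirement names below are arbitrary example values.

    import io
    import struct

    requires = ','.join(sorted(['generaldelta', 'revlogv1']))  # example values
    header = b'HGS1' + b'UN'
    header += struct.pack('>QQ', 3, 12345)            # file count, byte count
    header += struct.pack('>H', len(requires) + 1)    # length includes the NUL
    header += requires.encode('ascii') + b'\0'

    fp = io.BytesIO(header)
    assert fp.read(4) == b'HGS1'
    assert fp.read(2) == b'UN'
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    names = fp.read(requireslen).rstrip(b'\0').split(b',')
    assert filecount == 3 and bytecount == 12345
    assert names == [b'generaldelta', b'revlogv1']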