streamclone: support for producing and consuming stream clone bundles...
Gregory Szorc
r26755:bb0b955d default
@@ -1,292 +1,392 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import struct
import time

from .i18n import _
from . import (
    branchmap,
    error,
    store,
    util,
)

def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be called by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v1' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
            # Server doesn't support bundle2 stream clone or doesn't support
            # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        if streamreqs - repo.supportedformats:
            return False, None
        requirements = streamreqs

    return True, requirements

def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    lock = repo.lock()
    try:
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
    finally:
        lock.release()

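# Example of the legacy stream_out exchange parsed above (illustrative
# values): a successful response begins with a status line and a header line
# before the raw stream data, e.g.
#
#   '0\n'        status code 0 (request accepted)
#   '5 1234\n'   5 files, 1234 bytes of data follow
#
# after which consumev1() reads the individual file entries.
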
def allowservergeneration(ui):
    """Whether streaming clones are allowed from the server."""
    return ui.configbool('server', 'uncompressed', True, untrusted=True)

# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    svfs.mustaudit = False

    def emitrevlogdata():
        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                if size <= 65536:
                    fp = svfs(name)
                    try:
                        data = fp.read(size)
                    finally:
                        fp.close()
                    yield data
                else:
                    for chunk in util.filechunkiter(svfs(name), limit=size):
                        yield chunk
        finally:
            svfs.mustaudit = oldaudit

    return len(entries), total_bytes, emitrevlogdata()

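# Example of a single generatev1() file entry (illustrative; the file name
# and size are hypothetical):
#
#   'data/foo.i\x00123\n'   file name and size, delimited by a null byte
#   <123 bytes of raw file data>
#
# Entries repeat until EOF.
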
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with a header line
    indicating file count and byte size.
    """
    filecount, bytecount, it = generatev1(repo)
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing \0. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount)

        for chunk in it:
            seen += len(chunk)
            repo.ui.progress(_('bundle'), seen, total=bytecount)
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()

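# Example of the bundle header emitted by gen() above (illustrative; the
# counts and requirements shown are hypothetical):
#
#   'HGS1'                            4 bytes: magic
#   'UN'                              2 bytes: compression type
#   struct.pack('>QQ', 5, 1234)      16 bytes: file count, byte count
#   struct.pack('>H', 22)             2 bytes: requirements string length
#   'generaldelta,revlogv1\0'        22 bytes: requirements string
#
# followed by the raw generatev1() output.
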
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not handled
    by this function.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount)
        start = time.time()

        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(filecount):
                # XXX doesn't support '\n' or '\r' in filenames
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                for chunk in util.filechunkiter(fp, limit=size):
                    handled_bytes += len(chunk)
                    repo.ui.progress(_('clone'), handled_bytes, total=bytecount)
                    ofp.write(chunk)
                ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
    finally:
        lock.release()

def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)
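
The pieces above compose into a simple produce/apply round trip. Below is a
minimal sketch, assuming ``srcrepo`` and ``destrepo`` are already-open
localrepo objects, the destination repository is empty, and the helper names
are hypothetical:

# Hypothetical helpers built on the functions defined in this module.
def writebundle(srcrepo, path):
    # generatebundlev1() returns the format requirements plus a generator
    # that yields the magic, compression id, counts, requirements and payload.
    requirements, gen = generatebundlev1(srcrepo)
    fh = open(path, 'wb')
    try:
        for chunk in gen:
            fh.write(chunk)
    finally:
        fh.close()
    return requirements

def readbundle(destrepo, path):
    fh = open(path, 'rb')
    try:
        # Per applybundlev1()'s docstring, the caller reads and validates the
        # 4 byte magic before handing the file object to the applier.
        if fh.read(4) != 'HGS1':
            raise error.Abort(_('not a stream clone bundle'))
        streamcloneapplier(fh).apply(destrepo)
    finally:
        fh.close()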