##// END OF EJS Templates
streamclone: support for producing and consuming stream clone bundles...
Gregory Szorc -
r26755:bb0b955d default
parent child Browse files
Show More
@@ -1,292 +1,392 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 import struct
10 11 import time
11 12
12 13 from .i18n import _
13 14 from . import (
14 15 branchmap,
15 16 error,
16 17 store,
17 18 util,
18 19 )
19 20
def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be called by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Does the remote offer a bundle2-based stream clone we know how to
    # consume? If not (no bundle2, or no version overlap), we may still
    # fall back to the legacy protocol below.
    bundle2supported = (pullop.canusebundle2
                        and 'v1' in pullop.remotebundle2caps.get('stream', []))

    # Ensures legacy code path uses available bundle2.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    if remote.capable('stream'):
        requirements = set(['revlogv1'])
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        if streamreqs - repo.supportedformats:
            return False, None
        requirements = streamreqs

    return True, requirements
92 93
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.

    Raises ``error.ResponseError`` on a malformed server response and
    ``error.Abort`` when the server reports an error status code.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    # First response line is an integer status code: 0 = OK, 1 = forbidden,
    # 2 = remote lock failure, anything else is unknown.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # Second line is "<filecount> <bytecount>"; the raw file data follows
    # and is consumed by consumev1() below.
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    lock = repo.lock()
    try:
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        # Stream clone wrote store files directly; drop in-memory caches.
        repo.invalidate()
    finally:
        lock.release()
157 158
def allowservergeneration(ui):
    """Return True if this server permits clients to stream clone from it."""
    # "server.uncompressed" defaults to enabled and is honored even from
    # untrusted configuration.
    allowed = ui.configbool('server', 'uncompressed', True, untrusted=True)
    return allowed
161 162
162 163 # This is it's own function so extensions can override it.
163 164 def _walkstreamfiles(repo):
164 165 return repo.store.walk()
165 166
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        # Zero-length files are skipped entirely; only (name, size) pairs
        # with data are recorded for transfer.
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    # Disable path auditing while streaming file contents; the previous
    # value is restored in the generator's finally clause below.
    svfs.mustaudit = False

    def emitrevlogdata():
        # Generator yielding "<name>\0<size>\n" headers followed by the raw
        # bytes of each recorded store file.
        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                # Small files (<= 64 KiB) are read in one call; larger files
                # are streamed in chunks to bound memory usage.
                if size <= 65536:
                    fp = svfs(name)
                    try:
                        data = fp.read(size)
                    finally:
                        fp.close()
                    yield data
                else:
                    for chunk in util.filechunkiter(svfs(name), limit=size):
                        yield chunk
        finally:
            # NOTE: this only runs once the generator is exhausted or
            # closed, so auditing stays disabled until then.
            svfs.mustaudit = oldaudit

    return len(entries), total_bytes, emitrevlogdata()
227 228
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with a header line
    indicating file count and byte size.
    """
    filecount, bytecount, chunks = generatev1(repo)
    # Clients expect a "<filecount> <bytecount>" header before the data.
    yield '%d %d\n' % (filecount, bytecount)
    for piece in chunks:
        yield piece
238 239
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The output layout is:

    * 4 bytes: the magic "HGS1", denoting a version 1 stream clone bundle
    * 2 bytes: the compression type (only "UN" — uncompressed — for now)
    * 8 bytes: big endian unsigned file count
    * 8 bytes: big endian unsigned byte count
    * 2 bytes: big endian unsigned length of the requirements string,
      including its trailing NUL
    * N bytes: ASCII comma-delimited list of repo requirements needed to
      support the data, NUL terminated
    * remainder: the output of ``generatev1()`` (which may be compressed
      in the future)

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def _emit():
        # Magic and compression identifier lead the stream.
        yield 'HGS1'
        yield compression

        filecount, bytecount, chunks = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        # Fixed-size header: counts, then the requirements string.
        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        sent = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount)

        for chunk in chunks:
            sent += len(chunk)
            repo.ui.progress(_('bundle'), sent, total=bytecount)
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, _emit()
294
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not handled
    by this function.

    Raises ``error.ResponseError`` if a per-file header line is malformed.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount)
        start = time.time()

        tr = repo.transaction(_('clone'))
        try:
            # NOTE: xrange — this is Python 2 era code.
            for i in xrange(filecount):
                # XXX doesn't support '\n' or '\r' in filenames
                # Each file is preceded by a "<name>\0<size>\n" header line.
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                for chunk in util.filechunkiter(fp, limit=size):
                    handled_bytes += len(chunk)
                    repo.ui.progress(_('clone'), handled_bytes, total=bytecount)
                    ofp.write(chunk)
                ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        # Guard against division by zero (and clock skew) in the rate below.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
    finally:
        lock.release()
349
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    The caller is expected to have read and validated the 4 byte magic;
    ``fp`` must be positioned at the 2 byte compression identifier.
    """
    # Only an empty repository may receive a full stream of store files.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Header: two 64-bit big endian counts, then a 16-bit requirements
    # string length (which includes the trailing NUL).
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    (requireslen,) = struct.unpack('>H', fp.read(2))
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    missingreqs = (set(requires.rstrip('\0').split(','))
                   - repo.supportedformats)
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)
381
class streamcloneapplier(object):
    """Manage applying a streaming clone bundle to a repository.

    Wrapping ``applybundlev1()`` in a dedicated type enables bundle readers
    to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle positioned at the compression identifier (the 4 byte
        # magic is assumed to have been consumed already).
        self._fh = fh

    def apply(self, repo):
        """Consume the held bundle data into ``repo``."""
        return applybundlev1(repo, self._fh)
General Comments 0
You need to be logged in to leave comments. Login now