##// END OF EJS Templates
streamclone: use a context manager for the repository lock in maybeperformlegacystreamclone
Bryan O'Sullivan -
r27850:49cfddbf default
parent child Browse files
Show More
@@ -1,384 +1,381 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import struct
11 11 import time
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 branchmap,
16 16 error,
17 17 store,
18 18 util,
19 19 )
20 20
def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Determine whether a streaming clone can happen as part of this pull.

    When ``bailifbundle2supported`` is True, the function reports False if
    the peer supports bundle2 stream clones; only the legacy stream clone
    code path should pass it.

    Returns a 2-tuple ``(supported, requirements)``. ``supported`` is True
    when a streaming clone can proceed. ``requirements`` is the set of repo
    requirements advertised by the remote, or ``None`` when streaming is
    not possible.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Does the server offer a bundle2-based stream clone in a version we
    # understand? If not, we may fall back to the legacy protocol.
    hasbundle2stream = (pullop.canusebundle2 and
                        'v1' in pullop.remotebundle2caps.get('stream', []))

    # Let the legacy path step aside when bundle2 will handle the clone.
    if bailifbundle2supported and hasbundle2stream:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not hasbundle2stream:
    #    return False, None

    # Streaming clone only works on an empty local repository...
    if len(repo):
        return False, None

    # ...and only when all of the remote's data is being requested.
    if pullop.heads:
        return False, None

    wantstream = pullop.streamclonerequested

    # With no explicit client preference, honor the server's advertised
    # preference. This likely only comes into play on LANs.
    if wantstream is None:
        wantstream = remote.capable('stream-preferred')

    if not wantstream:
        return False, None

    # The client has to support every requirement the server advertises.
    # The server does this via the "stream" and "streamreqs" capabilities:
    # value-less "stream" means the only requirement is "revlogv1"; else
    # "streamreqs" carries a comma-delimited list of requirements.
    if remote.capable('stream'):
        return True, set(('revlogv1',))

    streamreqs = remote.capable('streamreqs')
    # This is weird and shouldn't happen with modern servers.
    if not streamreqs:
        return False, None

    reqs = set(streamreqs.split(','))
    # Server requires something we don't support. Bail.
    if reqs - repo.supportedformats:
        return False, None

    return True, reqs
93 93
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.

    Raises error.ResponseError on a malformed server response and
    error.Abort when the server refuses the operation.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    # First line of the response is an integer status code.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # Second line is "<filecount> <bytecount>".
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    # Use a context manager so the repo lock is released even if stream
    # consumption or requirement rewriting fails partway through.
    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        # Seed the branch cache from the branchmap captured above to avoid
        # recomputing it from scratch.
        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
def allowservergeneration(ui):
    """Report whether this server is allowed to serve stream clones."""
    # Controlled by the (untrusted-readable) server.uncompressed config
    # knob; defaults to allowed.
    allowed = ui.configbool('server', 'uncompressed', True, untrusted=True)
    return allowed
162 159
163 160 # This is it's own function so extensions can override it.
164 161 def _walkstreamfiles(repo):
165 162 return repo.store.walk()
166 163
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            # Zero-length store files are skipped entirely.
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    # Disable path auditing while streaming: every name was produced by the
    # store walk above, so re-auditing each read is redundant.
    svfs.mustaudit = False

    def emitrevlogdata():
        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                # Small files are read whole; larger ones are chunked to
                # bound memory usage.
                if size <= 65536:
                    yield svfs.read(name)
                else:
                    for chunk in util.filechunkiter(svfs(name), limit=size):
                        yield chunk
        finally:
            # NOTE: this finally runs only once the generator is exhausted
            # or closed; until then svfs auditing remains disabled.
            svfs.mustaudit = oldaudit

    return len(entries), total_bytes, emitrevlogdata()
220 217
def generatev1wireproto(repo):
    """Emit version 1 streaming clone content suitable for the wire.

    Identical to the ``generatev1()`` data stream except that a header line
    carrying the file count and byte count is emitted first.
    """
    fcount, bcount, source = generatev1(repo)
    yield '%d %d\n' % (fcount, bcount)
    for piece in source:
        yield piece
231 228
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    Layout of the emitted stream:

    * 4 bytes: the magic "HGS1", marking a version 1 stream clone bundle.
    * 2 bytes: compression type; only "UN" (uncompressed) is supported.
    * 16 bytes: two 64-bit big endian unsigned integers holding the file
      count and byte count, respectively.
    * 2 bytes: 16-bit big endian unsigned short giving the length of the
      requirements string, including a trailing \0.
    * N bytes: the ASCII requirements string — a comma-delimited list of
      repo requirements needed to read the data.
    * remainder: the output of ``generatev1()`` (which may be compressed
      in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def makestream():
        # Magic plus compression identifier header.
        yield 'HGS1'
        yield compression

        fcount, bcount, datait = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bcount, fcount))

        yield struct.pack('>QQ', fcount, bcount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        emitted = 0
        repo.ui.progress(_('bundle'), 0, total=bcount)

        for chunk in datait:
            emitted += len(chunk)
            repo.ui.progress(_('bundle'), emitted, total=bcount)
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, makestream()
286 283
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not handled
    by this function.

    Raises error.ResponseError on a malformed per-file header line.
    """
    # Hold the repo lock via a context manager — consistent with
    # generatev1() — so it is released even if the transfer fails midway.
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount)
        start = time.time()

        # The transaction is closed/released explicitly: close() on success,
        # release() in all cases (a no-op after close, a rollback otherwise).
        tr = repo.transaction('clone')
        try:
            for i in xrange(filecount):
                # XXX doesn't support '\n' or '\r' in filenames
                # Each file entry is "<name>\0<size>\n" followed by raw data.
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                with repo.svfs(store.decodedir(name), 'w') as ofp:
                    for chunk in util.filechunkiter(fp, limit=size):
                        handled_bytes += len(chunk)
                        repo.ui.progress(_('clone'), handled_bytes,
                                         total=bytecount)
                        ofp.write(chunk)
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        if elapsed <= 0:
            # Guard against a zero/negative clock delta in the rate below.
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
341 338
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    # Stream clone bundles can only populate a brand new repository.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Fixed-size header: file count, byte count, requirements-string length.
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    reqlen = struct.unpack('>H', fp.read(2))[0]
    requiresraw = fp.read(reqlen)

    # The requirements string must carry its trailing NUL terminator.
    if not requiresraw.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    wanted = set(requiresraw.rstrip('\0').split(','))
    unsupported = wanted - repo.supportedformats
    if unsupported:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(unsupported)))

    consumev1(repo, fp, filecount, bytecount)
373 370
class streamcloneapplier(object):
    """Manage application of a streaming clone bundle to a repository.

    Wrapping ``applybundlev1()`` in a dedicated type lets bundle readers
    attach bundle-type-specific behavior.
    """
    def __init__(self, fh):
        # File handle for the bundle payload, consumed by apply().
        self._fh = fh

    def apply(self, repo):
        """Apply the held bundle to ``repo`` via applybundlev1()."""
        return applybundlev1(repo, self._fh)
General Comments 0
You need to be logged in to leave comments. Login now