##// END OF EJS Templates
streamclone: use readexactly when reading stream v2...
Boris Feld -
r35821:3ad3aaeb stable
parent child Browse files
Show More
@@ -1,634 +1,634 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import os
12 12 import struct
13 13 import tempfile
14 14 import warnings
15 15
16 16 from .i18n import _
17 17 from . import (
18 18 branchmap,
19 19 cacheutil,
20 20 error,
21 21 phases,
22 22 store,
23 23 util,
24 24 )
25 25
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Determine whether the server can do a bundle2 stream clone with a
    # version we understand.
    bundle2supported = bool(pullop.canusebundle2 and
                            'v2' in pullop.remotebundle2caps.get('stream', []))
    # Otherwise the server doesn't support bundle2 stream clone (or not the
    # versions we support); we may fall back to the legacy path.

    # The legacy code path must defer to bundle2 when it is available.
    if bundle2supported and not bundle2:
        return False, None
    # And bundle2 must not attempt a stream clone the server can't provide.
    if bundle2 and not bundle2supported:
        return False, None

    # Streaming clone clobbers the store wholesale, so it only works on an
    # empty repository and only when all data is being requested.
    if len(repo) or pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested
    if streamrequested is None:
        # No local preference: let the server decide for us. This likely
        # only comes into play in LANs, via the advertised
        # "stream-preferred" capability.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and
    # "streamreqs" capability. "stream" (a value-less capability) is
    # advertised if and only if the only requirement is "revlogv1." Else,
    # the "streamreqs" capability is advertised and contains a
    # comma-delimited list of requirements.
    if remote.capable('stream'):
        return True, set(['revlogv1'])

    streamreqs = remote.capable('streamreqs')
    # This is weird and shouldn't happen with modern servers.
    if not streamreqs:
        pullop.repo.ui.warn(_(
            'warning: stream clone requested but server has them '
            'disabled\n'))
        return False, None

    streamreqs = set(streamreqs.split(','))
    # Server requires something we don't support. Bail.
    missingreqs = streamreqs - repo.supportedformats
    if missingreqs:
        pullop.repo.ui.warn(_(
            'warning: stream clone requested but client is missing '
            'requirements: %s\n') % ', '.join(sorted(missingreqs)))
        pullop.repo.ui.warn(
            _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
              'for more information)\n'))
        return False, None

    return True, streamreqs
107 107
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    # First line of the legacy protocol is an integer status code.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # Second line is "<filecount> <bytecount>".
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        # Seed the branch cache from the branchmap fetched before the
        # transfer, instead of recomputing it from scratch.
        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
169 169
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret
    # changesets. So when any exist, require an explicit opt-in instead of
    # allowing this by default.
    if phases.hassecret(repo):
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True
182 182
183 183 # This is it's own function so extensions can override it.
184 184 def _walkstreamfiles(repo):
185 185 return repo.store.walk()
186 186
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    # NOTE: the file data itself is read lazily by this generator, after the
    # lock above has been released; only the name/size list is captured
    # under lock.
    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    # Small file: a single read is cheaper than chunking.
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
238 238
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` preceded by 2 header
    lines. The first line indicates overall success. The 2nd contains the
    file count and byte size of payload.

    The success line contains "0" for success, "1" for stream generation
    not allowed, and "2" for error locking the repository (possibly
    indicating a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield '1\n'
        return

    try:
        filecount, bytecount, dataiter = generatev1(repo)
    except error.LockError:
        yield '2\n'
        return

    # Indicates successful response, then the payload size header.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)

    for piece in dataiter:
        yield piece
265 265
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The output layout is:

    - 4 bytes of magic: "HGS1";
    - 2 bytes naming the compression type (only "UN" is currently
      supported);
    - two 64-bit big endian unsigned integers: file count and byte count;
    - a 16-bit big endian unsigned short with the length of the
      requirements string, including a trailing \0;
    - N bytes of requirements string: ASCII, comma-delimited repo
      requirements that are needed to support the data;
    - the output of ``generatev1()`` (which may be compressed in the
      future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def makestream():
        yield 'HGS1'
        yield compression

        filecount, bytecount, dataiter = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        # Fixed-size header fields, then the \0-terminated requirements.
        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        emitted = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in dataiter:
            emitted += len(chunk)
            repo.ui.progress(_('bundle'), emitted, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, makestream()
321 321
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    # Each entry header is a "<name>\0<size>\n" line.
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handled_bytes += len(chunk)
                            repo.ui.progress(_('clone'), handled_bytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # Avoid a zero-division in the rate computation below.
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
386 386
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    Assumes the 4 byte magic has already been consumed and ``fp`` is
    positioned at the 2 byte compression identifier. Returns a tuple of
    (filecount, bytecount, requirements).
    """
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Two 64-bit big endian unsigned integers: file count and byte count.
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    # 16-bit length of the \0-terminated requirements string that follows.
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    return filecount, bytecount, set(requires.rstrip('\0').split(','))
404 404
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file
    handle is at the 2 byte compression identifier.
    """
    # Stream clone replaces the store wholesale; it is only safe on an
    # empty repository.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missing = requirements - repo.supportedformats
    if missing:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missing)))

    consumev1(repo, fp, filecount, bytecount)
423 423
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    Wrapping ``applybundlev1()`` in a dedicated type lets bundle readers
    perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle positioned at the compression identifier.
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)
435 435
# Type of file to stream (third field of a v2 stream entry).
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file (copied to a temporary file before send)

# Source of the file (first byte of each v2 stream entry).
_srcstore = 's'  # store (svfs)
_srccache = 'c'  # cache (cachevfs)
443 443
444 444 # This is it's own function so extensions can override it.
445 445 def _walkstreamfullstorefiles(repo):
446 446 """list snapshot file from the store"""
447 447 fnames = []
448 448 if not repo.publishing():
449 449 fnames.append('phaseroots')
450 450 return fnames
451 451
def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files

    ``entry`` is a (src, name, ftype, data) tuple. Append-only entries
    pass through untouched; full snapshot entries get ``data`` replaced
    with the path of a temporary copy made via ``copy``.
    """
    src, name, ftype, data = entry
    if ftype == _filefull:
        return (src, name, ftype, copy(vfsmap[src].join(name)))
    return entry
458 458
@contextlib.contextmanager
def maketempcopies():
    """Yield a ``copy(src) -> path`` helper that makes temporary copies.

    Every temporary file created through the helper is removed when the
    context exits.
    """
    tmppaths = []
    try:
        def copy(src):
            fd, dst = tempfile.mkstemp()
            os.close(fd)
            tmppaths.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst
        yield copy
    finally:
        for dst in tmppaths:
            util.tryunlink(dst)
474 474
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {_srcstore: repo.svfs, _srccache: repo.cachevfs}
    # repo.vfs is deliberately kept out of the map: it holds too many
    # dangerous files (eg: .hg/hgrc).
    assert repo.vfs not in vfsmap.values()

    return vfsmap
486 486
def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle

    ``entries`` is a list of (src, name, ftype, data) tuples. This
    generator first yields ``None`` once all full-snapshot files have been
    copied aside (the caller may then release the repository lock), and
    afterwards yields the raw stream chunks.
    """
    vfsmap = _makemap(repo)
    progress = repo.ui.progress
    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
    with maketempcopies() as copy:
        try:
            # copy is delayed until we are in the try
            entries = [_filterfull(e, copy, vfsmap) for e in entries]
            yield None # this release the lock on the repository
            seen = 0

            for src, name, ftype, data in entries:
                vfs = vfsmap[src]
                # Per-entry header: source byte, then filename length.
                yield src
                yield util.uvarintencode(len(name))
                if ftype == _fileappend:
                    # Append-only file: stream directly, size came from
                    # the store walk.
                    fp = vfs(name)
                    size = data
                elif ftype == _filefull:
                    # Snapshot file: stream the temporary copy made above.
                    fp = open(data, 'rb')
                    size = util.fstat(fp).st_size
                try:
                    # Data length, filename, then the file content.
                    yield util.uvarintencode(size)
                    yield name
                    if size <= 65536:
                        chunks = (fp.read(size),)
                    else:
                        chunks = util.filechunkiter(fp, limit=size)
                    for chunk in chunks:
                        seen += len(chunk)
                        progress(_('bundle'), seen, total=totalfilesize,
                                 unit=_('bytes'))
                        yield chunk
                finally:
                    fp.close()
        finally:
            progress(_('bundle'), None)
525 525
def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    The data stream is a sequence of entries, each consisting of:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug('scanning\n')

        entries = []
        totalfilesize = 0

        # Append-only store files; sizes are known from the store walk.
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((_srcstore, name, _fileappend, size))
                totalfilesize += size
        # Snapshot files from the store, then from the cache directory.
        for name in _walkstreamfullstorefiles(repo):
            if repo.svfs.exists(name):
                totalfilesize += repo.svfs.lstat(name).st_size
                entries.append((_srcstore, name, _filefull, None))
        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                totalfilesize += repo.cachevfs.lstat(name).st_size
                entries.append((_srccache, name, _filefull, None))

        chunks = _emit2(repo, entries, totalfilesize)
        # _emit2 yields None once its temporary copies are in place; it is
        # then safe to release the lock and stream lazily.
        first = next(chunks)
        assert first is None

    return len(entries), totalfilesize, chunks
563 563
@contextlib.contextmanager
def nested(*ctxs):
    """Enter each of ``ctxs`` in order, exiting them in reverse.

    Thin wrapper around ``contextlib.nested`` that silences its
    DeprecationWarning, as officially advertised for code that still
    needs the feature.
    """
    with warnings.catch_warnings():
        # For some reason, Python decided 'nested' was deprecated without
        # replacement. They officially advertised for filtering the deprecation
        # warning for people who actually need the feature.
        warnings.filterwarnings("ignore",category=DeprecationWarning)
        with contextlib.nested(*ctxs):
            yield
573 573
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.

    Each entry is: one source byte, a uvarint filename length, a uvarint
    data length, the filename, then the file data (see ``generatev2()``).
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        handledbytes = 0
        progress = repo.ui.progress

        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))

        vfsmap = _makemap(repo)

        with repo.transaction('clone'):
            ctxs = (vfs.backgroundclosing(repo.ui)
                    for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    # readexactly (rather than a bare read) guards against
                    # short reads from a truncated stream.
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding [%s] %s (%s)\n' %
                                      (src, name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            handledbytes += len(chunk)
                            progress(_('clone'), handledbytes, total=filesize,
                                     unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # Avoid a zero-division in the rate computation below.
            elapsed = 0.001
        progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(handledbytes), elapsed,
                        util.bytecount(handledbytes / elapsed)))
626 626
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Apply a version 2 stream clone bundle to ``repo``.

    Aborts when the bundle requires repository features this client does
    not support; otherwise delegates to ``consumev2()``.
    """
    missing = [req for req in requirements if req not in repo.supported]
    if missing:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missing)))

    consumev2(repo, fp, filecount, filesize)
General Comments 0
You need to be logged in to leave comments. Login now