streamclone: also stream caches to the client...
Boris Feld
r35785:5f5fb279 default
mercurial/streamclone.py
@@ -1,595 +1,634 @@
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import os
12 12 import struct
13 13 import tempfile
14 import warnings
14 15
15 16 from .i18n import _
16 17 from . import (
17 18 branchmap,
19 cacheutil,
18 20 error,
19 21 phases,
20 22 store,
21 23 util,
22 24 )
23 25
24 26 def canperformstreamclone(pullop, bundle2=False):
25 27 """Whether it is possible to perform a streaming clone as part of pull.
26 28
27 29 ``bundle2`` will cause the function to consider stream clone through
28 30 bundle2 and only through bundle2.
29 31
30 32 Returns a tuple of (supported, requirements). ``supported`` is True if
31 33 streaming clone is supported and False otherwise. ``requirements`` is
32 34 a set of repo requirements from the remote, or ``None`` if stream clone
33 35 isn't supported.
34 36 """
35 37 repo = pullop.repo
36 38 remote = pullop.remote
37 39
38 40 bundle2supported = False
39 41 if pullop.canusebundle2:
40 42 if 'v2' in pullop.remotebundle2caps.get('stream', []):
41 43 bundle2supported = True
42 44 # else
43 45 # Server doesn't support bundle2 stream clone or doesn't support
44 46 # the versions we support. Fall back and possibly allow legacy.
45 47
46 48 # Ensures legacy code path uses available bundle2.
47 49 if bundle2supported and not bundle2:
48 50 return False, None
49 51 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
50 52 elif bundle2 and not bundle2supported:
51 53 return False, None
52 54
53 55 # Streaming clone only works on empty repositories.
54 56 if len(repo):
55 57 return False, None
56 58
57 59 # Streaming clone only works if all data is being requested.
58 60 if pullop.heads:
59 61 return False, None
60 62
61 63 streamrequested = pullop.streamclonerequested
62 64
63 65 # If we don't have a preference, let the server decide for us. This
64 66 # likely only comes into play in LANs.
65 67 if streamrequested is None:
66 68 # The server can advertise whether to prefer streaming clone.
67 69 streamrequested = remote.capable('stream-preferred')
68 70
69 71 if not streamrequested:
70 72 return False, None
71 73
72 74 # In order for stream clone to work, the client has to support all the
73 75 # requirements advertised by the server.
74 76 #
75 77 # The server advertises its requirements via the "stream" and "streamreqs"
76 78 # capability. "stream" (a value-less capability) is advertised if and only
77 79 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
78 80 # is advertised and contains a comma-delimited list of requirements.
79 81 requirements = set()
80 82 if remote.capable('stream'):
81 83 requirements.add('revlogv1')
82 84 else:
83 85 streamreqs = remote.capable('streamreqs')
84 86 # This is weird and shouldn't happen with modern servers.
85 87 if not streamreqs:
86 88 pullop.repo.ui.warn(_(
87 89 'warning: stream clone requested but server has them '
88 90 'disabled\n'))
89 91 return False, None
90 92
91 93 streamreqs = set(streamreqs.split(','))
92 94 # Server requires something we don't support. Bail.
93 95 missingreqs = streamreqs - repo.supportedformats
94 96 if missingreqs:
95 97 pullop.repo.ui.warn(_(
96 98 'warning: stream clone requested but client is missing '
97 99 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
98 100 pullop.repo.ui.warn(
99 101 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
100 102 'for more information)\n'))
101 103 return False, None
102 104 requirements = streamreqs
103 105
104 106 return True, requirements
105 107
106 108 def maybeperformlegacystreamclone(pullop):
107 109 """Possibly perform a legacy stream clone operation.
108 110
109 111 Legacy stream clones are performed as part of pull but before all other
110 112 operations.
111 113
112 114 A legacy stream clone will not be performed if a bundle2 stream clone is
113 115 supported.
114 116 """
115 117 supported, requirements = canperformstreamclone(pullop)
116 118
117 119 if not supported:
118 120 return
119 121
120 122 repo = pullop.repo
121 123 remote = pullop.remote
122 124
123 125 # Save remote branchmap. We will use it later to speed up branchcache
124 126 # creation.
125 127 rbranchmap = None
126 128 if remote.capable('branchmap'):
127 129 rbranchmap = remote.branchmap()
128 130
129 131 repo.ui.status(_('streaming all changes\n'))
130 132
131 133 fp = remote.stream_out()
132 134 l = fp.readline()
133 135 try:
134 136 resp = int(l)
135 137 except ValueError:
136 138 raise error.ResponseError(
137 139 _('unexpected response from remote server:'), l)
138 140 if resp == 1:
139 141 raise error.Abort(_('operation forbidden by server'))
140 142 elif resp == 2:
141 143 raise error.Abort(_('locking the remote repository failed'))
142 144 elif resp != 0:
143 145 raise error.Abort(_('the server sent an unknown error code'))
144 146
145 147 l = fp.readline()
146 148 try:
147 149 filecount, bytecount = map(int, l.split(' ', 1))
148 150 except (ValueError, TypeError):
149 151 raise error.ResponseError(
150 152 _('unexpected response from remote server:'), l)
151 153
152 154 with repo.lock():
153 155 consumev1(repo, fp, filecount, bytecount)
154 156
155 157 # new requirements = old non-format requirements +
156 158 # new format-related remote requirements
157 159 # requirements from the streamed-in repository
158 160 repo.requirements = requirements | (
159 161 repo.requirements - repo.supportedformats)
160 162 repo._applyopenerreqs()
161 163 repo._writerequirements()
162 164
163 165 if rbranchmap:
164 166 branchmap.replacecache(repo, rbranchmap)
165 167
166 168 repo.invalidate()
167 169
168 170 def allowservergeneration(repo):
169 171 """Whether streaming clones are allowed from the server."""
170 172 if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
171 173 return False
172 174
173 175 # The way stream clone works makes it impossible to hide secret changesets.
174 176 # So don't allow this by default.
175 177 secret = phases.hassecret(repo)
176 178 if secret:
177 179 return repo.ui.configbool('server', 'uncompressedallowsecret')
178 180
179 181 return True
180 182
181 183 # This is its own function so extensions can override it.
182 184 def _walkstreamfiles(repo):
183 185 return repo.store.walk()
184 186
185 187 def generatev1(repo):
186 188 """Emit content for version 1 of a streaming clone.
187 189
188 190 This returns a 3-tuple of (file count, byte size, data iterator).
189 191
190 192 The data iterator consists of N entries for each file being transferred.
191 193 Each file entry starts as a line with the file name and integer size
192 194 delimited by a null byte.
193 195
194 196 The raw file data follows. Following the raw file data is the next file
195 197 entry, or EOF.
196 198
197 199 When used on the wire protocol, an additional line indicating protocol
198 200 success will be prepended to the stream. This function is not responsible
199 201 for adding it.
200 202
201 203 This function will obtain a repository lock to ensure a consistent view of
202 204 the store is captured. It therefore may raise LockError.
203 205 """
204 206 entries = []
205 207 total_bytes = 0
206 208 # Get consistent snapshot of repo, lock during scan.
207 209 with repo.lock():
208 210 repo.ui.debug('scanning\n')
209 211 for name, ename, size in _walkstreamfiles(repo):
210 212 if size:
211 213 entries.append((name, size))
212 214 total_bytes += size
213 215
214 216 repo.ui.debug('%d files, %d bytes to transfer\n' %
215 217 (len(entries), total_bytes))
216 218
217 219 svfs = repo.svfs
218 220 debugflag = repo.ui.debugflag
219 221
220 222 def emitrevlogdata():
221 223 for name, size in entries:
222 224 if debugflag:
223 225 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
224 226 # partially encode name over the wire for backwards compat
225 227 yield '%s\0%d\n' % (store.encodedir(name), size)
226 228 # auditing at this stage is both pointless (paths are already
227 229 # trusted by the local repo) and expensive
228 230 with svfs(name, 'rb', auditpath=False) as fp:
229 231 if size <= 65536:
230 232 yield fp.read(size)
231 233 else:
232 234 for chunk in util.filechunkiter(fp, limit=size):
233 235 yield chunk
234 236
235 237 return len(entries), total_bytes, emitrevlogdata()
236 238
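As a hedged illustration of the entry framing documented in ``generatev1()`` (the real consumer is ``consumev1()`` later in this file), a single record can be decoded like this sketch; ``fp`` is any file-like object:

    # sketch only: decode one v1 entry
    def readv1entry(fp):
        # the header line is "<encoded name>\0<size>\n"; <size> raw bytes follow
        header = fp.readline()
        name, size = header.split('\0', 1)
        size = int(size)
        data = fp.read(size)  # raw file content
        return store.decodedir(name), data
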
237 239 def generatev1wireproto(repo):
238 240 """Emit content for version 1 of streaming clone suitable for the wire.
239 241
240 242 This is the data output from ``generatev1()`` with 2 header lines. The
241 243 first line indicates overall success. The 2nd contains the file count and
242 244 byte size of payload.
243 245
244 246 The success line contains "0" for success, "1" for stream generation not
245 247 allowed, and "2" for error locking the repository (possibly indicating
246 248 a permissions error for the server process).
247 249 """
248 250 if not allowservergeneration(repo):
249 251 yield '1\n'
250 252 return
251 253
252 254 try:
253 255 filecount, bytecount, it = generatev1(repo)
254 256 except error.LockError:
255 257 yield '2\n'
256 258 return
257 259
258 260 # Indicates successful response.
259 261 yield '0\n'
260 262 yield '%d %d\n' % (filecount, bytecount)
261 263 for chunk in it:
262 264 yield chunk
263 265
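Putting the two header lines together, a wire client (as ``maybeperformlegacystreamclone()`` above) reads roughly this before handing off to the per-file entries; a sketch:

    resp = int(fp.readline())   # 0 = success, 1 = not allowed, 2 = lock error
    if resp != 0:
        raise error.Abort('stream clone refused: %d' % resp)
    filecount, bytecount = map(int, fp.readline().split(' ', 1))
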
264 266 def generatebundlev1(repo, compression='UN'):
265 267 """Emit content for version 1 of a stream clone bundle.
266 268
267 269 The first 4 bytes of the output ("HGS1") denote this as stream clone
268 270 bundle version 1.
269 271
270 272 The next 2 bytes indicate the compression type. Only "UN" is currently
271 273 supported.
272 274
273 275 The next 16 bytes are two 64-bit big endian unsigned integers indicating
274 276 file count and byte count, respectively.
275 277
276 278 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
277 279 of the requirements string, including a trailing \0. The following N bytes
278 280 are the requirements string, which is ASCII containing a comma-delimited
279 281 list of repo requirements that are needed to support the data.
280 282
281 283 The remaining content is the output of ``generatev1()`` (which may be
282 284 compressed in the future).
283 285
284 286 Returns a tuple of (requirements, data generator).
285 287 """
286 288 if compression != 'UN':
287 289 raise ValueError('we do not support the compression argument yet')
288 290
289 291 requirements = repo.requirements & repo.supportedformats
290 292 requires = ','.join(sorted(requirements))
291 293
292 294 def gen():
293 295 yield 'HGS1'
294 296 yield compression
295 297
296 298 filecount, bytecount, it = generatev1(repo)
297 299 repo.ui.status(_('writing %d bytes for %d files\n') %
298 300 (bytecount, filecount))
299 301
300 302 yield struct.pack('>QQ', filecount, bytecount)
301 303 yield struct.pack('>H', len(requires) + 1)
302 304 yield requires + '\0'
303 305
304 306 # This is where we'll add compression in the future.
305 307 assert compression == 'UN'
306 308
307 309 seen = 0
308 310 repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
309 311
310 312 for chunk in it:
311 313 seen += len(chunk)
312 314 repo.ui.progress(_('bundle'), seen, total=bytecount,
313 315 unit=_('bytes'))
314 316 yield chunk
315 317
316 318 repo.ui.progress(_('bundle'), None)
317 319
318 320 return requirements, gen()
319 321
320 322 def consumev1(repo, fp, filecount, bytecount):
321 323 """Apply the contents from version 1 of a streaming clone file handle.
322 324
323 325 This takes the output from "stream_out" and applies it to the specified
324 326 repository.
325 327
326 328 Like "stream_out," the status line added by the wire protocol is not
327 329 handled by this function.
328 330 """
329 331 with repo.lock():
330 332 repo.ui.status(_('%d files to transfer, %s of data\n') %
331 333 (filecount, util.bytecount(bytecount)))
332 334 handled_bytes = 0
333 335 repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
334 336 start = util.timer()
335 337
336 338 # TODO: get rid of (potential) inconsistency
337 339 #
338 340 # If transaction is started and any @filecache property is
339 341 # changed at this point, it causes inconsistency between
340 342 # in-memory cached property and streamclone-ed file on the
341 343 # disk. Nested transaction prevents transaction scope "clone"
342 344 # below from writing in-memory changes out at the end of it,
343 345 # even though in-memory changes are discarded at the end of it
344 346 # regardless of transaction nesting.
345 347 #
346 348 # But transaction nesting can't be simply prohibited, because
347 349 # nesting occurs also in ordinary case (e.g. enabling
348 350 # clonebundles).
349 351
350 352 with repo.transaction('clone'):
351 353 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
352 354 for i in xrange(filecount):
353 355 # XXX doesn't support '\n' or '\r' in filenames
354 356 l = fp.readline()
355 357 try:
356 358 name, size = l.split('\0', 1)
357 359 size = int(size)
358 360 except (ValueError, TypeError):
359 361 raise error.ResponseError(
360 362 _('unexpected response from remote server:'), l)
361 363 if repo.ui.debugflag:
362 364 repo.ui.debug('adding %s (%s)\n' %
363 365 (name, util.bytecount(size)))
364 366 # for backwards compat, name was partially encoded
365 367 path = store.decodedir(name)
366 368 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
367 369 for chunk in util.filechunkiter(fp, limit=size):
368 370 handled_bytes += len(chunk)
369 371 repo.ui.progress(_('clone'), handled_bytes,
370 372 total=bytecount, unit=_('bytes'))
371 373 ofp.write(chunk)
372 374
373 375 # force @filecache properties to be reloaded from
374 376 # streamclone-ed file at next access
375 377 repo.invalidate(clearfilecache=True)
376 378
377 379 elapsed = util.timer() - start
378 380 if elapsed <= 0:
379 381 elapsed = 0.001
380 382 repo.ui.progress(_('clone'), None)
381 383 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
382 384 (util.bytecount(bytecount), elapsed,
383 385 util.bytecount(bytecount / elapsed)))
384 386
385 387 def readbundle1header(fp):
386 388 compression = fp.read(2)
387 389 if compression != 'UN':
388 390 raise error.Abort(_('only uncompressed stream clone bundles are '
389 391 'supported; got %s') % compression)
390 392
391 393 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
392 394 requireslen = struct.unpack('>H', fp.read(2))[0]
393 395 requires = fp.read(requireslen)
394 396
395 397 if not requires.endswith('\0'):
396 398 raise error.Abort(_('malformed stream clone bundle: '
397 399 'requirements not properly encoded'))
398 400
399 401 requirements = set(requires.rstrip('\0').split(','))
400 402
401 403 return filecount, bytecount, requirements
402 404
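A hedged sketch of the byte layout that ``generatebundlev1()`` emits and ``readbundle1header()`` parses; the counts and requirements here are illustrative only:

    import struct

    requires = 'revlogv1,generaldelta'        # illustrative requirements
    header = ''.join([
        'HGS1',                               # 4-byte magic, bundle version 1
        'UN',                                 # 2-byte compression identifier
        struct.pack('>QQ', 1027, 98654),      # file count, byte count
        struct.pack('>H', len(requires) + 1), # length incl. trailing \0
        requires + '\0',
    ])
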
403 405 def applybundlev1(repo, fp):
404 406 """Apply the content from a stream clone bundle version 1.
405 407
406 408 We assume the 4 byte header has been read and validated and the file handle
407 409 is at the 2 byte compression identifier.
408 410 """
409 411 if len(repo):
410 412 raise error.Abort(_('cannot apply stream clone bundle on non-empty '
411 413 'repo'))
412 414
413 415 filecount, bytecount, requirements = readbundle1header(fp)
414 416 missingreqs = requirements - repo.supportedformats
415 417 if missingreqs:
416 418 raise error.Abort(_('unable to apply stream clone: '
417 419 'unsupported format: %s') %
418 420 ', '.join(sorted(missingreqs)))
419 421
420 422 consumev1(repo, fp, filecount, bytecount)
421 423
422 424 class streamcloneapplier(object):
423 425 """Class to manage applying streaming clone bundles.
424 426
425 427 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
426 428 readers to perform bundle type-specific functionality.
427 429 """
428 430 def __init__(self, fh):
429 431 self._fh = fh
430 432
431 433 def apply(self, repo):
432 434 return applybundlev1(repo, self._fh)
433 435
434 436 # type of file to stream
435 437 _fileappend = 0 # append only file
436 438 _filefull = 1 # full snapshot file
437 439
440 # Source of the file
441 _srcstore = 's' # store (svfs)
442 _srccache = 'c' # cache (cache)
443
438 444 # This is its own function so extensions can override it.
439 445 def _walkstreamfullstorefiles(repo):
440 446 """list snapshot file from the store"""
441 447 fnames = []
442 448 if not repo.publishing():
443 449 fnames.append('phaseroots')
444 450 return fnames
445 451
446 def _filterfull(entry, copy, vfs):
452 def _filterfull(entry, copy, vfsmap):
447 453 """actually copy the snapshot files"""
448 name, ftype, data = entry
454 src, name, ftype, data = entry
449 455 if ftype != _filefull:
450 456 return entry
451 return (name, ftype, copy(vfs.join(name)))
457 return (src, name, ftype, copy(vfsmap[src].join(name)))
452 458
453 459 @contextlib.contextmanager
454 460 def maketempcopies():
455 461 """return a function to temporary copy file"""
456 462 files = []
457 463 try:
458 464 def copy(src):
459 465 fd, dst = tempfile.mkstemp()
460 466 os.close(fd)
461 467 files.append(dst)
462 468 util.copyfiles(src, dst, hardlink=True)
463 469 return dst
464 470 yield copy
465 471 finally:
466 472 for tmp in files:
467 473 util.tryunlink(tmp)
468 474
475 def _makemap(repo):
476 """make a (src -> vfs) map for the repo"""
477 vfsmap = {
478 _srcstore: repo.svfs,
479 _srccache: repo.cachevfs,
480 }
481 # we keep repo.vfs out of the map on purpose: there are too many dangers
482 # there (eg: .hg/hgrc)
483 assert repo.vfs not in vfsmap.values()
484
485 return vfsmap
486
469 487 def _emit(repo, entries, totalfilesize):
470 488 """actually emit the stream bundle"""
471 vfs = repo.svfs
489 vfsmap = _makemap(repo)
472 490 progress = repo.ui.progress
473 491 progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
474 492 with maketempcopies() as copy:
475 493 try:
476 494 # copy is delayed until we are in the try
477 entries = [_filterfull(e, copy, vfs) for e in entries]
495 entries = [_filterfull(e, copy, vfsmap) for e in entries]
478 496 yield None # this releases the lock on the repository
479 497 seen = 0
480 498
481 for name, ftype, data in entries:
499 for src, name, ftype, data in entries:
500 vfs = vfsmap[src]
501 yield src
482 502 yield util.uvarintencode(len(name))
483 503 if ftype == _fileappend:
484 504 fp = vfs(name)
485 505 size = data
486 506 elif ftype == _filefull:
487 507 fp = open(data, 'rb')
488 508 size = util.fstat(fp).st_size
489 509 try:
490 510 yield util.uvarintencode(size)
491 511 yield name
492 512 if size <= 65536:
493 513 chunks = (fp.read(size),)
494 514 else:
495 515 chunks = util.filechunkiter(fp, limit=size)
496 516 for chunk in chunks:
497 517 seen += len(chunk)
498 518 progress(_('bundle'), seen, total=totalfilesize,
499 519 unit=_('bytes'))
500 520 yield chunk
501 521 finally:
502 522 fp.close()
503 523 finally:
504 524 progress(_('bundle'), None)
505 525
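``util.uvarintencode`` and ``util.uvarintdecodestream`` are not part of this diff; assuming they implement the usual LEB128-style unsigned varint (7 data bits per byte, high bit set while more bytes follow), the framing works like this sketch:

    def uvarintencode(value):
        out = ''
        while value > 0x7f:
            out += chr((value & 0x7f) | 0x80)  # continuation bit set
            value >>= 7
        return out + chr(value)

    def uvarintdecodestream(fp):
        result = shift = 0
        while True:
            byte = ord(fp.read(1))
            result |= (byte & 0x7f) << shift
            if not byte & 0x80:
                return result
            shift += 7
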
506 526 def generatev2(repo):
507 527 """Emit content for version 2 of a streaming clone.
508 528
509 529 the data stream consists of the following entries:
510 1) A varint containing the length of the filename
511 2) A varint containing the length of file data
512 3) N bytes containing the filename (the internal, store-agnostic form)
513 4) N bytes containing the file data
530 1) A char representing the file destination (eg: store or cache)
531 2) A varint containing the length of the filename
532 3) A varint containing the length of file data
533 4) N bytes containing the filename (the internal, store-agnostic form)
534 5) N bytes containing the file data
514 535
515 536 Returns a 3-tuple of (file count, file size, data iterator).
516 537 """
517 538
518 539 with repo.lock():
519 540
520 541 entries = []
521 542 totalfilesize = 0
522 543
523 544 repo.ui.debug('scanning\n')
524 545 for name, ename, size in _walkstreamfiles(repo):
525 546 if size:
526 entries.append((name, _fileappend, size))
547 entries.append((_srcstore, name, _fileappend, size))
527 548 totalfilesize += size
528 549 for name in _walkstreamfullstorefiles(repo):
529 550 if repo.svfs.exists(name):
530 551 totalfilesize += repo.svfs.lstat(name).st_size
531 entries.append((name, _filefull, None))
552 entries.append((_srcstore, name, _filefull, None))
553 for name in cacheutil.cachetocopy(repo):
554 if repo.cachevfs.exists(name):
555 totalfilesize += repo.cachevfs.lstat(name).st_size
556 entries.append((_srccache, name, _filefull, None))
532 557
533 558 chunks = _emit(repo, entries, totalfilesize)
534 559 first = next(chunks)
535 560 assert first is None
536 561
537 562 return len(entries), totalfilesize, chunks
538 563
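The entry framing above can be decoded as in this sketch (mirroring ``consumev2()`` below); the new leading byte routes each file either to the store vfs or, with this change, to the cache vfs:

    # sketch: decode one v2 entry; vfsmap as built by _makemap()
    def readv2entry(vfsmap, fp):
        src = fp.read(1)          # _srcstore ('s') or _srccache ('c')
        namelen = util.uvarintdecodestream(fp)
        datalen = util.uvarintdecodestream(fp)
        name = fp.read(namelen)
        data = fp.read(datalen)
        return vfsmap[src], name, data
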
564 @contextlib.contextmanager
565 def nested(*ctxs):
566 with warnings.catch_warnings():
567 # For some reason, Python decided 'nested' was deprecated without
568 # replacement. The official advice is to filter the deprecation
569 # warning for code that actually needs the feature.
570 warnings.filterwarnings("ignore", category=DeprecationWarning)
571 with contextlib.nested(*ctxs):
572 yield
573
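``contextlib.nested`` only exists on Python 2; on Python 3 an equivalent of the helper above could be built on ``contextlib.ExitStack`` (a sketch, not what this change uses):

    import contextlib

    @contextlib.contextmanager
    def nested(*ctxs):
        with contextlib.ExitStack() as stack:
            for ctx in ctxs:
                stack.enter_context(ctx)
            yield
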
539 574 def consumev2(repo, fp, filecount, filesize):
540 575 """Apply the contents from a version 2 streaming clone.
541 576
542 577 Data is read from an object that only needs to provide a ``read(size)``
543 578 method.
544 579 """
545 580 with repo.lock():
546 581 repo.ui.status(_('%d files to transfer, %s of data\n') %
547 582 (filecount, util.bytecount(filesize)))
548 583
549 584 start = util.timer()
550 585 handledbytes = 0
551 586 progress = repo.ui.progress
552 587
553 588 progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
554 589
555 vfs = repo.svfs
590 vfsmap = _makemap(repo)
556 591
557 592 with repo.transaction('clone'):
558 with vfs.backgroundclosing(repo.ui):
593 ctxs = (vfs.backgroundclosing(repo.ui)
594 for vfs in vfsmap.values())
595 with nested(*ctxs):
559 596 for i in range(filecount):
597 src = fp.read(1)
598 vfs = vfsmap[src]
560 599 namelen = util.uvarintdecodestream(fp)
561 600 datalen = util.uvarintdecodestream(fp)
562 601
563 602 name = fp.read(namelen)
564 603
565 604 if repo.ui.debugflag:
566 repo.ui.debug('adding %s (%s)\n' %
567 (name, util.bytecount(datalen)))
605 repo.ui.debug('adding [%s] %s (%s)\n' %
606 (src, name, util.bytecount(datalen)))
568 607
569 608 with vfs(name, 'w') as ofp:
570 609 for chunk in util.filechunkiter(fp, limit=datalen):
571 610 handledbytes += len(chunk)
572 611 progress(_('clone'), handledbytes, total=filesize,
573 612 unit=_('bytes'))
574 613 ofp.write(chunk)
575 614
576 615 # force @filecache properties to be reloaded from
577 616 # streamclone-ed file at next access
578 617 repo.invalidate(clearfilecache=True)
579 618
580 619 elapsed = util.timer() - start
581 620 if elapsed <= 0:
582 621 elapsed = 0.001
583 622 progress(_('clone'), None)
584 623 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
585 624 (util.bytecount(handledbytes), elapsed,
586 625 util.bytecount(handledbytes / elapsed)))
587 626
588 627 def applybundlev2(repo, fp, filecount, filesize, requirements):
589 628 missingreqs = [r for r in requirements if r not in repo.supported]
590 629 if missingreqs:
591 630 raise error.Abort(_('unable to apply stream clone: '
592 631 'unsupported format: %s') %
593 632 ', '.join(sorted(missingreqs)))
594 633
595 634 consumev2(repo, fp, filecount, filesize)
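``cacheutil.cachetocopy()`` is defined outside this diff. Judging from the test output below (branch2-served, rbc-names-v1 and rbc-revs-v1 appear in the clone's .hg/cache), it enumerates the branch map and rev-branch caches, roughly along these assumed lines:

    # assumed shape of cacheutil.cachetocopy(); not part of this diff
    def cachetocopy(srcrepo):
        cachefiles = ['branch2']
        cachefiles += ['branch2-%s' % f for f in repoview.filtertable]
        cachefiles += ['rbc-names-v1', 'rbc-revs-v1']
        return cachefiles
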
tests/test-clone-uncompressed.t
@@ -1,330 +1,336 @@
1 1 #require serve
2 2
3 3 #testcases stream-legacy stream-bundle2
4 4
5 5 #if stream-bundle2
6 6 $ cat << EOF >> $HGRCPATH
7 7 > [experimental]
8 8 > bundle2.stream = yes
9 9 > EOF
10 10 #endif
11 11
12 12 Initialize repository
13 13 the status call is to check for issue5130
14 14
15 15 $ hg init server
16 16 $ cd server
17 17 $ touch foo
18 18 $ hg -q commit -A -m initial
19 19 >>> for i in range(1024):
20 20 ... with open(str(i), 'wb') as fh:
21 21 ... fh.write(str(i))
22 22 $ hg -q commit -A -m 'add a lot of files'
23 23 $ hg st
24 24 $ hg serve -p $HGPORT -d --pid-file=hg.pid
25 25 $ cat hg.pid >> $DAEMON_PIDS
26 26 $ cd ..
27 27
28 28 Basic clone
29 29
30 30 #if stream-legacy
31 31 $ hg clone --stream -U http://localhost:$HGPORT clone1
32 32 streaming all changes
33 33 1027 files to transfer, 96.3 KB of data
34 34 transferred 96.3 KB in * seconds (*/sec) (glob)
35 35 searching for changes
36 36 no changes found
37 37 #endif
38 38 #if stream-bundle2
39 39 $ hg clone --stream -U http://localhost:$HGPORT clone1
40 40 streaming all changes
41 1027 files to transfer, 96.3 KB of data
42 transferred 96.3 KB in * seconds (* */sec) (glob)
41 1030 files to transfer, 96.4 KB of data
42 transferred 96.4 KB in * seconds (* */sec) (glob)
43
44 $ ls -1 clone1/.hg/cache
45 branch2-served
46 rbc-names-v1
47 rbc-revs-v1
43 48 #endif
44 49
45 50 --uncompressed is an alias to --stream
46 51
47 52 #if stream-legacy
48 53 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
49 54 streaming all changes
50 55 1027 files to transfer, 96.3 KB of data
51 56 transferred 96.3 KB in * seconds (*/sec) (glob)
52 57 searching for changes
53 58 no changes found
54 59 #endif
55 60 #if stream-bundle2
56 61 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
57 62 streaming all changes
58 1027 files to transfer, 96.3 KB of data
59 transferred 96.3 KB in * seconds (* */sec) (glob)
63 1030 files to transfer, 96.4 KB of data
64 transferred 96.4 KB in * seconds (* */sec) (glob)
60 65 #endif
61 66
62 67 Clone with background file closing enabled
63 68
64 69 #if stream-legacy
65 70 $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --stream -U http://localhost:$HGPORT clone-background | grep -v adding
66 71 using http://localhost:$HGPORT/
67 72 sending capabilities command
68 73 sending branchmap command
69 74 streaming all changes
70 75 sending stream_out command
71 76 1027 files to transfer, 96.3 KB of data
72 77 starting 4 threads for background file closing
73 78 transferred 96.3 KB in * seconds (*/sec) (glob)
74 79 query 1; heads
75 80 sending batch command
76 81 searching for changes
77 82 all remote heads known locally
78 83 no changes found
79 84 sending getbundle command
80 85 bundle2-input-bundle: with-transaction
81 86 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
82 87 bundle2-input-part: "phase-heads" supported
83 88 bundle2-input-part: total payload size 24
84 89 bundle2-input-bundle: 1 parts total
85 90 checking for updated bookmarks
86 91 #endif
87 92 #if stream-bundle2
88 93 $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --stream -U http://localhost:$HGPORT clone-background | grep -v adding
89 94 using http://localhost:$HGPORT/
90 95 sending capabilities command
91 96 query 1; heads
92 97 sending batch command
93 98 streaming all changes
94 99 sending getbundle command
95 100 bundle2-input-bundle: with-transaction
96 101 bundle2-input-part: "stream" (params: 4 mandatory) supported
97 102 applying stream bundle
98 1027 files to transfer, 96.3 KB of data
103 1030 files to transfer, 96.4 KB of data
104 starting 4 threads for background file closing
99 105 starting 4 threads for background file closing
100 transferred 96.3 KB in * seconds (* */sec) (glob)
101 bundle2-input-part: total payload size 110887
106 transferred 96.4 KB in * seconds (* */sec) (glob)
107 bundle2-input-part: total payload size 112077
102 108 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
103 109 bundle2-input-bundle: 1 parts total
104 110 checking for updated bookmarks
105 111 #endif
106 112
107 113 Cannot stream clone when there are secret changesets
108 114
109 115 $ hg -R server phase --force --secret -r tip
110 116 $ hg clone --stream -U http://localhost:$HGPORT secret-denied
111 117 warning: stream clone requested but server has them disabled
112 118 requesting all changes
113 119 adding changesets
114 120 adding manifests
115 121 adding file changes
116 122 added 1 changesets with 1 changes to 1 files
117 123 new changesets 96ee1d7354c4
118 124
119 125 $ killdaemons.py
120 126
121 127 Streaming of secrets can be overridden by server config
122 128
123 129 $ cd server
124 130 $ hg serve --config server.uncompressedallowsecret=true -p $HGPORT -d --pid-file=hg.pid
125 131 $ cat hg.pid > $DAEMON_PIDS
126 132 $ cd ..
127 133
128 134 #if stream-legacy
129 135 $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
130 136 streaming all changes
131 137 1027 files to transfer, 96.3 KB of data
132 138 transferred 96.3 KB in * seconds (*/sec) (glob)
133 139 searching for changes
134 140 no changes found
135 141 #endif
136 142 #if stream-bundle2
137 143 $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
138 144 streaming all changes
139 1027 files to transfer, 96.3 KB of data
140 transferred 96.3 KB in * seconds (* */sec) (glob)
145 1030 files to transfer, 96.4 KB of data
146 transferred 96.4 KB in * seconds (* */sec) (glob)
141 147 #endif
142 148
143 149 $ killdaemons.py
144 150
145 151 Verify interaction between preferuncompressed and secret presence
146 152
147 153 $ cd server
148 154 $ hg serve --config server.preferuncompressed=true -p $HGPORT -d --pid-file=hg.pid
149 155 $ cat hg.pid > $DAEMON_PIDS
150 156 $ cd ..
151 157
152 158 $ hg clone -U http://localhost:$HGPORT preferuncompressed-secret
153 159 requesting all changes
154 160 adding changesets
155 161 adding manifests
156 162 adding file changes
157 163 added 1 changesets with 1 changes to 1 files
158 164 new changesets 96ee1d7354c4
159 165
160 166 $ killdaemons.py
161 167
162 168 Clone not allowed when full bundles disabled and can't serve secrets
163 169
164 170 $ cd server
165 171 $ hg serve --config server.disablefullbundle=true -p $HGPORT -d --pid-file=hg.pid
166 172 $ cat hg.pid > $DAEMON_PIDS
167 173 $ cd ..
168 174
169 175 $ hg clone --stream http://localhost:$HGPORT secret-full-disabled
170 176 warning: stream clone requested but server has them disabled
171 177 requesting all changes
172 178 remote: abort: server has pull-based clones disabled
173 179 abort: pull failed on remote
174 180 (remove --pull if specified or upgrade Mercurial)
175 181 [255]
176 182
177 183 Local stream clone with secrets involved
178 184 (This is just a test over behavior: if you have access to the repo's files,
179 185 there is no security so it isn't important to prevent a clone here.)
180 186
181 187 $ hg clone -U --stream server local-secret
182 188 warning: stream clone requested but server has them disabled
183 189 requesting all changes
184 190 adding changesets
185 191 adding manifests
186 192 adding file changes
187 193 added 1 changesets with 1 changes to 1 files
188 194 new changesets 96ee1d7354c4
189 195
190 196 Stream clone while repo is changing:
191 197
192 198 $ mkdir changing
193 199 $ cd changing
194 200
195 201 extension for delaying the server process so we reliably can modify the repo
196 202 while cloning
197 203
198 204 $ cat > delayer.py <<EOF
199 205 > import time
200 206 > from mercurial import extensions, vfs
201 207 > def __call__(orig, self, path, *args, **kwargs):
202 208 > if path == 'data/f1.i':
203 209 > time.sleep(2)
204 210 > return orig(self, path, *args, **kwargs)
205 211 > extensions.wrapfunction(vfs.vfs, '__call__', __call__)
206 212 > EOF
207 213
208 214 prepare repo with small and big file to cover both code paths in emitrevlogdata
209 215
210 216 $ hg init repo
211 217 $ touch repo/f1
212 218 $ $TESTDIR/seq.py 50000 > repo/f2
213 219 $ hg -R repo ci -Aqm "0"
214 220 $ hg serve -R repo -p $HGPORT1 -d --pid-file=hg.pid --config extensions.delayer=delayer.py
215 221 $ cat hg.pid >> $DAEMON_PIDS
216 222
217 223 clone while modifying the repo between stating file with write lock and
218 224 actually serving file content
219 225
220 226 $ hg clone -q --stream -U http://localhost:$HGPORT1 clone &
221 227 $ sleep 1
222 228 $ echo >> repo/f1
223 229 $ echo >> repo/f2
224 230 $ hg -R repo ci -m "1"
225 231 $ wait
226 232 $ hg -R clone id
227 233 000000000000
228 234 $ cd ..
229 235
230 236 Stream repository with bookmarks
231 237 --------------------------------
232 238
233 239 (revert introduction of secret changeset)
234 240
235 241 $ hg -R server phase --draft 'secret()'
236 242
237 243 add a bookmark
238 244
239 245 $ hg -R server bookmark -r tip some-bookmark
240 246
241 247 clone it
242 248
243 249 #if stream-legacy
244 250 $ hg clone --stream http://localhost:$HGPORT with-bookmarks
245 251 streaming all changes
246 252 1027 files to transfer, 96.3 KB of data
247 253 transferred 96.3 KB in * seconds (*) (glob)
248 254 searching for changes
249 255 no changes found
250 256 updating to branch default
251 257 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
252 258 #endif
253 259 #if stream-bundle2
254 260 $ hg clone --stream http://localhost:$HGPORT with-bookmarks
255 261 streaming all changes
256 1027 files to transfer, 96.3 KB of data
257 transferred 96.3 KB in * seconds (* */sec) (glob)
262 1033 files to transfer, 96.6 KB of data
263 transferred 96.6 KB in * seconds (* */sec) (glob)
258 264 updating to branch default
259 265 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
260 266 #endif
261 267 $ hg -R with-bookmarks bookmarks
262 268 some-bookmark 1:c17445101a72
263 269
264 270 Stream repository with phases
265 271 -----------------------------
266 272
267 273 Clone as publishing
268 274
269 275 $ hg -R server phase -r 'all()'
270 276 0: draft
271 277 1: draft
272 278
273 279 #if stream-legacy
274 280 $ hg clone --stream http://localhost:$HGPORT phase-publish
275 281 streaming all changes
276 282 1027 files to transfer, 96.3 KB of data
277 283 transferred 96.3 KB in * seconds (*) (glob)
278 284 searching for changes
279 285 no changes found
280 286 updating to branch default
281 287 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
282 288 #endif
283 289 #if stream-bundle2
284 290 $ hg clone --stream http://localhost:$HGPORT phase-publish
285 291 streaming all changes
286 1027 files to transfer, 96.3 KB of data
287 transferred 96.3 KB in * seconds (* */sec) (glob)
292 1033 files to transfer, 96.6 KB of data
293 transferred 96.6 KB in * seconds (* */sec) (glob)
288 294 updating to branch default
289 295 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
290 296 #endif
291 297 $ hg -R phase-publish phase -r 'all()'
292 298 0: public
293 299 1: public
294 300
295 301 Clone as non publishing
296 302
297 303 $ cat << EOF >> server/.hg/hgrc
298 304 > [phases]
299 305 > publish = False
300 306 > EOF
301 307 $ killdaemons.py
302 308 $ hg -R server serve -p $HGPORT -d --pid-file=hg.pid
303 309 $ cat hg.pid >> $DAEMON_PIDS
304 310
305 311 #if stream-legacy
306 312 $ hg clone --stream http://localhost:$HGPORT phase-no-publish
307 313 streaming all changes
308 314 1027 files to transfer, 96.3 KB of data
309 315 transferred 96.3 KB in * seconds (*) (glob)
310 316 searching for changes
311 317 no changes found
312 318 updating to branch default
313 319 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
314 320 $ hg -R phase-no-publish phase -r 'all()'
315 321 0: public
316 322 1: public
317 323 #endif
318 324 #if stream-bundle2
319 325 $ hg clone --stream http://localhost:$HGPORT phase-no-publish
320 326 streaming all changes
321 1028 files to transfer, 96.4 KB of data
322 transferred 96.4 KB in * seconds (* */sec) (glob)
327 1034 files to transfer, 96.7 KB of data
328 transferred 96.7 KB in * seconds (* */sec) (glob)
323 329 updating to branch default
324 330 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
325 331 $ hg -R phase-no-publish phase -r 'all()'
326 332 0: draft
327 333 1: draft
328 334 #endif
329 335
330 336 $ killdaemons.py