##// END OF EJS Templates
stream-clone: factor computation of requirement of a stream clone...
marmoute -
r49443:8475a136 default
parent child Browse files
Show More
@@ -1,932 +1,941 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import errno
12 12 import os
13 13 import struct
14 14
15 15 from .i18n import _
16 16 from .pycompat import open
17 17 from .interfaces import repository
18 18 from . import (
19 19 bookmarks,
20 20 cacheutil,
21 21 error,
22 22 narrowspec,
23 23 phases,
24 24 pycompat,
25 25 requirements as requirementsmod,
26 26 scmutil,
27 27 store,
28 28 util,
29 29 )
30 30 from .utils import (
31 31 stringutil,
32 32 )
33 33
34 34
def new_stream_clone_requirements(
    supported_formats, default_requirements, streamed_requirements
):
    """determine the final set of requirement for a new stream clone

    This combines the "default" requirements that a new repository would use
    with the constraints we get from the stream clone content. Local
    configuration choices are kept whenever possible.
    """
    # Start from the local defaults, drop every format the stream has an
    # opinion about, then adopt the formats actually present in the stream.
    final_reqs = set(default_requirements) - supported_formats
    final_reqs |= streamed_requirements
    return final_reqs
48 48
49 49
def streamed_requirements(repo):
    """the set of requirement the new clone will have to support

    This is used for advertising the stream options and to generate the actual
    stream content."""
    # Only requirements that describe the store format are carried over the
    # stream; everything else stays a local choice.
    return repo.requirements & repo.supportedformats
57
58
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Determine whether the remote advertises a v2 stream clone over bundle2.
    bundle2supported = False
    if pullop.canusebundle2:
        if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements
141 150
142 151
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    # imported here to avoid an import cycle at module load time
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    # First line of the response is an integer status code.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    # Second line carries "<filecount> <bytecount>" for the payload.
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)
        # The streamed store dictates part of the requirements; recompute
        # them, refresh the store vfs options, and persist the result.
        repo.requirements = new_stream_clone_requirements(
            repo.supportedformats,
            repo.requirements,
            requirements,
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()
216 225
217 226
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret
    # changesets. So when the repository contains any, only serve streams if
    # the administrator explicitly opted in.
    if phases.hassecret(repo):
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True
233 242
234 243
235 244 # This is it's own function so extensions can override it.
236 245 def _walkstreamfiles(repo, matcher=None):
237 246 return repo.store.walk(matcher)
238 247
239 248
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for file_type, name, size in _walkstreamfiles(repo):
            # zero-length files are simply skipped from the stream
            if size:
                entries.append((name, size))
                total_bytes += size
        _test_sync_point_walk_1(repo)
    # NOTE: the lock is released here; the emitter below reads at most the
    # recorded `size` bytes per file, relying on store files being
    # append-only in the meantime.
    _test_sync_point_walk_2(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        # lazily emit "<encoded name>\0<size>\n" followed by the file data
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    # small file: a single read is cheaper than chunking
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
294 303
295 304
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        # stream clones are disabled or unavailable on this server
        yield b'1\n'
        return

    try:
        filecount, bytecount, data_iter = generatev1(repo)
    except error.LockError:
        # could not lock the repository to take a consistent snapshot
        yield b'2\n'
        return

    # Indicates successful response, then the payload dimensions.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in data_iter:
        yield chunk
322 331
323 332
def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        # magic + compression identifier header
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        # 64-bit file count and byte count, then length-prefixed requirements
        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()
380 389
381 390
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    # Each entry is "<name>\0<size>\n" followed by raw data.
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # guard against clock skew / sub-millisecond transfers
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )
455 464
456 465
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    ``fp`` must be positioned at the 2 byte compression identifier (the
    4 byte "HGS1" magic is assumed to have been consumed already).

    Returns a (filecount, bytecount, requirements) tuple.
    """
    compression = fp.read(2)
    if compression != b'UN':
        msg = _(
            b'only uncompressed stream clone bundles are '
            b'supported; got %s'
        )
        raise error.Abort(msg % compression)

    # Two big endian 64-bit unsigned integers: file count, then byte count.
    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    # 16-bit big endian length of the requirements string (incl. final \0).
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        msg = _(
            b'malformed stream clone bundle: '
            b'requirements not properly encoded'
        )
        raise error.Abort(msg)

    requirements = set(requires.rstrip(b'\0').split(b','))
    return filecount, bytecount, requirements
483 492
484 493
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    # A stream clone replaces the store wholesale, so the target repository
    # must be empty.
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    unsupported = requirements - repo.supportedformats
    if unsupported:
        msg = _(b'unable to apply stream clone: unsupported format: %s')
        raise error.Abort(msg % b', '.join(sorted(unsupported)))

    consumev1(repo, fp, filecount, bytecount)
505 514
506 515
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    Wrapping ``applybundlev1()`` in a dedicated type enables bundle readers
    to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        # file handle positioned at the compression identifier
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)
519 528
520 529
# Type of file to stream. Append-only files are read in place after the lock
# is released, bounded by the size recorded under the lock; full-snapshot
# files are copied to a temporary location while the lock is held (see
# _emit2 and _filterfull).
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file — also the key into the vfs map built by _makemap().
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)
528 537
529 538 # This is it's own function so extensions can override it.
530 539 def _walkstreamfullstorefiles(repo):
531 540 """list snapshot file from the store"""
532 541 fnames = []
533 542 if not repo.publishing():
534 543 fnames.append(b'phaseroots')
535 544 return fnames
536 545
537 546
def _filterfull(entry, copy, vfsmap):
    """Replace a full-snapshot entry's data with a temporary copy path.

    Entries that are not ``_filefull`` are returned untouched.
    """
    src, name, ftype, data = entry
    if ftype == _filefull:
        # snapshot the file now, before the repository lock is released
        entry = (src, name, ftype, copy(vfsmap[src].join(name)))
    return entry
544 553
545 554
@contextlib.contextmanager
def maketempcopies():
    """Yield a function that hardlink-copies a file to a temporary location.

    Every copy made through the yielded function is deleted when the context
    exits."""
    copies = []
    try:

        def snapshot(src):
            # mkstemp reserves a unique path; we only want the name, so the
            # descriptor is closed immediately.
            fd, dst = pycompat.mkstemp()
            os.close(fd)
            copies.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst

        yield snapshot
    finally:
        for path in copies:
            util.tryunlink(path)
563 572
564 573
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # repo.vfs is deliberately left out: it exposes too many sensitive files
    # (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap
576 585
577 586
def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle

    Generator protocol: the first yielded value is ``None`` and signals that
    all volatile files have been snapshotted, so the caller may release the
    repository lock before draining the rest of the stream.
    """
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as author might think this is
    # fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copy is delayed until we are in the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None  # this release the lock on the repository
        totalbytecount = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            # per-file wire format: <src><varint name len><varint size><name><data>
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                # append-only file: read in place, bounded by the recorded size
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                # volatile file: read the temporary snapshot made above
                fp = open(data, b'rb')
                size = util.fstat(fp).st_size
            bytecount = 0
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    bytecount += len(chunk)
                    totalbytecount += len(chunk)
                    progress.update(totalbytecount)
                    yield chunk
                if bytecount != size:
                    # Would most likely be caused by a race due to `hg strip` or
                    # a revlog split
                    raise error.Abort(
                        _(
                            b'clone could only read %d bytes from %s, but '
                            b'expected %d bytes'
                        )
                        % (bytecount, name, size)
                    )
            finally:
                fp.close()
636 645
637 646
def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests

    No-op in production; presumably overridden by tests to coordinate with a
    stream clone around the file-walk phase — TODO confirm against the test
    suite.
    """
640 649
641 650
def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests

    No-op in production; presumably overridden by tests to coordinate with a
    stream clone after the lock is released — TODO confirm against the test
    suite.
    """
644 653
645 654
def _v2_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of files information useful to clone a repo

    return (entries, totalfilesize)

    entries is a list of tuple (vfs-key, file-path, file-type, size)

    - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
    - `name`: file path of the file to copy (to be feed to the vfss)
    - `file-type`: do this file need to be copied with the source lock ?
    - `size`: the size of the file (or None)
    """
    # the walk must happen under the repository lock for a coherent snapshot
    assert repo._currentlock(repo._lockref) is not None
    entries = []
    totalfilesize = 0

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    # store files: streamed in place unless flagged volatile
    for rl_type, name, size in _walkstreamfiles(repo, matcher):
        if size:
            ft = _fileappend
            if rl_type & store.FILEFLAGS_VOLATILE:
                ft = _filefull
            entries.append((_srcstore, name, ft, size))
            totalfilesize += size
    # extra snapshot files from the store (e.g. phaseroots)
    for name in _walkstreamfullstorefiles(repo):
        if repo.svfs.exists(name):
            totalfilesize += repo.svfs.lstat(name).st_size
            entries.append((_srcstore, name, _filefull, None))
    if includeobsmarkers and repo.svfs.exists(b'obsstore'):
        totalfilesize += repo.svfs.lstat(b'obsstore').st_size
        entries.append((_srcstore, b'obsstore', _filefull, None))
    # warm caches worth copying to the clone
    for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
            totalfilesize += repo.cachevfs.lstat(name).st_size
            entries.append((_srccache, name, _filefull, None))
    return entries, totalfilesize
685 694
686 695
def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries, totalfilesize = _v2_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries, totalfilesize)
        # advance the generator to its first yield (None): by contract this
        # means volatile files are snapshotted and the lock can be released
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    return len(entries), totalfilesize, chunks
718 727
719 728
@contextlib.contextmanager
def nested(*ctxs):
    """Enter the given context managers left to right, exiting in reverse."""
    head, tail = ctxs[0], ctxs[1:]
    with head:
        if not tail:
            yield
        else:
            # recurse so the remaining managers nest inside this one
            with nested(*tail):
                yield
730 739
731 740
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as author might think this
        # is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    # per-file wire format (see generatev2): destination char,
                    # varint name length, varint data length, name, data
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # guard against clock skew / sub-millisecond transfers
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()
799 808
800 809
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Apply a version 2 stream clone bundle and update repo requirements."""
    # imported here to avoid an import cycle at module load time
    from . import localrepo

    unsupported = [r for r in requirements if r not in repo.supported]
    if unsupported:
        msg = _(b'unable to apply stream clone: unsupported format: %s')
        raise error.Abort(msg % b', '.join(sorted(unsupported)))

    consumev2(repo, fp, filecount, filesize)

    # The streamed store dictates part of the requirements; recompute them,
    # refresh the store vfs options, and persist the result.
    repo.requirements = new_stream_clone_requirements(
        repo.supportedformats,
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
822 831
823 832
def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    """Hardlink (or, failing that, copy) each entry between the vfs maps.

    Returns True when every file could be hardlinked, False otherwise.
    """
    # mutable cell rather than nonlocal to stay Python 2 compatible
    used_hardlink = [True]

    def on_copy_fallback():
        used_hardlink[0] = False
        progress.topic = _(b'copying')

    for key, path, size in entries:
        src_path = src_vfs_map[key].join(path)
        dst_vfs = dst_vfs_map[key]
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        parent = os.path.dirname(dst_path)
        if not os.path.exists(parent):
            util.makedirs(parent)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            src_path,
            dst_path,
            hardlink=used_hardlink[0],
            no_hardlink_cb=on_copy_fallback,
            check_fs_hardlink=False,
        )
        progress.increment()
    return used_hardlink[0]
852 861
853 862
def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clone"""
    # both repositories must share the same store format for a raw copy
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():

            # bookmark is not integrated to the streaming as it might use the
            # `repo.vfs` and they are too many sentitive data accessible
            # through `repo.vfs` to expose it to streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries, totalfilesize = _v2_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=len(entries) + bm_count,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full file while the source repository is locked
            # and the other one without the lock. However, in the linking case,
            # this would also requires checks that nobody is appending any data
            # to the files while we do the clone, so this is not done yet. We
            # could do this blindly when copying files.
            files = ((k, path, size) for k, path, ftype, size in entries)
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % (len(entries) + bm_count))

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction file as they do not make sense
        undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
        undo_files.extend(dest_repo.undofiles())
        for undovfs, undofile in undo_files:
            try:
                undovfs.unlink(undofile)
            except OSError as e:
                # a missing undo file is fine; warn on anything else
                if e.errno != errno.ENOENT:
                    msg = _(b'error removing %s: %s\n')
                    path = undovfs.join(undofile)
                    e_msg = stringutil.forcebytestr(e)
                    msg %= (path, e_msg)
                    dest_repo.ui.warn(msg)
@@ -1,739 +1,739 b''
1 1 # wireprotov1server.py - Wire protocol version 1 server functionality
2 2 #
3 3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import binascii
11 11 import os
12 12
13 13 from .i18n import _
14 14 from .node import hex
15 15 from .pycompat import getattr
16 16
17 17 from . import (
18 18 bundle2,
19 19 bundlecaches,
20 20 changegroup as changegroupmod,
21 21 discovery,
22 22 encoding,
23 23 error,
24 24 exchange,
25 25 pushkey as pushkeymod,
26 26 pycompat,
27 27 requirements as requirementsmod,
28 28 streamclone,
29 29 util,
30 30 wireprototypes,
31 31 )
32 32
33 33 from .utils import (
34 34 procutil,
35 35 stringutil,
36 36 )
37 37
# Convenience aliases for the vendored urllib error/request modules.
urlerr = util.urlerr
urlreq = util.urlreq

# Canned error text returned to legacy clients when the server requires
# bundle2 but the client only speaks bundle1.
bundle2requiredmain = _(b'incompatible Mercurial client; bundle2 required')
bundle2requiredhint = _(
    b'see https://www.mercurial-scm.org/wiki/IncompatibleClient'
)
bundle2required = b'%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
46 46
47 47
def clientcompressionsupport(proto):
    """Returns a list of compression methods supported by the client.

    Returns a list of the compression methods supported by the client
    according to the protocol capabilities. If no such capability has
    been announced, fallback to the default of zlib and uncompressed.
    """
    prefix = b'comp='
    announced = next(
        (cap for cap in proto.getprotocaps() if cap.startswith(prefix)), None
    )
    if announced is None:
        return [b'zlib', b'none']
    return announced[len(prefix) :].split(b',')
59 59
60 60
61 61 # wire protocol command can either return a string or one of these classes.
62 62
63 63
def getdispatchrepo(repo, proto, command):
    """Obtain the repo used for processing wire protocol commands.

    The intent of this function is to serve as a monkeypatch point for
    extensions that need commands to operate on different repo views under
    specialized circumstances.
    """
    # The view name comes from configuration; repo.filtered resolves it.
    return repo.filtered(repo.ui.config(b'server', b'view'))
73 73
74 74
def dispatch(repo, proto, command):
    """Look up and invoke the handler registered for *command*.

    The repository is first (re)filtered via ``getdispatchrepo`` so that
    extensions may substitute a different repo view.
    """
    target = getdispatchrepo(repo, proto, command)
    handler, argspec = commands[command]
    return handler(target, proto, *proto.getargs(argspec))
82 82
83 83
def options(cmd, keys, others):
    """Extract the recognized ``keys`` from the ``others`` mapping.

    Recognized keys are removed from ``others`` in place. Any leftover
    entries trigger a warning on stderr naming the command that ignored
    them.
    """
    opts = {}
    for key in keys:
        if key in others:
            opts[key] = others.pop(key)
    if others:
        procutil.stderr.write(
            b"warning: %s ignored unexpected arguments %s\n"
            % (cmd, b",".join(others))
        )
    return opts
96 96
97 97
def bundle1allowed(repo, action):
    """Whether a bundle1 operation is allowed from the server.

    Priority is:

    1. server.bundle1gd.<action> (if generaldelta active)
    2. server.bundle1.<action>
    3. server.bundle1gd (if generaldelta active)
    4. server.bundle1
    """
    ui = repo.ui
    gd = requirementsmod.GENERALDELTA_REQUIREMENT in repo.requirements

    # Probe the config knobs in priority order; the first one that is
    # explicitly set wins.
    probes = []
    if gd:
        probes.append(b'bundle1gd.%s' % action)
    probes.append(b'bundle1.%s' % action)
    if gd:
        probes.append(b'bundle1gd')

    for knob in probes:
        value = ui.configbool(b'server', knob)
        if value is not None:
            return value

    return ui.configbool(b'server', b'bundle1')
126 126
127 127
# Global registry mapping wire protocol command names to handler entries;
# populated by the ``wireprotocommand`` decorator below.
commands = wireprototypes.commanddict()
129 129
130 130
def wireprotocommand(name, args=None, permission=b'push'):
    """Decorator to declare a wire protocol command.

    ``name`` is the name of the wire protocol command being provided.

    ``args`` defines the named arguments accepted by the command. It is
    a space-delimited list of argument names. ``*`` denotes a special value
    that says to accept all named arguments.

    ``permission`` defines the permission type needed to run this command.
    Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
    respectively. Default is to assume command requires ``push`` permissions
    because otherwise commands not declaring their permissions could modify
    a repository that is supposed to be read-only.
    """
    # Only register on transports that speak protocol version 1.
    transports = {
        k for k, v in wireprototypes.TRANSPORTS.items() if v[b'version'] == 1
    }

    if permission not in (b'push', b'pull'):
        raise error.ProgrammingError(
            b'invalid wire protocol permission; '
            b'got %s; expected "push" or "pull"' % permission
        )

    if args is None:
        args = b''

    if not isinstance(args, bytes):
        raise error.ProgrammingError(
            b'arguments for version 1 commands must be declared as bytes'
        )

    def register(func):
        # Double registration is a programming error, not a runtime one.
        if name in commands:
            raise error.ProgrammingError(
                b'%s command already registered for version 1' % name
            )
        commands[name] = wireprototypes.commandentry(
            func, args=args, transports=transports, permission=permission
        )

        return func

    return register
176 176
177 177
178 178 # TODO define a more appropriate permissions type to use for this.
# TODO define a more appropriate permissions type to use for this.
@wireprotocommand(b'batch', b'cmds *', permission=b'pull')
def batch(repo, proto, cmds, others):
    """Execute several wire protocol commands in a single round trip.

    ``cmds`` is a ``;``-separated list of ``<op> <k1=v1,k2=v2,...>``
    entries whose names and values use the batch-argument escaping.
    Results are escaped and joined with ``;`` in the response.
    """
    unescapearg = wireprototypes.unescapebatcharg
    res = []
    for pair in cmds.split(b';'):
        op, args = pair.split(b' ', 1)
        vals = {}
        for a in args.split(b','):
            if a:
                n, v = a.split(b'=')
                vals[unescapearg(n)] = unescapearg(v)
        func, spec = commands[op]

        # Validate that client has permissions to perform this command.
        perm = commands[op].permission
        assert perm in (b'push', b'pull')
        proto.checkperm(perm)

        if spec:
            keys = spec.split()
            data = {}
            for k in keys:
                if k == b'*':
                    # Collect every argument not explicitly named in the
                    # spec under the catch-all key.
                    star = {}
                    for key in vals.keys():
                        if key not in keys:
                            star[key] = vals[key]
                    data[b'*'] = star
                else:
                    data[k] = vals[k]
            result = func(repo, proto, *[data[k] for k in keys])
        else:
            result = func(repo, proto)
        if isinstance(result, wireprototypes.ooberror):
            # An out-of-band error aborts the whole batch immediately.
            return result

        # For now, all batchable commands must return bytesresponse or
        # raw bytes (for backwards compatibility).
        assert isinstance(result, (wireprototypes.bytesresponse, bytes))
        if isinstance(result, wireprototypes.bytesresponse):
            result = result.data
        res.append(wireprototypes.escapebatcharg(result))

    return wireprototypes.bytesresponse(b';'.join(res))
223 223
224 224
@wireprotocommand(b'between', b'pairs', permission=b'pull')
def between(repo, proto, pairs):
    """Return the nodes between each requested ``base-head`` pair."""
    decoded = [wireprototypes.decodelist(p, b'-') for p in pairs.split(b" ")]
    lines = [
        wireprototypes.encodelist(nodes) + b"\n"
        for nodes in repo.between(decoded)
    ]
    return wireprototypes.bytesresponse(b''.join(lines))
233 233
234 234
@wireprotocommand(b'branchmap', permission=b'pull')
def branchmap(repo, proto):
    """Advertise the repository's branch heads, one branch per line."""
    lines = []
    for branch, nodes in pycompat.iteritems(repo.branchmap()):
        lines.append(
            b'%s %s'
            % (
                urlreq.quote(encoding.fromlocal(branch)),
                wireprototypes.encodelist(nodes),
            )
        )
    return wireprototypes.bytesresponse(b'\n'.join(lines))
245 245
246 246
@wireprotocommand(b'branches', b'nodes', permission=b'pull')
def branches(repo, proto, nodes):
    """Return branch information for each requested node."""
    decoded = wireprototypes.decodelist(nodes)
    lines = [
        wireprototypes.encodelist(branch) + b"\n"
        for branch in repo.branches(decoded)
    ]
    return wireprototypes.bytesresponse(b''.join(lines))
255 255
256 256
@wireprotocommand(b'clonebundles', b'', permission=b'pull')
def clonebundles(repo, proto):
    """Server command for returning info for available bundles to seed clones.

    Clients will parse this response and determine what bundle to fetch.

    Extensions may wrap this command to filter or dynamically emit data
    depending on the request. e.g. you could advertise URLs for the closest
    data center given the client's IP address.
    """
    # Missing manifest simply yields an empty response (tryread).
    manifest = repo.vfs.tryread(bundlecaches.CB_MANIFEST_FILE)
    return wireprototypes.bytesresponse(manifest)
270 270
271 271
# Baseline capabilities advertised to every version 1 client; extended
# dynamically by ``_capabilities`` below.
wireprotocaps = [
    b'lookup',
    b'branchmap',
    b'pushkey',
    b'known',
    b'getbundle',
    b'unbundlehash',
]
280 280
281 281
def _capabilities(repo, proto):
    """return a list of capabilities for a repo

    This function exists to allow extensions to easily wrap capabilities
    computation

    - returns a lists: easy to alter
    - change done here will be propagated to both `capabilities` and `hello`
    command without any other action needed.
    """
    # copy to prevent modification of the global list
    caps = list(wireprotocaps)

    # Command of same name as capability isn't exposed to version 1 of
    # transports. So conditionally add it.
    if commands.commandavailable(b'changegroupsubset', proto):
        caps.append(b'changegroupsubset')

    if streamclone.allowservergeneration(repo):
        if repo.ui.configbool(b'server', b'preferuncompressed'):
            caps.append(b'stream-preferred')
        # Requirements the stream-clone client will need to support.
        requiredformats = streamclone.streamed_requirements(repo)
        # if our local revlogs are just revlogv1, add 'stream' cap
        if not requiredformats - {requirementsmod.REVLOGV1_REQUIREMENT}:
            caps.append(b'stream')
        # otherwise, add 'streamreqs' detailing our local revlog format
        else:
            caps.append(b'streamreqs=%s' % b','.join(sorted(requiredformats)))
    if repo.ui.configbool(b'experimental', b'bundle2-advertise'):
        capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role=b'server'))
        caps.append(b'bundle2=' + urlreq.quote(capsblob))
    caps.append(b'unbundle=%s' % b','.join(bundle2.bundlepriority))

    if repo.ui.configbool(b'experimental', b'narrow'):
        caps.append(wireprototypes.NARROWCAP)
        if repo.ui.configbool(b'experimental', b'narrowservebrokenellipses'):
            caps.append(wireprototypes.ELLIPSESCAP)

    return proto.addcapabilities(repo, caps)
321 321
322 322
323 323 # If you are writing an extension and consider wrapping this function. Wrap
324 324 # `_capabilities` instead.
@wireprotocommand(b'capabilities', permission=b'pull')
def capabilities(repo, proto):
    """Advertise the server capabilities as a space-separated token list."""
    return wireprototypes.bytesresponse(
        b' '.join(sorted(_capabilities(repo, proto)))
    )
329 329
330 330
@wireprotocommand(b'changegroup', b'roots', permission=b'pull')
def changegroup(repo, proto, roots):
    """Stream a changegroup for everything missing from ``roots``."""
    missing = wireprototypes.decodelist(roots)
    out = discovery.outgoing(
        repo, missingroots=missing, ancestorsof=repo.heads()
    )
    cg = changegroupmod.makechangegroup(repo, out, b'01', b'serve')
    # Emit the changegroup in 32KB chunks until exhausted.
    return wireprototypes.streamres(gen=iter(lambda: cg.read(32768), b''))
340 340
341 341
@wireprotocommand(b'changegroupsubset', b'bases heads', permission=b'pull')
def changegroupsubset(repo, proto, bases, heads):
    """Stream a changegroup limited to the given bases/heads subset."""
    base_nodes = wireprototypes.decodelist(bases)
    head_nodes = wireprototypes.decodelist(heads)
    out = discovery.outgoing(
        repo, missingroots=base_nodes, ancestorsof=head_nodes
    )
    cg = changegroupmod.makechangegroup(repo, out, b'01', b'serve')
    # Emit the changegroup in 32KB chunks until exhausted.
    return wireprototypes.streamres(gen=iter(lambda: cg.read(32768), b''))
350 350
351 351
@wireprotocommand(b'debugwireargs', b'one two *', permission=b'pull')
def debugwireargs(repo, proto, one, two, others):
    """Echo arguments back to the client for wire protocol debugging."""
    # only accept optional args from the known set
    extra = options(b'debugwireargs', [b'three', b'four'], others)
    payload = repo.debugwireargs(one, two, **pycompat.strkwargs(extra))
    return wireprototypes.bytesresponse(payload)
359 359
360 360
def find_pullbundle(repo, proto, opts, clheads, heads, common):
    """Return a file object for the first matching pullbundle.

    Pullbundles are specified in .hg/pullbundles.manifest similar to
    clonebundles.
    For each entry, the bundle specification is checked for compatibility:
    - Client features vs the BUNDLESPEC.
    - Revisions shared with the clients vs base revisions of the bundle.
      A bundle can be applied only if all its base revisions are known by
      the client.
    - At least one leaf of the bundle's DAG is missing on the client.
    - Every leaf of the bundle's DAG is part of node set the client wants.
      E.g. do not send a bundle of all changes if the client wants only
      one specific branch of many.
    """

    def decodehexstring(s):
        # ``;``-separated hex node list -> set of binary nodes.
        return {binascii.unhexlify(h) for h in s.split(b';')}

    manifest = repo.vfs.tryread(b'pullbundles.manifest')
    if not manifest:
        return None
    res = bundlecaches.parseclonebundlesmanifest(repo, manifest)
    res = bundlecaches.filterclonebundleentries(repo, res)
    if not res:
        return None
    cl = repo.unfiltered().changelog
    heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
    common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
    compformats = clientcompressionsupport(proto)
    for entry in res:
        comp = entry.get(b'COMPRESSION')
        # A compression engine may be known under an alternate bundle name.
        altcomp = util.compengines._bundlenames.get(comp)
        if comp and comp not in compformats and altcomp not in compformats:
            continue
        # No test yet for VERSION, since V2 is supported by any client
        # that advertises partial pulls
        if b'heads' in entry:
            try:
                bundle_heads = decodehexstring(entry[b'heads'])
            except TypeError:
                # Bad heads entry
                continue
            if bundle_heads.issubset(common):
                continue  # Nothing new
            if all(cl.rev(rev) in common_anc for rev in bundle_heads):
                continue  # Still nothing new
            if any(
                cl.rev(rev) not in heads_anc and cl.rev(rev) not in common_anc
                for rev in bundle_heads
            ):
                # Bundle reaches outside what the client asked for.
                continue
        if b'bases' in entry:
            try:
                bundle_bases = decodehexstring(entry[b'bases'])
            except TypeError:
                # Bad bases entry
                continue
            if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
                # Client is missing a base; bundle cannot apply.
                continue
        path = entry[b'URL']
        repo.ui.debug(b'sending pullbundle "%s"\n' % path)
        try:
            return repo.vfs.open(path)
        except IOError:
            repo.ui.debug(b'pullbundle "%s" not accessible\n' % path)
            continue
    return None
429 429
430 430
@wireprotocommand(b'getbundle', b'*', permission=b'pull')
def getbundle(repo, proto, others):
    """Generate and stream a bundle satisfying the client's pull request.

    Wire arguments are decoded per ``GETBUNDLE_ARGUMENTS`` before being
    forwarded to ``exchange.getbundlechunks``. Aborts are forwarded to
    bundle2 clients as an ``error:abort`` part.
    """
    opts = options(
        b'getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(), others
    )
    for k, v in pycompat.iteritems(opts):
        keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
        if keytype == b'nodes':
            opts[k] = wireprototypes.decodelist(v)
        elif keytype == b'csv':
            opts[k] = list(v.split(b','))
        elif keytype == b'scsv':
            opts[k] = set(v.split(b','))
        elif keytype == b'boolean':
            # Client should serialize False as '0', which is a non-empty string
            # so it evaluates as a True bool.
            if v == b'0':
                opts[k] = False
            else:
                opts[k] = bool(v)
        elif keytype != b'plain':
            raise KeyError(b'unknown getbundle option type %s' % keytype)

    if not bundle1allowed(repo, b'pull'):
        if not exchange.bundle2requested(opts.get(b'bundlecaps')):
            if proto.name == b'http-v1':
                return wireprototypes.ooberror(bundle2required)
            raise error.Abort(bundle2requiredmain, hint=bundle2requiredhint)

    try:
        clheads = set(repo.changelog.heads())
        heads = set(opts.get(b'heads', set()))
        common = set(opts.get(b'common', set()))
        common.discard(repo.nullid)
        if (
            repo.ui.configbool(b'server', b'pullbundle')
            and b'partial-pull' in proto.getprotocaps()
        ):
            # Check if a pre-built bundle covers this request.
            bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
            if bundle:
                return wireprototypes.streamres(
                    gen=util.filechunkiter(bundle), prefer_uncompressed=True
                )

        if repo.ui.configbool(b'server', b'disablefullbundle'):
            # Check to see if this is a full clone.
            changegroup = opts.get(b'cg', True)
            if changegroup and not common and clheads == heads:
                raise error.Abort(
                    _(b'server has pull-based clones disabled'),
                    hint=_(b'remove --pull if specified or upgrade Mercurial'),
                )

        info, chunks = exchange.getbundlechunks(
            repo, b'serve', **pycompat.strkwargs(opts)
        )
        prefercompressed = info.get(b'prefercompressed', True)
    except error.Abort as exc:
        # cleanly forward Abort error to the client
        if not exchange.bundle2requested(opts.get(b'bundlecaps')):
            if proto.name == b'http-v1':
                return wireprototypes.ooberror(exc.message + b'\n')
            raise  # cannot do better for bundle1 + ssh
        # bundle2 request expect a bundle2 reply
        bundler = bundle2.bundle20(repo.ui)
        manargs = [(b'message', exc.message)]
        advargs = []
        if exc.hint is not None:
            advargs.append((b'hint', exc.hint))
        bundler.addpart(bundle2.bundlepart(b'error:abort', manargs, advargs))
        chunks = bundler.getchunks()
        prefercompressed = False

    return wireprototypes.streamres(
        gen=chunks, prefer_uncompressed=not prefercompressed
    )
508 508
509 509
@wireprotocommand(b'heads', permission=b'pull')
def heads(repo, proto):
    """Return the repository heads, hex-encoded on a single line."""
    encoded = wireprototypes.encodelist(repo.heads())
    return wireprototypes.bytesresponse(encoded + b'\n')
514 514
515 515
@wireprotocommand(b'hello', permission=b'pull')
def hello(repo, proto):
    """Called as part of SSH handshake to obtain server info.

    Returns a list of lines describing interesting things about the
    server, in an RFC822-like format.

    Currently, the only one defined is ``capabilities``, which consists of a
    line of space separated tokens describing server abilities:

        capabilities: <token0> <token1> <token2>
    """
    capsline = b'capabilities: %s\n' % capabilities(repo, proto).data
    return wireprototypes.bytesresponse(capsline)
530 530
531 531
@wireprotocommand(b'listkeys', b'namespace', permission=b'pull')
def listkeys(repo, proto, namespace):
    """Return all pushkey entries in ``namespace``, sorted and encoded."""
    entries = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
    return wireprototypes.bytesresponse(pushkeymod.encodekeys(entries))
536 536
537 537
@wireprotocommand(b'lookup', b'key', permission=b'pull')
def lookup(repo, proto, key):
    """Resolve ``key`` to a hex node; report failures in-band.

    The response line is ``<success> <result>`` where success is 1/0 and
    result is the hex node or the error text.
    """
    try:
        node = repo.lookup(encoding.tolocal(key))
        success, result = 1, hex(node)
    except Exception as inst:
        # Any lookup failure is reported to the client, not raised.
        success, result = 0, stringutil.forcebytestr(inst)
    return wireprototypes.bytesresponse(b'%d %s\n' % (success, result))
549 549
550 550
@wireprotocommand(b'known', b'nodes *', permission=b'pull')
def known(repo, proto, nodes, others):
    """Return a 0/1 flag string telling which requested nodes are known."""
    flags = [
        b'1' if present else b'0'
        for present in repo.known(wireprototypes.decodelist(nodes))
    ]
    return wireprototypes.bytesresponse(b''.join(flags))
557 557
558 558
@wireprotocommand(b'protocaps', b'caps', permission=b'pull')
def protocaps(repo, proto, caps):
    """Record the client's protocol capabilities (SSH v1 only)."""
    advertised = caps.split(b' ')
    if proto.name == wireprototypes.SSHV1:
        proto._protocaps = set(advertised)
    return wireprototypes.bytesresponse(b'OK')
564 564
565 565
@wireprotocommand(b'pushkey', b'namespace key old new', permission=b'push')
def pushkey(repo, proto, namespace, key, old, new):
    """Set a pushkey value; the response is ``<1|0>\\n<captured output>``."""
    # compatibility with pre-1.8 clients which were accidentally
    # sending raw binary nodes rather than utf-8-encoded hex
    if len(new) == 20 and stringutil.escapestr(new) != new:
        # looks like it could be a binary node
        try:
            new.decode('utf-8')
            new = encoding.tolocal(new)  # but cleanly decodes as UTF-8
        except UnicodeDecodeError:
            pass  # binary, leave unmodified
    else:
        new = encoding.tolocal(new)  # normal path

    with proto.mayberedirectstdio() as output:
        r = (
            repo.pushkey(
                encoding.tolocal(namespace),
                encoding.tolocal(key),
                encoding.tolocal(old),
                new,
            )
            or False
        )

    output = output.getvalue() if output else b''
    return wireprototypes.bytesresponse(b'%d\n%s' % (int(r), output))
593 593
594 594
@wireprotocommand(b'stream_out', permission=b'pull')
def stream(repo, proto):
    """If the server supports streaming clone, it advertises the "stream"
    capability with a value representing the version and flags of the repo
    it is serving. Client checks to see if it understands the format.
    """
    # Stream generation itself lives in streamclone; the result is a
    # legacy (non-bundle2) raw stream.
    return wireprototypes.streamreslegacy(streamclone.generatev1wireproto(repo))
602 602
603 603
@wireprotocommand(b'unbundle', b'heads', permission=b'push')
def unbundle(repo, proto, heads):
    """Apply a pushed bundle to the repository.

    ``heads`` is the client's view of our heads, used to detect races
    with concurrent writers. Errors raised during a bundle2 unbundle are
    encoded back to the client as bundle2 error parts.
    """
    their_heads = wireprototypes.decodelist(heads)

    with proto.mayberedirectstdio() as output:
        try:
            exchange.check_heads(repo, their_heads, b'preparing changes')
            cleanup = lambda: None
            try:
                payload = proto.getpayload()
                if repo.ui.configbool(b'server', b'streamunbundle'):

                    def cleanup():
                        # Ensure that the full payload is consumed, so
                        # that the connection doesn't contain trailing garbage.
                        for p in payload:
                            pass

                    fp = util.chunkbuffer(payload)
                else:
                    # write bundle data to temporary file as it can be big
                    fp, tempname = None, None

                    def cleanup():
                        if fp:
                            fp.close()
                        if tempname:
                            os.unlink(tempname)

                    fd, tempname = pycompat.mkstemp(prefix=b'hg-unbundle-')
                    repo.ui.debug(
                        b'redirecting incoming bundle to %s\n' % tempname
                    )
                    fp = os.fdopen(fd, pycompat.sysstr(b'wb+'))
                    for p in payload:
                        fp.write(p)
                    fp.seek(0)

                gen = exchange.readbundle(repo.ui, fp, None)
                if isinstance(
                    gen, changegroupmod.cg1unpacker
                ) and not bundle1allowed(repo, b'push'):
                    if proto.name == b'http-v1':
                        # need to special case http because stderr do not get to
                        # the http client on failed push so we need to abuse
                        # some other error type to make sure the message get to
                        # the user.
                        return wireprototypes.ooberror(bundle2required)
                    raise error.Abort(
                        bundle2requiredmain, hint=bundle2requiredhint
                    )

                r = exchange.unbundle(
                    repo, gen, their_heads, b'serve', proto.client()
                )
                if util.safehasattr(r, b'addpart'):
                    # The return looks streamable, we are in the bundle2 case
                    # and should return a stream.
                    return wireprototypes.streamreslegacy(gen=r.getchunks())
                return wireprototypes.pushres(
                    r, output.getvalue() if output else b''
                )

            finally:
                # Always drain/close the payload source, whatever happened.
                cleanup()

        except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
            # handle non-bundle2 case first
            if not getattr(exc, 'duringunbundle2', False):
                try:
                    raise
                except error.Abort as exc:
                    # The old code we moved used procutil.stderr directly.
                    # We did not change it to minimise code change.
                    # This need to be moved to something proper.
                    # Feel free to do it.
                    procutil.stderr.write(exc.format())
                    procutil.stderr.flush()
                    return wireprototypes.pushres(
                        0, output.getvalue() if output else b''
                    )
                except error.PushRaced:
                    return wireprototypes.pusherr(
                        pycompat.bytestr(exc),
                        output.getvalue() if output else b'',
                    )

            # bundle2 path: report the failure as bundle2 parts, preserving
            # any server output salvaged from the aborted operation.
            bundler = bundle2.bundle20(repo.ui)
            for out in getattr(exc, '_bundle2salvagedoutput', ()):
                bundler.addpart(out)
            try:
                try:
                    raise
                except error.PushkeyFailed as exc:
                    # check client caps
                    remotecaps = getattr(exc, '_replycaps', None)
                    if (
                        remotecaps is not None
                        and b'pushkey' not in remotecaps.get(b'error', ())
                    ):
                        # no support remote side, fallback to Abort handler.
                        raise
                    part = bundler.newpart(b'error:pushkey')
                    part.addparam(b'in-reply-to', exc.partid)
                    if exc.namespace is not None:
                        part.addparam(
                            b'namespace', exc.namespace, mandatory=False
                        )
                    if exc.key is not None:
                        part.addparam(b'key', exc.key, mandatory=False)
                    if exc.new is not None:
                        part.addparam(b'new', exc.new, mandatory=False)
                    if exc.old is not None:
                        part.addparam(b'old', exc.old, mandatory=False)
                    if exc.ret is not None:
                        part.addparam(b'ret', exc.ret, mandatory=False)
            except error.BundleValueError as exc:
                errpart = bundler.newpart(b'error:unsupportedcontent')
                if exc.parttype is not None:
                    errpart.addparam(b'parttype', exc.parttype)
                if exc.params:
                    errpart.addparam(b'params', b'\0'.join(exc.params))
            except error.Abort as exc:
                manargs = [(b'message', exc.message)]
                advargs = []
                if exc.hint is not None:
                    advargs.append((b'hint', exc.hint))
                bundler.addpart(
                    bundle2.bundlepart(b'error:abort', manargs, advargs)
                )
            except error.PushRaced as exc:
                bundler.newpart(
                    b'error:pushraced',
                    [(b'message', stringutil.forcebytestr(exc))],
                )
            return wireprototypes.streamreslegacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now