sidedata: apply basic but tight security around exchange...
marmoute -
r43401:c17a63eb default
@@ -1,2555 +1,2574
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the unbundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple, and we want to discourage
56 56 overly complex usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
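
For illustration, a stream carrying a single mandatory `Compression=BZ`
parameter would begin as follows (a sketch, values are hypothetical)::

    HG20                magic string
    \x00\x00\x00\x0e    params size: 14
    Compression=BZ      one mandatory parameter (capital first letter)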
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 The payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, and `chunkdata` is plain bytes (exactly as many
123 123 as `chunksize` says). The payload is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
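For illustration, a complete advisory part of type "output", carrying one
advisory parameter and a two byte payload, could be encoded as follows (a
sketch, values are hypothetical)::

    \x00\x00\x00\x1b    header size: 27
    \x06output          typesize: 6, parttype: "output" (advisory)
    \x00\x00\x00\x00    partid: 0
    \x00\x01            0 mandatory parameters, 1 advisory parameter
    \x0b\x01            param-sizes: key is 11 bytes, value is 1 byte
    in-reply-to1        param-data: ("in-reply-to", "1")
    \x00\x00\x00\x02hi  one 2 bytes payload chunk containing "hi"
    \x00\x00\x00\x00    zero size chunk: end of payload
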
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know whether a part is mandatory or advisory. If the part
139 139 type contains any uppercase characters it is considered mandatory. When no
140 140 handler is known for a mandatory part, the process is aborted and an exception
141 141 is raised. If the part is advisory and no handler is known, the part is
142 142 ignored. When the process is aborted, the full bundle is still read from the
143 143 stream to keep the channel usable. But none of the parts read after an abort
144 144 are processed. In the future, dropping the stream may become an option for
145 145 channels we do not care to preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import collections
151 151 import errno
152 152 import os
153 153 import re
154 154 import string
155 155 import struct
156 156 import sys
157 157
158 158 from .i18n import _
159 159 from . import (
160 160 bookmarks,
161 161 changegroup,
162 162 encoding,
163 163 error,
164 164 node as nodemod,
165 165 obsolete,
166 166 phases,
167 167 pushkey,
168 168 pycompat,
169 169 streamclone,
170 170 tags,
171 171 url,
172 172 util,
173 173 )
174 174 from .utils import stringutil
175 175
176 176 urlerr = util.urlerr
177 177 urlreq = util.urlreq
178 178
179 179 _pack = struct.pack
180 180 _unpack = struct.unpack
181 181
182 182 _fstreamparamsize = b'>i'
183 183 _fpartheadersize = b'>i'
184 184 _fparttypesize = b'>B'
185 185 _fpartid = b'>I'
186 186 _fpayloadsize = b'>i'
187 187 _fpartparamcount = b'>BB'
188 188
189 189 preferedchunksize = 32768
190 190
191 191 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
192 192
193 193
194 194 def outdebug(ui, message):
195 195 """debug regarding output stream (bundling)"""
196 196 if ui.configbool(b'devel', b'bundle2.debug'):
197 197 ui.debug(b'bundle2-output: %s\n' % message)
198 198
199 199
200 200 def indebug(ui, message):
201 201 """debug on input stream (unbundling)"""
202 202 if ui.configbool(b'devel', b'bundle2.debug'):
203 203 ui.debug(b'bundle2-input: %s\n' % message)
204 204
205 205
206 206 def validateparttype(parttype):
207 207 """raise ValueError if a parttype contains invalid character"""
208 208 if _parttypeforbidden.search(parttype):
209 209 raise ValueError(parttype)
210 210
211 211
212 212 def _makefpartparamsizes(nbparams):
213 213 """return a struct format to read part parameter sizes
214 214
215 215 The number of parameters is variable, so we need to build the format
216 216 dynamically.
217 217 """
218 218 return b'>' + (b'BB' * nbparams)
219 219
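# For illustration, _makefpartparamsizes(2) returns b'>BBBB', which reads
# two (<size-of-key>, <size-of-value>) byte pairs in a single struct.unpack
# call.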
220 220
221 221 parthandlermapping = {}
222 222
223 223
224 224 def parthandler(parttype, params=()):
225 225 """decorator that register a function as a bundle2 part handler
226 226
227 227 eg::
228 228
229 229 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
230 230 def myparttypehandler(...):
231 231 '''process a part of type "my part".'''
232 232 ...
233 233 """
234 234 validateparttype(parttype)
235 235
236 236 def _decorator(func):
237 237 lparttype = parttype.lower() # enforce lower case matching.
238 238 assert lparttype not in parthandlermapping
239 239 parthandlermapping[lparttype] = func
240 240 func.params = frozenset(params)
241 241 return func
242 242
243 243 return _decorator
244 244
245 245
246 246 class unbundlerecords(object):
247 247 """keep record of what happens during and unbundle
248 248
249 249 New records are added using `records.add('cat', obj)`. Where 'cat' is a
250 250 category of record and obj is an arbitrary object.
251 251
252 252 `records['cat']` will return all entries of this category 'cat'.
253 253
254 254 Iterating on the object itself will yield `('category', obj)` tuples
255 255 for all entries.
256 256
257 257 All iterations happen in chronological order.
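
For illustration (a sketch with hypothetical values)::

  records = unbundlerecords()
  records.add('changegroup', {'return': 1})
  records['changegroup']  # -> ({'return': 1},)
  list(records)           # -> [('changegroup', {'return': 1})]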
258 258 """
259 259
260 260 def __init__(self):
261 261 self._categories = {}
262 262 self._sequences = []
263 263 self._replies = {}
264 264
265 265 def add(self, category, entry, inreplyto=None):
266 266 """add a new record of a given category.
267 267
268 268 The entry can then be retrieved in the list returned by
269 269 self['category']."""
270 270 self._categories.setdefault(category, []).append(entry)
271 271 self._sequences.append((category, entry))
272 272 if inreplyto is not None:
273 273 self.getreplies(inreplyto).add(category, entry)
274 274
275 275 def getreplies(self, partid):
276 276 """get the records that are replies to a specific part"""
277 277 return self._replies.setdefault(partid, unbundlerecords())
278 278
279 279 def __getitem__(self, cat):
280 280 return tuple(self._categories.get(cat, ()))
281 281
282 282 def __iter__(self):
283 283 return iter(self._sequences)
284 284
285 285 def __len__(self):
286 286 return len(self._sequences)
287 287
288 288 def __nonzero__(self):
289 289 return bool(self._sequences)
290 290
291 291 __bool__ = __nonzero__
292 292
293 293
294 294 class bundleoperation(object):
295 295 """an object that represents a single bundling process
296 296
297 297 Its purpose is to carry unbundle-related objects and states.
298 298
299 299 A new object should be created at the beginning of each bundle processing.
300 300 The object is to be returned by the processing function.
301 301
302 302 The object currently has very little content; it will ultimately contain:
303 303 * access to the repo the bundle is applied to,
304 304 * a ui object,
305 305 * a way to retrieve a transaction to add changes to the repo,
306 306 * a way to record the result of processing each part,
307 307 * a way to construct a bundle response when applicable.
308 308 """
309 309
310 310 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
311 311 self.repo = repo
312 312 self.ui = repo.ui
313 313 self.records = unbundlerecords()
314 314 self.reply = None
315 315 self.captureoutput = captureoutput
316 316 self.hookargs = {}
317 317 self._gettransaction = transactiongetter
318 318 # carries value that can modify part behavior
319 319 self.modes = {}
320 320 self.source = source
321 321
322 322 def gettransaction(self):
323 323 transaction = self._gettransaction()
324 324
325 325 if self.hookargs:
326 326 # the ones added to the transaction supersede those added
327 327 # to the operation.
328 328 self.hookargs.update(transaction.hookargs)
329 329 transaction.hookargs = self.hookargs
330 330
331 331 # mark the hookargs as flushed. further attempts to add to
332 332 # hookargs will result in an abort.
333 333 self.hookargs = None
334 334
335 335 return transaction
336 336
337 337 def addhookargs(self, hookargs):
338 338 if self.hookargs is None:
339 339 raise error.ProgrammingError(
340 340 b'attempted to add hookargs to '
341 341 b'operation after transaction started'
342 342 )
343 343 self.hookargs.update(hookargs)
344 344
345 345
346 346 class TransactionUnavailable(RuntimeError):
347 347 pass
348 348
349 349
350 350 def _notransaction():
351 351 """default method to get a transaction while processing a bundle
352 352
353 353 Raise an exception to highlight the fact that no transaction was expected
354 354 to be created"""
355 355 raise TransactionUnavailable()
356 356
357 357
358 358 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
359 359 # transform me into unbundler.apply() as soon as the freeze is lifted
360 360 if isinstance(unbundler, unbundle20):
361 361 tr.hookargs[b'bundle2'] = b'1'
362 362 if source is not None and b'source' not in tr.hookargs:
363 363 tr.hookargs[b'source'] = source
364 364 if url is not None and b'url' not in tr.hookargs:
365 365 tr.hookargs[b'url'] = url
366 366 return processbundle(repo, unbundler, lambda: tr, source=source)
367 367 else:
368 368 # the transactiongetter won't be used, but we might as well set it
369 369 op = bundleoperation(repo, lambda: tr, source=source)
370 370 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
371 371 return op
372 372
373 373
374 374 class partiterator(object):
375 375 def __init__(self, repo, op, unbundler):
376 376 self.repo = repo
377 377 self.op = op
378 378 self.unbundler = unbundler
379 379 self.iterator = None
380 380 self.count = 0
381 381 self.current = None
382 382
383 383 def __enter__(self):
384 384 def func():
385 385 itr = enumerate(self.unbundler.iterparts(), 1)
386 386 for count, p in itr:
387 387 self.count = count
388 388 self.current = p
389 389 yield p
390 390 p.consume()
391 391 self.current = None
392 392
393 393 self.iterator = func()
394 394 return self.iterator
395 395
396 396 def __exit__(self, type, exc, tb):
397 397 if not self.iterator:
398 398 return
399 399
400 400 # Only gracefully abort in a normal exception situation. User aborts
401 401 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
402 402 # and should not gracefully cleanup.
403 403 if isinstance(exc, Exception):
404 404 # Any exceptions seeking to the end of the bundle at this point are
405 405 # almost certainly related to the underlying stream being bad.
406 406 # And, chances are that the exception we're handling is related to
407 407 # getting in that bad state. So, we swallow the seeking error and
408 408 # re-raise the original error.
409 409 seekerror = False
410 410 try:
411 411 if self.current:
412 412 # consume the part content to not corrupt the stream.
413 413 self.current.consume()
414 414
415 415 for part in self.iterator:
416 416 # consume the bundle content
417 417 part.consume()
418 418 except Exception:
419 419 seekerror = True
420 420
421 421 # Small hack to let caller code distinguish exceptions from bundle2
422 422 # processing from processing the old format. This is mostly needed
423 423 # to handle different return codes to unbundle according to the type
424 424 # of bundle. We should probably clean up or drop this return code
425 425 # craziness in a future version.
426 426 exc.duringunbundle2 = True
427 427 salvaged = []
428 428 replycaps = None
429 429 if self.op.reply is not None:
430 430 salvaged = self.op.reply.salvageoutput()
431 431 replycaps = self.op.reply.capabilities
432 432 exc._replycaps = replycaps
433 433 exc._bundle2salvagedoutput = salvaged
434 434
435 435 # Re-raising from a variable loses the original stack. So only use
436 436 # that form if we need to.
437 437 if seekerror:
438 438 raise exc
439 439
440 440 self.repo.ui.debug(
441 441 b'bundle2-input-bundle: %i parts total\n' % self.count
442 442 )
443 443
444 444
445 445 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
446 446 """This function process a bundle, apply effect to/from a repo
447 447
448 448 It iterates over each part then searches for and uses the proper handling
449 449 code to process the part. Parts are processed in order.
450 450
451 451 An unknown mandatory part will abort the process.
452 452
453 453 It is temporarily possible to provide a prebuilt bundleoperation to the
454 454 function. This is used to ensure output is properly propagated in case of
455 455 an error during the unbundling. This output capturing part will likely be
456 456 reworked and this ability will probably go away in the process.
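
For illustration, a minimal call could look like this (a sketch; the
transaction setup is hypothetical)::

  with repo.transaction(b'unbundle') as tr:
      op = processbundle(repo, unbundler, lambda: tr)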
457 457 """
458 458 if op is None:
459 459 if transactiongetter is None:
460 460 transactiongetter = _notransaction
461 461 op = bundleoperation(repo, transactiongetter, source=source)
462 462 # todo:
463 463 # - replace this with an init function soon.
464 464 # - exception catching
465 465 unbundler.params
466 466 if repo.ui.debugflag:
467 467 msg = [b'bundle2-input-bundle:']
468 468 if unbundler.params:
469 469 msg.append(b' %i params' % len(unbundler.params))
470 470 if op._gettransaction is None or op._gettransaction is _notransaction:
471 471 msg.append(b' no-transaction')
472 472 else:
473 473 msg.append(b' with-transaction')
474 474 msg.append(b'\n')
475 475 repo.ui.debug(b''.join(msg))
476 476
477 477 processparts(repo, op, unbundler)
478 478
479 479 return op
480 480
481 481
482 482 def processparts(repo, op, unbundler):
483 483 with partiterator(repo, op, unbundler) as parts:
484 484 for part in parts:
485 485 _processpart(op, part)
486 486
487 487
488 488 def _processchangegroup(op, cg, tr, source, url, **kwargs):
489 489 ret = cg.apply(op.repo, tr, source, url, **kwargs)
490 490 op.records.add(b'changegroup', {b'return': ret,})
491 491 return ret
492 492
493 493
494 494 def _gethandler(op, part):
495 495 status = b'unknown' # used by debug output
496 496 try:
497 497 handler = parthandlermapping.get(part.type)
498 498 if handler is None:
499 499 status = b'unsupported-type'
500 500 raise error.BundleUnknownFeatureError(parttype=part.type)
501 501 indebug(op.ui, b'found a handler for part %s' % part.type)
502 502 unknownparams = part.mandatorykeys - handler.params
503 503 if unknownparams:
504 504 unknownparams = list(unknownparams)
505 505 unknownparams.sort()
506 506 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
507 507 raise error.BundleUnknownFeatureError(
508 508 parttype=part.type, params=unknownparams
509 509 )
510 510 status = b'supported'
511 511 except error.BundleUnknownFeatureError as exc:
512 512 if part.mandatory: # mandatory parts
513 513 raise
514 514 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
515 515 return # skip to part processing
516 516 finally:
517 517 if op.ui.debugflag:
518 518 msg = [b'bundle2-input-part: "%s"' % part.type]
519 519 if not part.mandatory:
520 520 msg.append(b' (advisory)')
521 521 nbmp = len(part.mandatorykeys)
522 522 nbap = len(part.params) - nbmp
523 523 if nbmp or nbap:
524 524 msg.append(b' (params:')
525 525 if nbmp:
526 526 msg.append(b' %i mandatory' % nbmp)
527 527 if nbap:
528 528 msg.append(b' %i advisory' % nbap)
529 529 msg.append(b')')
530 530 msg.append(b' %s\n' % status)
531 531 op.ui.debug(b''.join(msg))
532 532
533 533 return handler
534 534
535 535
536 536 def _processpart(op, part):
537 537 """process a single part from a bundle
538 538
539 539 The part is guaranteed to have been fully consumed when the function exits
540 540 (even if an exception is raised)."""
541 541 handler = _gethandler(op, part)
542 542 if handler is None:
543 543 return
544 544
545 545 # handler is called outside the above try block so that we don't
546 546 # risk catching KeyErrors from anything other than the
547 547 # parthandlermapping lookup (any KeyError raised by handler()
548 548 # itself represents a defect of a different variety).
549 549 output = None
550 550 if op.captureoutput and op.reply is not None:
551 551 op.ui.pushbuffer(error=True, subproc=True)
552 552 output = b''
553 553 try:
554 554 handler(op, part)
555 555 finally:
556 556 if output is not None:
557 557 output = op.ui.popbuffer()
558 558 if output:
559 559 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
560 560 outpart.addparam(
561 561 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
562 562 )
563 563
564 564
565 565 def decodecaps(blob):
566 566 """decode a bundle2 caps bytes blob into a dictionary
567 567
568 568 The blob is a list of capabilities (one per line)
569 569 Capabilities may have values using a line of the form::
570 570
571 571 capability=value1,value2,value3
572 572
573 573 The values are always a list."""
574 574 caps = {}
575 575 for line in blob.splitlines():
576 576 if not line:
577 577 continue
578 578 if b'=' not in line:
579 579 key, vals = line, ()
580 580 else:
581 581 key, vals = line.split(b'=', 1)
582 582 vals = vals.split(b',')
583 583 key = urlreq.unquote(key)
584 584 vals = [urlreq.unquote(v) for v in vals]
585 585 caps[key] = vals
586 586 return caps
587 587
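# For illustration, decodecaps() and encodecaps() round-trip a blob such as
# the following (a sketch with hypothetical values):
#
#   decodecaps(b'HG20\nerror=abort,unsupportedcontent')
#   -> {b'HG20': (), b'error': [b'abort', b'unsupportedcontent']}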
588 588
589 589 def encodecaps(caps):
590 590 """encode a bundle2 caps dictionary into a bytes blob"""
591 591 chunks = []
592 592 for ca in sorted(caps):
593 593 vals = caps[ca]
594 594 ca = urlreq.quote(ca)
595 595 vals = [urlreq.quote(v) for v in vals]
596 596 if vals:
597 597 ca = b"%s=%s" % (ca, b','.join(vals))
598 598 chunks.append(ca)
599 599 return b'\n'.join(chunks)
600 600
601 601
602 602 bundletypes = {
603 603 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
604 604 # since the unification ssh accepts a header but there
605 605 # is no capability signaling it.
606 606 b"HG20": (), # special-cased below
607 607 b"HG10UN": (b"HG10UN", b'UN'),
608 608 b"HG10BZ": (b"HG10", b'BZ'),
609 609 b"HG10GZ": (b"HG10GZ", b'GZ'),
610 610 }
611 611
612 612 # hgweb uses this list to communicate its preferred type
613 613 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
614 614
615 615
616 616 class bundle20(object):
617 617 """represent an outgoing bundle2 container
618 618
619 619 Use the `addparam` method to add a stream level parameter, and `newpart` to
620 620 populate it. Then call `getchunks` to retrieve all the binary chunks of
621 621 data that compose the bundle2 container."""
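# For illustration, a minimal bundling sequence could look like this (a
# sketch; the part type and data are hypothetical):
#
#   bundler = bundle20(ui)
#   bundler.newpart(b'output', data=b'hello', mandatory=False)
#   raw = b''.join(bundler.getchunks())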
622 622
623 623 _magicstring = b'HG20'
624 624
625 625 def __init__(self, ui, capabilities=()):
626 626 self.ui = ui
627 627 self._params = []
628 628 self._parts = []
629 629 self.capabilities = dict(capabilities)
630 630 self._compengine = util.compengines.forbundletype(b'UN')
631 631 self._compopts = None
632 632 # If compression is being handled by a consumer of the raw
633 633 # data (e.g. the wire protocol), unsetting this flag tells
634 634 # consumers that the bundle is best left uncompressed.
635 635 self.prefercompressed = True
636 636
637 637 def setcompression(self, alg, compopts=None):
638 638 """setup core part compression to <alg>"""
639 639 if alg in (None, b'UN'):
640 640 return
641 641 assert not any(n.lower() == b'compression' for n, v in self._params)
642 642 self.addparam(b'Compression', alg)
643 643 self._compengine = util.compengines.forbundletype(alg)
644 644 self._compopts = compopts
645 645
646 646 @property
647 647 def nbparts(self):
648 648 """total number of parts added to the bundler"""
649 649 return len(self._parts)
650 650
651 651 # methods used to define the bundle2 content
652 652 def addparam(self, name, value=None):
653 653 """add a stream level parameter"""
654 654 if not name:
655 655 raise error.ProgrammingError(b'empty parameter name')
656 656 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
657 657 raise error.ProgrammingError(
658 658 b'non letter first character: %s' % name
659 659 )
660 660 self._params.append((name, value))
661 661
662 662 def addpart(self, part):
663 663 """add a new part to the bundle2 container
664 664
665 665 Parts contains the actual applicative payload."""
666 666 assert part.id is None
667 667 part.id = len(self._parts) # very cheap counter
668 668 self._parts.append(part)
669 669
670 670 def newpart(self, typeid, *args, **kwargs):
671 671 """create a new part and add it to the containers
672 672
673 673 The part is directly added to the container. For now, this means
674 674 that any failure to properly initialize the part after calling
675 675 ``newpart`` should result in a failure of the whole bundling process.
676 676
677 677 You can still fall back to manually creating and adding a part if you
678 678 need better control."""
679 679 part = bundlepart(typeid, *args, **kwargs)
680 680 self.addpart(part)
681 681 return part
682 682
683 683 # methods used to generate the bundle2 stream
684 684 def getchunks(self):
685 685 if self.ui.debugflag:
686 686 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
687 687 if self._params:
688 688 msg.append(b' (%i params)' % len(self._params))
689 689 msg.append(b' %i parts total\n' % len(self._parts))
690 690 self.ui.debug(b''.join(msg))
691 691 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
692 692 yield self._magicstring
693 693 param = self._paramchunk()
694 694 outdebug(self.ui, b'bundle parameter: %s' % param)
695 695 yield _pack(_fstreamparamsize, len(param))
696 696 if param:
697 697 yield param
698 698 for chunk in self._compengine.compressstream(
699 699 self._getcorechunk(), self._compopts
700 700 ):
701 701 yield chunk
702 702
703 703 def _paramchunk(self):
704 704 """return a encoded version of all stream parameters"""
705 705 blocks = []
706 706 for par, value in self._params:
707 707 par = urlreq.quote(par)
708 708 if value is not None:
709 709 value = urlreq.quote(value)
710 710 par = b'%s=%s' % (par, value)
711 711 blocks.append(par)
712 712 return b' '.join(blocks)
713 713
714 714 def _getcorechunk(self):
715 715 """yield chunk for the core part of the bundle
716 716
717 717 (all but headers and parameters)"""
718 718 outdebug(self.ui, b'start of parts')
719 719 for part in self._parts:
720 720 outdebug(self.ui, b'bundle part: "%s"' % part.type)
721 721 for chunk in part.getchunks(ui=self.ui):
722 722 yield chunk
723 723 outdebug(self.ui, b'end of bundle')
724 724 yield _pack(_fpartheadersize, 0)
725 725
726 726 def salvageoutput(self):
727 727 """return a list with a copy of all output parts in the bundle
728 728
729 729 This is meant to be used during error handling to make sure we preserve
730 730 server output"""
731 731 salvaged = []
732 732 for part in self._parts:
733 733 if part.type.startswith(b'output'):
734 734 salvaged.append(part.copy())
735 735 return salvaged
736 736
737 737
738 738 class unpackermixin(object):
739 739 """A mixin to extract bytes and struct data from a stream"""
740 740
741 741 def __init__(self, fp):
742 742 self._fp = fp
743 743
744 744 def _unpack(self, format):
745 745 """unpack this struct format from the stream
746 746
747 747 This method is meant for internal usage by the bundle2 protocol only.
748 748 It directly manipulates the low level stream, including bundle2 level
749 749 instructions.
750 750
751 751 Do not use it to implement higher-level logic or methods."""
752 752 data = self._readexact(struct.calcsize(format))
753 753 return _unpack(format, data)
754 754
755 755 def _readexact(self, size):
756 756 """read exactly <size> bytes from the stream
757 757
758 758 This method is meant for internal usage by the bundle2 protocol only.
759 759 It directly manipulates the low level stream, including bundle2 level
760 760 instructions.
761 761
762 762 Do not use it to implement higher-level logic or methods."""
763 763 return changegroup.readexactly(self._fp, size)
764 764
765 765
766 766 def getunbundler(ui, fp, magicstring=None):
767 767 """return a valid unbundler object for a given magicstring"""
768 768 if magicstring is None:
769 769 magicstring = changegroup.readexactly(fp, 4)
770 770 magic, version = magicstring[0:2], magicstring[2:4]
771 771 if magic != b'HG':
772 772 ui.debug(
773 773 b"error: invalid magic: %r (version %r), should be 'HG'\n"
774 774 % (magic, version)
775 775 )
776 776 raise error.Abort(_(b'not a Mercurial bundle'))
777 777 unbundlerclass = formatmap.get(version)
778 778 if unbundlerclass is None:
779 779 raise error.Abort(_(b'unknown bundle version %s') % version)
780 780 unbundler = unbundlerclass(ui, fp)
781 781 indebug(ui, b'start processing of %s stream' % magicstring)
782 782 return unbundler
783 783
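# For illustration, a minimal consumption loop could look like this (a
# sketch; `fp` is assumed to deliver a bundle2 stream):
#
#   unbundler = getunbundler(ui, fp)
#   for part in unbundler.iterparts():
#       data = part.read()  # parts must be consumed in stream order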
784 784
785 785 class unbundle20(unpackermixin):
786 786 """interpret a bundle2 stream
787 787
788 788 This class is fed with a binary stream and yields parts through its
789 789 `iterparts` methods."""
790 790
791 791 _magicstring = b'HG20'
792 792
793 793 def __init__(self, ui, fp):
794 794 """If header is specified, we do not read it out of the stream."""
795 795 self.ui = ui
796 796 self._compengine = util.compengines.forbundletype(b'UN')
797 797 self._compressed = None
798 798 super(unbundle20, self).__init__(fp)
799 799
800 800 @util.propertycache
801 801 def params(self):
802 802 """dictionary of stream level parameters"""
803 803 indebug(self.ui, b'reading bundle2 stream parameters')
804 804 params = {}
805 805 paramssize = self._unpack(_fstreamparamsize)[0]
806 806 if paramssize < 0:
807 807 raise error.BundleValueError(
808 808 b'negative bundle param size: %i' % paramssize
809 809 )
810 810 if paramssize:
811 811 params = self._readexact(paramssize)
812 812 params = self._processallparams(params)
813 813 return params
814 814
815 815 def _processallparams(self, paramsblock):
816 816 """"""
817 817 params = util.sortdict()
818 818 for p in paramsblock.split(b' '):
819 819 p = p.split(b'=', 1)
820 820 p = [urlreq.unquote(i) for i in p]
821 821 if len(p) < 2:
822 822 p.append(None)
823 823 self._processparam(*p)
824 824 params[p[0]] = p[1]
825 825 return params
826 826
827 827 def _processparam(self, name, value):
828 828 """process a parameter, applying its effect if needed
829 829
830 830 Parameters starting with a lower case letter are advisory and will be
831 831 ignored when unknown. Those starting with an upper case letter are
832 832 mandatory, and this function will raise a KeyError when they are unknown.
833 833
834 834 Note: no options are currently supported. Any input will either be
835 835 ignored or fail.
836 836 """
837 837 if not name:
838 838 raise ValueError(r'empty parameter name')
839 839 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
840 840 raise ValueError(r'non letter first character: %s' % name)
841 841 try:
842 842 handler = b2streamparamsmap[name.lower()]
843 843 except KeyError:
844 844 if name[0:1].islower():
845 845 indebug(self.ui, b"ignoring unknown parameter %s" % name)
846 846 else:
847 847 raise error.BundleUnknownFeatureError(params=(name,))
848 848 else:
849 849 handler(self, name, value)
850 850
851 851 def _forwardchunks(self):
852 852 """utility to transfer a bundle2 as binary
853 853
854 854 This is made necessary by the fact that the 'getbundle' command over 'ssh'
855 855 has no way to know when the reply ends, relying on the bundle being
856 856 interpreted to know its end. This is terrible and we are sorry, but we
857 857 needed to move forward to get general delta enabled.
858 858 """
859 859 yield self._magicstring
860 860 assert b'params' not in vars(self)
861 861 paramssize = self._unpack(_fstreamparamsize)[0]
862 862 if paramssize < 0:
863 863 raise error.BundleValueError(
864 864 b'negative bundle param size: %i' % paramssize
865 865 )
866 866 if paramssize:
867 867 params = self._readexact(paramssize)
868 868 self._processallparams(params)
869 869 # The payload itself is decompressed below, so drop
870 870 # the compression parameter passed down to compensate.
871 871 outparams = []
872 872 for p in params.split(b' '):
873 873 k, v = p.split(b'=', 1)
874 874 if k.lower() != b'compression':
875 875 outparams.append(p)
876 876 outparams = b' '.join(outparams)
877 877 yield _pack(_fstreamparamsize, len(outparams))
878 878 yield outparams
879 879 else:
880 880 yield _pack(_fstreamparamsize, paramssize)
881 881 # From there, payload might need to be decompressed
882 882 self._fp = self._compengine.decompressorreader(self._fp)
883 883 emptycount = 0
884 884 while emptycount < 2:
885 885 # so we can brainlessly loop
886 886 assert _fpartheadersize == _fpayloadsize
887 887 size = self._unpack(_fpartheadersize)[0]
888 888 yield _pack(_fpartheadersize, size)
889 889 if size:
890 890 emptycount = 0
891 891 else:
892 892 emptycount += 1
893 893 continue
894 894 if size == flaginterrupt:
895 895 continue
896 896 elif size < 0:
897 897 raise error.BundleValueError(b'negative chunk size: %i' % size)
898 898 yield self._readexact(size)
899 899
900 900 def iterparts(self, seekable=False):
901 901 """yield all parts contained in the stream"""
902 902 cls = seekableunbundlepart if seekable else unbundlepart
903 903 # make sure params have been loaded
904 904 self.params
905 905 # From there, the payload needs to be decompressed
906 906 self._fp = self._compengine.decompressorreader(self._fp)
907 907 indebug(self.ui, b'start extraction of bundle2 parts')
908 908 headerblock = self._readpartheader()
909 909 while headerblock is not None:
910 910 part = cls(self.ui, headerblock, self._fp)
911 911 yield part
912 912 # Ensure part is fully consumed so we can start reading the next
913 913 # part.
914 914 part.consume()
915 915
916 916 headerblock = self._readpartheader()
917 917 indebug(self.ui, b'end of bundle2 stream')
918 918
919 919 def _readpartheader(self):
920 920 """reads a part header size and return the bytes blob
921 921
922 922 returns None if empty"""
923 923 headersize = self._unpack(_fpartheadersize)[0]
924 924 if headersize < 0:
925 925 raise error.BundleValueError(
926 926 b'negative part header size: %i' % headersize
927 927 )
928 928 indebug(self.ui, b'part header size: %i' % headersize)
929 929 if headersize:
930 930 return self._readexact(headersize)
931 931 return None
932 932
933 933 def compressed(self):
934 934 self.params # load params
935 935 return self._compressed
936 936
937 937 def close(self):
938 938 """close underlying file"""
939 939 if util.safehasattr(self._fp, 'close'):
940 940 return self._fp.close()
941 941
942 942
943 943 formatmap = {b'20': unbundle20}
944 944
945 945 b2streamparamsmap = {}
946 946
947 947
948 948 def b2streamparamhandler(name):
949 949 """register a handler for a stream level parameter"""
950 950
951 951 def decorator(func):
952 952 assert name not in b2streamparamsmap
953 953 b2streamparamsmap[name] = func
954 954 return func
955 955
956 956 return decorator
957 957
958 958
959 959 @b2streamparamhandler(b'compression')
960 960 def processcompression(unbundler, param, value):
961 961 """read compression parameter and install payload decompression"""
962 962 if value not in util.compengines.supportedbundletypes:
963 963 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
964 964 unbundler._compengine = util.compengines.forbundletype(value)
965 965 if value is not None:
966 966 unbundler._compressed = True
967 967
968 968
969 969 class bundlepart(object):
970 970 """A bundle2 part contains application level payload
971 971
972 972 The part `type` is used to route the part to the application level
973 973 handler.
974 974
975 975 The part payload is contained in ``part.data``. It could be raw bytes or a
976 976 generator of byte chunks.
977 977
978 978 You can add parameters to the part using the ``addparam`` method.
979 979 Parameters can be either mandatory (default) or advisory. Remote side
980 980 should be able to safely ignore the advisory ones.
981 981
982 982 Neither data nor parameters can be modified after generation has begun.
983 983 """
984 984
985 985 def __init__(
986 986 self,
987 987 parttype,
988 988 mandatoryparams=(),
989 989 advisoryparams=(),
990 990 data=b'',
991 991 mandatory=True,
992 992 ):
993 993 validateparttype(parttype)
994 994 self.id = None
995 995 self.type = parttype
996 996 self._data = data
997 997 self._mandatoryparams = list(mandatoryparams)
998 998 self._advisoryparams = list(advisoryparams)
999 999 # checking for duplicated entries
1000 1000 self._seenparams = set()
1001 1001 for pname, __ in self._mandatoryparams + self._advisoryparams:
1002 1002 if pname in self._seenparams:
1003 1003 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1004 1004 self._seenparams.add(pname)
1005 1005 # status of the part's generation:
1006 1006 # - None: not started,
1007 1007 # - False: currently generated,
1008 1008 # - True: generation done.
1009 1009 self._generated = None
1010 1010 self.mandatory = mandatory
1011 1011
1012 1012 def __repr__(self):
1013 1013 cls = b"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1014 1014 return b'<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1015 1015 cls,
1016 1016 id(self),
1017 1017 self.id,
1018 1018 self.type,
1019 1019 self.mandatory,
1020 1020 )
1021 1021
1022 1022 def copy(self):
1023 1023 """return a copy of the part
1024 1024
1025 1025 The new part has the very same content but no partid assigned yet.
1026 1026 Parts with generated data cannot be copied."""
1027 1027 assert not util.safehasattr(self.data, 'next')
1028 1028 return self.__class__(
1029 1029 self.type,
1030 1030 self._mandatoryparams,
1031 1031 self._advisoryparams,
1032 1032 self._data,
1033 1033 self.mandatory,
1034 1034 )
1035 1035
1036 1036 # methods used to define the part content
1037 1037 @property
1038 1038 def data(self):
1039 1039 return self._data
1040 1040
1041 1041 @data.setter
1042 1042 def data(self, data):
1043 1043 if self._generated is not None:
1044 1044 raise error.ReadOnlyPartError(b'part is being generated')
1045 1045 self._data = data
1046 1046
1047 1047 @property
1048 1048 def mandatoryparams(self):
1049 1049 # make it an immutable tuple to force people through ``addparam``
1050 1050 return tuple(self._mandatoryparams)
1051 1051
1052 1052 @property
1053 1053 def advisoryparams(self):
1054 1054 # make it an immutable tuple to force people through ``addparam``
1055 1055 return tuple(self._advisoryparams)
1056 1056
1057 1057 def addparam(self, name, value=b'', mandatory=True):
1058 1058 """add a parameter to the part
1059 1059
1060 1060 If 'mandatory' is set to True, the remote handler must claim support
1061 1061 for this parameter or the unbundling will be aborted.
1062 1062
1063 1063 The 'name' and 'value' cannot exceed 255 bytes each.
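
eg (a sketch, values are hypothetical)::

  part.addparam(b'in-reply-to', b'42', mandatory=False)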
1064 1064 """
1065 1065 if self._generated is not None:
1066 1066 raise error.ReadOnlyPartError(b'part is being generated')
1067 1067 if name in self._seenparams:
1068 1068 raise ValueError(b'duplicated params: %s' % name)
1069 1069 self._seenparams.add(name)
1070 1070 params = self._advisoryparams
1071 1071 if mandatory:
1072 1072 params = self._mandatoryparams
1073 1073 params.append((name, value))
1074 1074
1075 1075 # methods used to generate the bundle2 stream
1076 1076 def getchunks(self, ui):
1077 1077 if self._generated is not None:
1078 1078 raise error.ProgrammingError(b'part can only be consumed once')
1079 1079 self._generated = False
1080 1080
1081 1081 if ui.debugflag:
1082 1082 msg = [b'bundle2-output-part: "%s"' % self.type]
1083 1083 if not self.mandatory:
1084 1084 msg.append(b' (advisory)')
1085 1085 nbmp = len(self.mandatoryparams)
1086 1086 nbap = len(self.advisoryparams)
1087 1087 if nbmp or nbap:
1088 1088 msg.append(b' (params:')
1089 1089 if nbmp:
1090 1090 msg.append(b' %i mandatory' % nbmp)
1091 1091 if nbap:
1092 1092 msg.append(b' %i advisory' % nbap)
1093 1093 msg.append(b')')
1094 1094 if not self.data:
1095 1095 msg.append(b' empty payload')
1096 1096 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1097 1097 self.data, b'__next__'
1098 1098 ):
1099 1099 msg.append(b' streamed payload')
1100 1100 else:
1101 1101 msg.append(b' %i bytes payload' % len(self.data))
1102 1102 msg.append(b'\n')
1103 1103 ui.debug(b''.join(msg))
1104 1104
1105 1105 #### header
1106 1106 if self.mandatory:
1107 1107 parttype = self.type.upper()
1108 1108 else:
1109 1109 parttype = self.type.lower()
1110 1110 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1111 1111 ## parttype
1112 1112 header = [
1113 1113 _pack(_fparttypesize, len(parttype)),
1114 1114 parttype,
1115 1115 _pack(_fpartid, self.id),
1116 1116 ]
1117 1117 ## parameters
1118 1118 # count
1119 1119 manpar = self.mandatoryparams
1120 1120 advpar = self.advisoryparams
1121 1121 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1122 1122 # size
1123 1123 parsizes = []
1124 1124 for key, value in manpar:
1125 1125 parsizes.append(len(key))
1126 1126 parsizes.append(len(value))
1127 1127 for key, value in advpar:
1128 1128 parsizes.append(len(key))
1129 1129 parsizes.append(len(value))
1130 1130 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1131 1131 header.append(paramsizes)
1132 1132 # key, value
1133 1133 for key, value in manpar:
1134 1134 header.append(key)
1135 1135 header.append(value)
1136 1136 for key, value in advpar:
1137 1137 header.append(key)
1138 1138 header.append(value)
1139 1139 ## finalize header
1140 1140 try:
1141 1141 headerchunk = b''.join(header)
1142 1142 except TypeError:
1143 1143 raise TypeError(
1144 1144 r'Found a non-bytes trying to '
1145 1145 r'build bundle part header: %r' % header
1146 1146 )
1147 1147 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1148 1148 yield _pack(_fpartheadersize, len(headerchunk))
1149 1149 yield headerchunk
1150 1150 ## payload
1151 1151 try:
1152 1152 for chunk in self._payloadchunks():
1153 1153 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1154 1154 yield _pack(_fpayloadsize, len(chunk))
1155 1155 yield chunk
1156 1156 except GeneratorExit:
1157 1157 # GeneratorExit means that nobody is listening for our
1158 1158 # results anyway, so just bail quickly rather than trying
1159 1159 # to produce an error part.
1160 1160 ui.debug(b'bundle2-generatorexit\n')
1161 1161 raise
1162 1162 except BaseException as exc:
1163 1163 bexc = stringutil.forcebytestr(exc)
1164 1164 # backup exception data for later
1165 1165 ui.debug(
1166 1166 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1167 1167 )
1168 1168 tb = sys.exc_info()[2]
1169 1169 msg = b'unexpected error: %s' % bexc
1170 1170 interpart = bundlepart(
1171 1171 b'error:abort', [(b'message', msg)], mandatory=False
1172 1172 )
1173 1173 interpart.id = 0
1174 1174 yield _pack(_fpayloadsize, -1)
1175 1175 for chunk in interpart.getchunks(ui=ui):
1176 1176 yield chunk
1177 1177 outdebug(ui, b'closing payload chunk')
1178 1178 # abort current part payload
1179 1179 yield _pack(_fpayloadsize, 0)
1180 1180 pycompat.raisewithtb(exc, tb)
1181 1181 # end of payload
1182 1182 outdebug(ui, b'closing payload chunk')
1183 1183 yield _pack(_fpayloadsize, 0)
1184 1184 self._generated = True
1185 1185
1186 1186 def _payloadchunks(self):
1187 1187 """yield chunks of a the part payload
1188 1188
1189 1189 Exists to handle the different methods to provide data to a part."""
1190 1190 # we only support fixed size data now.
1191 1191 # This will be improved in the future.
1192 1192 if util.safehasattr(self.data, 'next') or util.safehasattr(
1193 1193 self.data, b'__next__'
1194 1194 ):
1195 1195 buff = util.chunkbuffer(self.data)
1196 1196 chunk = buff.read(preferedchunksize)
1197 1197 while chunk:
1198 1198 yield chunk
1199 1199 chunk = buff.read(preferedchunksize)
1200 1200 elif len(self.data):
1201 1201 yield self.data
1202 1202
1203 1203
1204 1204 flaginterrupt = -1
1205 1205
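# For illustration, an interrupt embedded in a part payload is framed as
# follows (a sketch of a hypothetical stream):
#
#   \xff\xff\xff\xff    chunksize: -1, i.e. flaginterrupt
#   <one full part>     out-of-band part, processed by interrupthandler
#   ...                 regular payload chunks of the outer part resume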
1206 1206
1207 1207 class interrupthandler(unpackermixin):
1208 1208 """read one part and process it with restricted capability
1209 1209
1210 1210 This allows transmitting exceptions raised on the producer side during
1211 1211 part iteration while the consumer is reading a part.
1212 1212
1213 1213 Parts processed in this manner only have access to a ui object."""
1214 1214
1215 1215 def __init__(self, ui, fp):
1216 1216 super(interrupthandler, self).__init__(fp)
1217 1217 self.ui = ui
1218 1218
1219 1219 def _readpartheader(self):
1220 1220 """reads a part header size and return the bytes blob
1221 1221
1222 1222 returns None if empty"""
1223 1223 headersize = self._unpack(_fpartheadersize)[0]
1224 1224 if headersize < 0:
1225 1225 raise error.BundleValueError(
1226 1226 b'negative part header size: %i' % headersize
1227 1227 )
1228 1228 indebug(self.ui, b'part header size: %i' % headersize)
1229 1229 if headersize:
1230 1230 return self._readexact(headersize)
1231 1231 return None
1232 1232
1233 1233 def __call__(self):
1234 1234
1235 1235 self.ui.debug(
1236 1236 b'bundle2-input-stream-interrupt: opening out of band context\n'
1237 1237 )
1238 1238 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1239 1239 headerblock = self._readpartheader()
1240 1240 if headerblock is None:
1241 1241 indebug(self.ui, b'no part found during interruption.')
1242 1242 return
1243 1243 part = unbundlepart(self.ui, headerblock, self._fp)
1244 1244 op = interruptoperation(self.ui)
1245 1245 hardabort = False
1246 1246 try:
1247 1247 _processpart(op, part)
1248 1248 except (SystemExit, KeyboardInterrupt):
1249 1249 hardabort = True
1250 1250 raise
1251 1251 finally:
1252 1252 if not hardabort:
1253 1253 part.consume()
1254 1254 self.ui.debug(
1255 1255 b'bundle2-input-stream-interrupt: closing out of band context\n'
1256 1256 )
1257 1257
1258 1258
1259 1259 class interruptoperation(object):
1260 1260 """A limited operation to be use by part handler during interruption
1261 1261
1262 1262 It only have access to an ui object.
1263 1263 """
1264 1264
1265 1265 def __init__(self, ui):
1266 1266 self.ui = ui
1267 1267 self.reply = None
1268 1268 self.captureoutput = False
1269 1269
1270 1270 @property
1271 1271 def repo(self):
1272 1272 raise error.ProgrammingError(b'no repo access from stream interruption')
1273 1273
1274 1274 def gettransaction(self):
1275 1275 raise TransactionUnavailable(b'no repo access from stream interruption')
1276 1276
1277 1277
1278 1278 def decodepayloadchunks(ui, fh):
1279 1279 """Reads bundle2 part payload data into chunks.
1280 1280
1281 1281 Part payload data consists of framed chunks. This function takes
1282 1282 a file handle and emits those chunks.
1283 1283 """
1284 1284 dolog = ui.configbool(b'devel', b'bundle2.debug')
1285 1285 debug = ui.debug
1286 1286
1287 1287 headerstruct = struct.Struct(_fpayloadsize)
1288 1288 headersize = headerstruct.size
1289 1289 unpack = headerstruct.unpack
1290 1290
1291 1291 readexactly = changegroup.readexactly
1292 1292 read = fh.read
1293 1293
1294 1294 chunksize = unpack(readexactly(fh, headersize))[0]
1295 1295 indebug(ui, b'payload chunk size: %i' % chunksize)
1296 1296
1297 1297 # changegroup.readexactly() is inlined below for performance.
1298 1298 while chunksize:
1299 1299 if chunksize >= 0:
1300 1300 s = read(chunksize)
1301 1301 if len(s) < chunksize:
1302 1302 raise error.Abort(
1303 1303 _(
1304 1304 b'stream ended unexpectedly '
1305 1305 b'(got %d bytes, expected %d)'
1306 1306 )
1307 1307 % (len(s), chunksize)
1308 1308 )
1309 1309
1310 1310 yield s
1311 1311 elif chunksize == flaginterrupt:
1312 1312 # Interrupt "signal" detected. The regular stream is interrupted
1313 1313 # and a bundle2 part follows. Consume it.
1314 1314 interrupthandler(ui, fh)()
1315 1315 else:
1316 1316 raise error.BundleValueError(
1317 1317 b'negative payload chunk size: %s' % chunksize
1318 1318 )
1319 1319
1320 1320 s = read(headersize)
1321 1321 if len(s) < headersize:
1322 1322 raise error.Abort(
1323 1323 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1324 1324 % (len(s), headersize)
1325 1325 )
1326 1326
1327 1327 chunksize = unpack(s)[0]
1328 1328
1329 1329 # indebug() inlined for performance.
1330 1330 if dolog:
1331 1331 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1332 1332
1333 1333
1334 1334 class unbundlepart(unpackermixin):
1335 1335 """a bundle part read from a bundle"""
1336 1336
1337 1337 def __init__(self, ui, header, fp):
1338 1338 super(unbundlepart, self).__init__(fp)
1339 1339 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1340 1340 fp, b'tell'
1341 1341 )
1342 1342 self.ui = ui
1343 1343 # unbundle state attr
1344 1344 self._headerdata = header
1345 1345 self._headeroffset = 0
1346 1346 self._initialized = False
1347 1347 self.consumed = False
1348 1348 # part data
1349 1349 self.id = None
1350 1350 self.type = None
1351 1351 self.mandatoryparams = None
1352 1352 self.advisoryparams = None
1353 1353 self.params = None
1354 1354 self.mandatorykeys = ()
1355 1355 self._readheader()
1356 1356 self._mandatory = None
1357 1357 self._pos = 0
1358 1358
1359 1359 def _fromheader(self, size):
1360 1360 """return the next <size> byte from the header"""
1361 1361 offset = self._headeroffset
1362 1362 data = self._headerdata[offset : (offset + size)]
1363 1363 self._headeroffset = offset + size
1364 1364 return data
1365 1365
1366 1366 def _unpackheader(self, format):
1367 1367 """read given format from header
1368 1368
1369 1369 This automatically computes the size of the format to read."""
1370 1370 data = self._fromheader(struct.calcsize(format))
1371 1371 return _unpack(format, data)
1372 1372
1373 1373 def _initparams(self, mandatoryparams, advisoryparams):
1374 1374 """internal function to setup all logic related parameters"""
1375 1375 # make it read only to prevent people touching it by mistake.
1376 1376 self.mandatoryparams = tuple(mandatoryparams)
1377 1377 self.advisoryparams = tuple(advisoryparams)
1378 1378 # user friendly UI
1379 1379 self.params = util.sortdict(self.mandatoryparams)
1380 1380 self.params.update(self.advisoryparams)
1381 1381 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1382 1382
1383 1383 def _readheader(self):
1384 1384 """read the header and setup the object"""
1385 1385 typesize = self._unpackheader(_fparttypesize)[0]
1386 1386 self.type = self._fromheader(typesize)
1387 1387 indebug(self.ui, b'part type: "%s"' % self.type)
1388 1388 self.id = self._unpackheader(_fpartid)[0]
1389 1389 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1390 1390 # extract mandatory bit from type
1391 1391 self.mandatory = self.type != self.type.lower()
1392 1392 self.type = self.type.lower()
1393 1393 ## reading parameters
1394 1394 # param count
1395 1395 mancount, advcount = self._unpackheader(_fpartparamcount)
1396 1396 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1397 1397 # param size
1398 1398 fparamsizes = _makefpartparamsizes(mancount + advcount)
1399 1399 paramsizes = self._unpackheader(fparamsizes)
1400 1400 # make it a list of pairs again
1401 1401 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1402 1402 # split mandatory from advisory
1403 1403 mansizes = paramsizes[:mancount]
1404 1404 advsizes = paramsizes[mancount:]
1405 1405 # retrieve param value
1406 1406 manparams = []
1407 1407 for key, value in mansizes:
1408 1408 manparams.append((self._fromheader(key), self._fromheader(value)))
1409 1409 advparams = []
1410 1410 for key, value in advsizes:
1411 1411 advparams.append((self._fromheader(key), self._fromheader(value)))
1412 1412 self._initparams(manparams, advparams)
1413 1413 ## part payload
1414 1414 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1415 1415 # we read the data, tell it
1416 1416 self._initialized = True
1417 1417
1418 1418 def _payloadchunks(self):
1419 1419 """Generator of decoded chunks in the payload."""
1420 1420 return decodepayloadchunks(self.ui, self._fp)
1421 1421
1422 1422 def consume(self):
1423 1423 """Read the part payload until completion.
1424 1424
1425 1425 By consuming the part data, the underlying stream read offset will
1426 1426 be advanced to the next part (or end of stream).
1427 1427 """
1428 1428 if self.consumed:
1429 1429 return
1430 1430
1431 1431 chunk = self.read(32768)
1432 1432 while chunk:
1433 1433 self._pos += len(chunk)
1434 1434 chunk = self.read(32768)
1435 1435
1436 1436 def read(self, size=None):
1437 1437 """read payload data"""
1438 1438 if not self._initialized:
1439 1439 self._readheader()
1440 1440 if size is None:
1441 1441 data = self._payloadstream.read()
1442 1442 else:
1443 1443 data = self._payloadstream.read(size)
1444 1444 self._pos += len(data)
1445 1445 if size is None or len(data) < size:
1446 1446 if not self.consumed and self._pos:
1447 1447 self.ui.debug(
1448 1448 b'bundle2-input-part: total payload size %i\n' % self._pos
1449 1449 )
1450 1450 self.consumed = True
1451 1451 return data
1452 1452
1453 1453
1454 1454 class seekableunbundlepart(unbundlepart):
1455 1455 """A bundle2 part in a bundle that is seekable.
1456 1456
1457 1457 Regular ``unbundlepart`` instances can only be read once. This class
1458 1458 extends ``unbundlepart`` to enable bi-directional seeking within the
1459 1459 part.
1460 1460
1461 1461 Bundle2 part data consists of framed chunks. Offsets when seeking
1462 1462 refer to the decoded data, not the offsets in the underlying bundle2
1463 1463 stream.
1464 1464
1465 1465 To facilitate quickly seeking within the decoded data, instances of this
1466 1466 class maintain a mapping between offsets in the underlying stream and
1467 1467 the decoded payload. This mapping will consume memory in proportion
1468 1468 to the number of chunks within the payload (which almost certainly
1469 1469 increases in proportion with the size of the part).
1470 1470 """
1471 1471
1472 1472 def __init__(self, ui, header, fp):
1473 1473 # (payload, file) offsets for chunk starts.
1474 1474 self._chunkindex = []
1475 1475
1476 1476 super(seekableunbundlepart, self).__init__(ui, header, fp)
1477 1477
1478 1478 def _payloadchunks(self, chunknum=0):
1479 1479 '''seek to specified chunk and start yielding data'''
1480 1480 if len(self._chunkindex) == 0:
1481 1481 assert chunknum == 0, b'Must start with chunk 0'
1482 1482 self._chunkindex.append((0, self._tellfp()))
1483 1483 else:
1484 1484 assert chunknum < len(self._chunkindex), (
1485 1485 b'Unknown chunk %d' % chunknum
1486 1486 )
1487 1487 self._seekfp(self._chunkindex[chunknum][1])
1488 1488
1489 1489 pos = self._chunkindex[chunknum][0]
1490 1490
1491 1491 for chunk in decodepayloadchunks(self.ui, self._fp):
1492 1492 chunknum += 1
1493 1493 pos += len(chunk)
1494 1494 if chunknum == len(self._chunkindex):
1495 1495 self._chunkindex.append((pos, self._tellfp()))
1496 1496
1497 1497 yield chunk
1498 1498
1499 1499 def _findchunk(self, pos):
1500 1500 '''for a given payload position, return a chunk number and offset'''
1501 1501 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1502 1502 if ppos == pos:
1503 1503 return chunk, 0
1504 1504 elif ppos > pos:
1505 1505 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1506 1506 raise ValueError(b'Unknown chunk')
1507 1507
1508 1508 def tell(self):
1509 1509 return self._pos
1510 1510
1511 1511 def seek(self, offset, whence=os.SEEK_SET):
1512 1512 if whence == os.SEEK_SET:
1513 1513 newpos = offset
1514 1514 elif whence == os.SEEK_CUR:
1515 1515 newpos = self._pos + offset
1516 1516 elif whence == os.SEEK_END:
1517 1517 if not self.consumed:
1518 1518 # Can't use self.consume() here because it advances self._pos.
1519 1519 chunk = self.read(32768)
1520 1520 while chunk:
1521 1521 chunk = self.read(32768)
1522 1522 newpos = self._chunkindex[-1][0] - offset
1523 1523 else:
1524 1524 raise ValueError(b'Unknown whence value: %r' % (whence,))
1525 1525
1526 1526 if newpos > self._chunkindex[-1][0] and not self.consumed:
1527 1527 # Can't use self.consume() here because it advances self._pos.
1528 1528 chunk = self.read(32768)
1529 1529 while chunk:
1530 1530 chunk = self.read(32768)
1531 1531
1532 1532 if not 0 <= newpos <= self._chunkindex[-1][0]:
1533 1533 raise ValueError(b'Offset out of range')
1534 1534
1535 1535 if self._pos != newpos:
1536 1536 chunk, internaloffset = self._findchunk(newpos)
1537 1537 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1538 1538 adjust = self.read(internaloffset)
1539 1539 if len(adjust) != internaloffset:
1540 1540 raise error.Abort(_(b'Seek failed\n'))
1541 1541 self._pos = newpos
1542 1542
1543 1543 def _seekfp(self, offset, whence=0):
1544 1544 """move the underlying file pointer
1545 1545
1546 1546 This method is meant for internal usage by the bundle2 protocol only.
1547 1547 It directly manipulates the low level stream, including bundle2 level
1548 1548 instructions.
1549 1549
1550 1550 Do not use it to implement higher-level logic or methods."""
1551 1551 if self._seekable:
1552 1552 return self._fp.seek(offset, whence)
1553 1553 else:
1554 1554 raise NotImplementedError(_(b'File pointer is not seekable'))
1555 1555
1556 1556 def _tellfp(self):
1557 1557 """return the file offset, or None if file is not seekable
1558 1558
1559 1559 This method is meant for internal usage by the bundle2 protocol only.
1560 1560 It directly manipulates the low level stream, including bundle2 level
1561 1561 instructions.
1562 1562
1563 1563 Do not use it to implement higher-level logic or methods."""
1564 1564 if self._seekable:
1565 1565 try:
1566 1566 return self._fp.tell()
1567 1567 except IOError as e:
1568 1568 if e.errno == errno.ESPIPE:
1569 1569 self._seekable = False
1570 1570 else:
1571 1571 raise
1572 1572 return None
1573 1573
1574 1574
1575 1575 # These are only the static capabilities.
1576 1576 # Check the 'getrepocaps' function for the rest.
1577 1577 capabilities = {
1578 1578 b'HG20': (),
1579 1579 b'bookmarks': (),
1580 1580 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1581 1581 b'listkeys': (),
1582 1582 b'pushkey': (),
1583 1583 b'digests': tuple(sorted(util.DIGESTS.keys())),
1584 1584 b'remote-changegroup': (b'http', b'https'),
1585 1585 b'hgtagsfnodes': (),
1586 1586 b'rev-branch-cache': (),
1587 1587 b'phases': (b'heads',),
1588 1588 b'stream': (b'v2',),
1589 1589 }
1590 1590
1591 1591
1592 1592 def getrepocaps(repo, allowpushback=False, role=None):
1593 1593 """return the bundle2 capabilities for a given repo
1594 1594
1595 1595 Exists to allow extensions (like evolution) to mutate the capabilities.
1596 1596
1597 1597 The returned value is used for servers advertising their capabilities as
1598 1598 well as clients advertising their capabilities to servers as part of
1599 1599 bundle2 requests. The ``role`` argument specifies which is which.
1600 1600 """
1601 1601 if role not in (b'client', b'server'):
1602 1602 raise error.ProgrammingError(b'role argument must be client or server')
1603 1603
1604 1604 caps = capabilities.copy()
1605 1605 caps[b'changegroup'] = tuple(
1606 1606 sorted(changegroup.supportedincomingversions(repo))
1607 1607 )
1608 1608 if obsolete.isenabled(repo, obsolete.exchangeopt):
1609 1609 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1610 1610 caps[b'obsmarkers'] = supportedformat
1611 1611 if allowpushback:
1612 1612 caps[b'pushback'] = ()
1613 1613 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1614 1614 if cpmode == b'check-related':
1615 1615 caps[b'checkheads'] = (b'related',)
1616 1616 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1617 1617 caps.pop(b'phases')
1618 1618
1619 1619 # Don't advertise stream clone support in server mode if not configured.
1620 1620 if role == b'server':
1621 1621 streamsupported = repo.ui.configbool(
1622 1622 b'server', b'uncompressed', untrusted=True
1623 1623 )
1624 1624 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1625 1625
1626 1626 if not streamsupported or not featuresupported:
1627 1627 caps.pop(b'stream')
1628 1628 # Else always advertise support on client, because payload support
1629 1629 # should always be advertised.
1630 1630
1631 1631 return caps
1632 1632
1633 1633
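# A hedged usage sketch: a server advertising its bundle2 capabilities
# would typically encode the dict returned above into the urlquoted blob
# exchanged over the wire (encodecaps/decodecaps are the bundle2 helpers
# assumed here; 'repo' is a localrepository instance):
#
#     caps = getrepocaps(repo, role=b'server')
#     blob = encodecaps(caps)     # textual, urlquoted form
#     caps2 = decodecaps(blob)    # round-trips back to the dict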
1634 1634 def bundle2caps(remote):
1635 1635 """return the bundle capabilities of a peer as dict"""
1636 1636 raw = remote.capable(b'bundle2')
1637 1637 if not raw and raw != b'':
1638 1638 return {}
1639 1639 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1640 1640 return decodecaps(capsblob)
1641 1641
1642 1642
1643 1643 def obsmarkersversion(caps):
1644 1644 """extract the list of supported obsmarkers versions from a bundle2caps dict
1645 1645 """
1646 1646 obscaps = caps.get(b'obsmarkers', ())
1647 1647 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1648 1648
1649 1649
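# For instance (directly from the parsing above):
#
#     obsmarkersversion({b'obsmarkers': (b'V0', b'V1')})  # -> [0, 1]
#     obsmarkersversion({})                               # -> []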
1650 1650 def writenewbundle(
1651 1651 ui,
1652 1652 repo,
1653 1653 source,
1654 1654 filename,
1655 1655 bundletype,
1656 1656 outgoing,
1657 1657 opts,
1658 1658 vfs=None,
1659 1659 compression=None,
1660 1660 compopts=None,
1661 1661 ):
1662 1662 if bundletype.startswith(b'HG10'):
1663 1663 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1664 1664 return writebundle(
1665 1665 ui,
1666 1666 cg,
1667 1667 filename,
1668 1668 bundletype,
1669 1669 vfs=vfs,
1670 1670 compression=compression,
1671 1671 compopts=compopts,
1672 1672 )
1673 1673 elif not bundletype.startswith(b'HG20'):
1674 1674 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1675 1675
1676 1676 caps = {}
1677 1677 if b'obsolescence' in opts:
1678 1678 caps[b'obsmarkers'] = (b'V1',)
1679 1679 bundle = bundle20(ui, caps)
1680 1680 bundle.setcompression(compression, compopts)
1681 1681 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1682 1682 chunkiter = bundle.getchunks()
1683 1683
1684 1684 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1685 1685
1686 1686
1687 1687 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1688 1688 # We should eventually reconcile this logic with the one behind
1689 1689 # 'exchange.getbundle2partsgenerator'.
1690 1690 #
1691 1691 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1692 1692 # different right now. So we keep them separated for now for the sake of
1693 1693 # simplicity.
1694 1694
1695 1695 # we might not always want a changegroup in such bundle, for example in
1696 1696 # stream bundles
1697 1697 if opts.get(b'changegroup', True):
1698 1698 cgversion = opts.get(b'cg.version')
1699 1699 if cgversion is None:
1700 1700 cgversion = changegroup.safeversion(repo)
1701 1701 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1702 1702 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1703 1703 part.addparam(b'version', cg.version)
1704 1704 if b'clcount' in cg.extras:
1705 1705 part.addparam(
1706 1706 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1707 1707 )
1708 1708 if opts.get(b'phases') and repo.revs(
1709 1709 b'%ln and secret()', outgoing.missingheads
1710 1710 ):
1711 1711 part.addparam(
1712 1712 b'targetphase', b'%d' % phases.secret, mandatory=False
1713 1713 )
1714 if b'exp-sidedata-flag' in repo.requirements:
1715 part.addparam(b'exp-sidedata', b'1')
1714 1716
1715 1717 if opts.get(b'streamv2', False):
1716 1718 addpartbundlestream2(bundler, repo, stream=True)
1717 1719
1718 1720 if opts.get(b'tagsfnodescache', True):
1719 1721 addparttagsfnodescache(repo, bundler, outgoing)
1720 1722
1721 1723 if opts.get(b'revbranchcache', True):
1722 1724 addpartrevbranchcache(repo, bundler, outgoing)
1723 1725
1724 1726 if opts.get(b'obsolescence', False):
1725 1727 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1726 1728 buildobsmarkerspart(bundler, obsmarkers)
1727 1729
1728 1730 if opts.get(b'phases', False):
1729 1731 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1730 1732 phasedata = phases.binaryencode(headsbyphase)
1731 1733 bundler.newpart(b'phase-heads', data=phasedata)
1732 1734
1733 1735
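# A hedged example of driving the writer above; 'outgoing' is assumed to be
# a discovery.outgoing instance and the opts keys mirror the ones consumed
# by _addpartsfromopts():
#
#     opts = {b'changegroup': True, b'phases': True, b'obsolescence': False}
#     writenewbundle(ui, repo, b'bundle', b'out.hg', b'HG20', outgoing, opts)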
1734 1736 def addparttagsfnodescache(repo, bundler, outgoing):
1735 1737 # we include the tags fnode cache for the bundle changeset
1736 1738     # (as an optional part)
1737 1739 cache = tags.hgtagsfnodescache(repo.unfiltered())
1738 1740 chunks = []
1739 1741
1740 1742 # .hgtags fnodes are only relevant for head changesets. While we could
1741 1743 # transfer values for all known nodes, there will likely be little to
1742 1744 # no benefit.
1743 1745 #
1744 1746 # We don't bother using a generator to produce output data because
1745 1747 # a) we only have 40 bytes per head and even esoteric numbers of heads
1746 1748 # consume little memory (1M heads is 40MB) b) we don't want to send the
1747 1749 # part if we don't have entries and knowing if we have entries requires
1748 1750 # cache lookups.
1749 1751 for node in outgoing.missingheads:
1750 1752 # Don't compute missing, as this may slow down serving.
1751 1753 fnode = cache.getfnode(node, computemissing=False)
1752 1754 if fnode is not None:
1753 1755 chunks.extend([node, fnode])
1754 1756
1755 1757 if chunks:
1756 1758 bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))
1757 1759
1758 1760
1759 1761 def addpartrevbranchcache(repo, bundler, outgoing):
1760 1762 # we include the rev branch cache for the bundle changeset
1761 1763     # (as an optional part)
1762 1764 cache = repo.revbranchcache()
1763 1765 cl = repo.unfiltered().changelog
1764 1766 branchesdata = collections.defaultdict(lambda: (set(), set()))
1765 1767 for node in outgoing.missing:
1766 1768 branch, close = cache.branchinfo(cl.rev(node))
1767 1769 branchesdata[branch][close].add(node)
1768 1770
1769 1771 def generate():
1770 1772 for branch, (nodes, closed) in sorted(branchesdata.items()):
1771 1773 utf8branch = encoding.fromlocal(branch)
1772 1774 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1773 1775 yield utf8branch
1774 1776 for n in sorted(nodes):
1775 1777 yield n
1776 1778 for n in sorted(closed):
1777 1779 yield n
1778 1780
1779 1781 bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)
1780 1782
1781 1783
1782 1784 def _formatrequirementsspec(requirements):
1783 1785 requirements = [req for req in requirements if req != b"shared"]
1784 1786 return urlreq.quote(b','.join(sorted(requirements)))
1785 1787
1786 1788
1787 1789 def _formatrequirementsparams(requirements):
1788 1790 requirements = _formatrequirementsspec(requirements)
1789 1791 params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
1790 1792 return params
1791 1793
1792 1794
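# Sketch of the resulting encoding (the 'shared' requirement is dropped and
# both the '=' and the ',' separators end up percent-quoted):
#
#     _formatrequirementsparams([b'store', b'shared', b'revlogv1'])
#     # -> b'requirements%3Drevlogv1%2Cstore'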
1793 1795 def addpartbundlestream2(bundler, repo, **kwargs):
1794 1796 if not kwargs.get(r'stream', False):
1795 1797 return
1796 1798
1797 1799 if not streamclone.allowservergeneration(repo):
1798 1800 raise error.Abort(
1799 1801 _(
1800 1802 b'stream data requested but server does not allow '
1801 1803 b'this feature'
1802 1804 ),
1803 1805 hint=_(
1804 1806 b'well-behaved clients should not be '
1805 1807 b'requesting stream data from servers not '
1806 1808 b'advertising it; the client may be buggy'
1807 1809 ),
1808 1810 )
1809 1811
1810 1812 # Stream clones don't compress well. And compression undermines a
1811 1813 # goal of stream clones, which is to be fast. Communicate the desire
1812 1814 # to avoid compression to consumers of the bundle.
1813 1815 bundler.prefercompressed = False
1814 1816
1815 1817 # get the includes and excludes
1816 1818 includepats = kwargs.get(r'includepats')
1817 1819 excludepats = kwargs.get(r'excludepats')
1818 1820
1819 1821 narrowstream = repo.ui.configbool(
1820 1822 b'experimental', b'server.stream-narrow-clones'
1821 1823 )
1822 1824
1823 1825 if (includepats or excludepats) and not narrowstream:
1824 1826 raise error.Abort(_(b'server does not support narrow stream clones'))
1825 1827
1826 1828 includeobsmarkers = False
1827 1829 if repo.obsstore:
1828 1830 remoteversions = obsmarkersversion(bundler.capabilities)
1829 1831 if not remoteversions:
1830 1832 raise error.Abort(
1831 1833 _(
1832 1834 b'server has obsolescence markers, but client '
1833 1835 b'cannot receive them via stream clone'
1834 1836 )
1835 1837 )
1836 1838 elif repo.obsstore._version in remoteversions:
1837 1839 includeobsmarkers = True
1838 1840
1839 1841 filecount, bytecount, it = streamclone.generatev2(
1840 1842 repo, includepats, excludepats, includeobsmarkers
1841 1843 )
1842 1844 requirements = _formatrequirementsspec(repo.requirements)
1843 1845 part = bundler.newpart(b'stream2', data=it)
1844 1846 part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
1845 1847 part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
1846 1848 part.addparam(b'requirements', requirements, mandatory=True)
1847 1849
1848 1850
1849 1851 def buildobsmarkerspart(bundler, markers):
1850 1852 """add an obsmarker part to the bundler with <markers>
1851 1853
1852 1854 No part is created if markers is empty.
1853 1855 Raises ValueError if the bundler doesn't support any known obsmarker format.
1854 1856 """
1855 1857 if not markers:
1856 1858 return None
1857 1859
1858 1860 remoteversions = obsmarkersversion(bundler.capabilities)
1859 1861 version = obsolete.commonversion(remoteversions)
1860 1862 if version is None:
1861 1863 raise ValueError(b'bundler does not support common obsmarker format')
1862 1864 stream = obsolete.encodemarkers(markers, True, version=version)
1863 1865 return bundler.newpart(b'obsmarkers', data=stream)
1864 1866
1865 1867
1866 1868 def writebundle(
1867 1869 ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
1868 1870 ):
1869 1871 """Write a bundle file and return its filename.
1870 1872
1871 1873 Existing files will not be overwritten.
1872 1874 If no filename is specified, a temporary file is created.
1873 1875 bz2 compression can be turned off.
1874 1876 The bundle file will be deleted in case of errors.
1875 1877 """
1876 1878
1877 1879 if bundletype == b"HG20":
1878 1880 bundle = bundle20(ui)
1879 1881 bundle.setcompression(compression, compopts)
1880 1882 part = bundle.newpart(b'changegroup', data=cg.getchunks())
1881 1883 part.addparam(b'version', cg.version)
1882 1884 if b'clcount' in cg.extras:
1883 1885 part.addparam(
1884 1886 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1885 1887 )
1886 1888 chunkiter = bundle.getchunks()
1887 1889 else:
1888 1890 # compression argument is only for the bundle2 case
1889 1891 assert compression is None
1890 1892 if cg.version != b'01':
1891 1893 raise error.Abort(
1892 1894                 _(b'old bundle types only support v1 changegroups')
1893 1895 )
1894 1896 header, comp = bundletypes[bundletype]
1895 1897 if comp not in util.compengines.supportedbundletypes:
1896 1898 raise error.Abort(_(b'unknown stream compression type: %s') % comp)
1897 1899 compengine = util.compengines.forbundletype(comp)
1898 1900
1899 1901 def chunkiter():
1900 1902 yield header
1901 1903 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1902 1904 yield chunk
1903 1905
1904 1906 chunkiter = chunkiter()
1905 1907
1906 1908 # parse the changegroup data, otherwise we will block
1907 1909 # in case of sshrepo because we don't know the end of the stream
1908 1910 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1909 1911
1910 1912
1911 1913 def combinechangegroupresults(op):
1912 1914 """logic to combine 0 or more addchangegroup results into one"""
1913 1915 results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
1914 1916 changedheads = 0
1915 1917 result = 1
1916 1918 for ret in results:
1917 1919 # If any changegroup result is 0, return 0
1918 1920 if ret == 0:
1919 1921 result = 0
1920 1922 break
1921 1923 if ret < -1:
1922 1924 changedheads += ret + 1
1923 1925 elif ret > 1:
1924 1926 changedheads += ret - 1
1925 1927 if changedheads > 0:
1926 1928 result = 1 + changedheads
1927 1929 elif changedheads < 0:
1928 1930 result = -1 + changedheads
1929 1931 return result
1930 1932
1931 1933
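# Worked example of the combination rule above: replies of 2 (one head
# added) and -3 (two heads removed) give changedheads = (2 - 1) + (-3 + 1)
# = -1, hence a combined result of -1 + -1 = -2 (a net loss of one head);
# any single reply of 0 short-circuits the whole result to 0.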
1932 1934 @parthandler(
1933 b'changegroup', (b'version', b'nbchanges', b'treemanifest', b'targetphase')
1935 b'changegroup',
1936 (
1937 b'version',
1938 b'nbchanges',
1939 b'exp-sidedata',
1940 b'treemanifest',
1941 b'targetphase',
1942 ),
1934 1943 )
1935 1944 def handlechangegroup(op, inpart):
1936 1945 """apply a changegroup part on the repo
1937 1946
1938 1947     This is a very early implementation that will be massively reworked
1939 1948     before being inflicted on any end-user.
1940 1949 """
1941 1950 from . import localrepo
1942 1951
1943 1952 tr = op.gettransaction()
1944 1953 unpackerversion = inpart.params.get(b'version', b'01')
1945 1954 # We should raise an appropriate exception here
1946 1955 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1947 1956 # the source and url passed here are overwritten by the one contained in
1948 1957 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1949 1958 nbchangesets = None
1950 1959 if b'nbchanges' in inpart.params:
1951 1960 nbchangesets = int(inpart.params.get(b'nbchanges'))
1952 1961 if (
1953 1962 b'treemanifest' in inpart.params
1954 1963 and b'treemanifest' not in op.repo.requirements
1955 1964 ):
1956 1965 if len(op.repo.changelog) != 0:
1957 1966 raise error.Abort(
1958 1967 _(
1959 1968 b"bundle contains tree manifests, but local repo is "
1960 1969 b"non-empty and does not use tree manifests"
1961 1970 )
1962 1971 )
1963 1972 op.repo.requirements.add(b'treemanifest')
1964 1973 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
1965 1974 op.repo.ui, op.repo.requirements, op.repo.features
1966 1975 )
1967 1976 op.repo._writerequirements()
1977
1978 bundlesidedata = bool(b'exp-sidedata' in inpart.params)
1979 reposidedata = bool(b'exp-sidedata-flag' in op.repo.requirements)
1980 if reposidedata and not bundlesidedata:
1981         msg = b"repository is using sidedata but the bundle source does not"
1982 hint = b'this is currently unsupported'
1983 raise error.Abort(msg, hint=hint)
1984
1968 1985 extrakwargs = {}
1969 1986 targetphase = inpart.params.get(b'targetphase')
1970 1987 if targetphase is not None:
1971 1988 extrakwargs[r'targetphase'] = int(targetphase)
1972 1989 ret = _processchangegroup(
1973 1990 op,
1974 1991 cg,
1975 1992 tr,
1976 1993 b'bundle2',
1977 1994 b'bundle2',
1978 1995 expectedtotal=nbchangesets,
1979 1996 **extrakwargs
1980 1997 )
1981 1998 if op.reply is not None:
1982 1999 # This is definitely not the final form of this
1983 2000         # return. But one needs to start somewhere.
1984 2001 part = op.reply.newpart(b'reply:changegroup', mandatory=False)
1985 2002 part.addparam(
1986 2003 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
1987 2004 )
1988 2005 part.addparam(b'return', b'%i' % ret, mandatory=False)
1989 2006 assert not inpart.read()
1990 2007
1991 2008
1992 2009 _remotechangegroupparams = tuple(
1993 2010 [b'url', b'size', b'digests']
1994 2011 + [b'digest:%s' % k for k in util.DIGESTS.keys()]
1995 2012 )
1996 2013
1997 2014
1998 2015 @parthandler(b'remote-changegroup', _remotechangegroupparams)
1999 2016 def handleremotechangegroup(op, inpart):
2000 2017 """apply a bundle10 on the repo, given an url and validation information
2001 2018
2002 2019     All the information about the remote bundle to import is given as
2003 2020     parameters. The parameters include:
2004 2021 - url: the url to the bundle10.
2005 2022     - size: the bundle10 file size. It is used to validate that what was
2006 2023       retrieved by the client matches the server's knowledge of the bundle.
2007 2024 - digests: a space separated list of the digest types provided as
2008 2025 parameters.
2009 2026 - digest:<digest-type>: the hexadecimal representation of the digest with
2010 2027       that name. Like the size, it is used to validate that what was
2011 2028       retrieved by the client matches what the server knows about the bundle.
2012 2029
2013 2030 When multiple digest types are given, all of them are checked.
2014 2031 """
2015 2032 try:
2016 2033 raw_url = inpart.params[b'url']
2017 2034 except KeyError:
2018 2035 raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
2019 2036 parsed_url = util.url(raw_url)
2020 2037 if parsed_url.scheme not in capabilities[b'remote-changegroup']:
2021 2038 raise error.Abort(
2022 2039 _(b'remote-changegroup does not support %s urls')
2023 2040 % parsed_url.scheme
2024 2041 )
2025 2042
2026 2043 try:
2027 2044 size = int(inpart.params[b'size'])
2028 2045 except ValueError:
2029 2046 raise error.Abort(
2030 2047 _(b'remote-changegroup: invalid value for param "%s"') % b'size'
2031 2048 )
2032 2049 except KeyError:
2033 2050 raise error.Abort(
2034 2051 _(b'remote-changegroup: missing "%s" param') % b'size'
2035 2052 )
2036 2053
2037 2054 digests = {}
2038 2055 for typ in inpart.params.get(b'digests', b'').split():
2039 2056 param = b'digest:%s' % typ
2040 2057 try:
2041 2058 value = inpart.params[param]
2042 2059 except KeyError:
2043 2060 raise error.Abort(
2044 2061 _(b'remote-changegroup: missing "%s" param') % param
2045 2062 )
2046 2063 digests[typ] = value
2047 2064
2048 2065 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
2049 2066
2050 2067 tr = op.gettransaction()
2051 2068 from . import exchange
2052 2069
2053 2070 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
2054 2071 if not isinstance(cg, changegroup.cg1unpacker):
2055 2072 raise error.Abort(
2056 2073 _(b'%s: not a bundle version 1.0') % util.hidepassword(raw_url)
2057 2074 )
2058 2075 ret = _processchangegroup(op, cg, tr, b'bundle2', b'bundle2')
2059 2076 if op.reply is not None:
2060 2077 # This is definitely not the final form of this
2061 2078         # return. But one needs to start somewhere.
2062 2079 part = op.reply.newpart(b'reply:changegroup')
2063 2080 part.addparam(
2064 2081 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2065 2082 )
2066 2083 part.addparam(b'return', b'%i' % ret, mandatory=False)
2067 2084 try:
2068 2085 real_part.validate()
2069 2086 except error.Abort as e:
2070 2087 raise error.Abort(
2071 2088 _(b'bundle at %s is corrupted:\n%s')
2072 2089 % (util.hidepassword(raw_url), bytes(e))
2073 2090 )
2074 2091 assert not inpart.read()
2075 2092
2076 2093
2077 2094 @parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
2078 2095 def handlereplychangegroup(op, inpart):
2079 2096 ret = int(inpart.params[b'return'])
2080 2097 replyto = int(inpart.params[b'in-reply-to'])
2081 2098 op.records.add(b'changegroup', {b'return': ret}, replyto)
2082 2099
2083 2100
2084 2101 @parthandler(b'check:bookmarks')
2085 2102 def handlecheckbookmarks(op, inpart):
2086 2103 """check location of bookmarks
2087 2104
2088 2105     This part is used to detect push races on bookmarks. It contains
2089 2106     binary encoded (bookmark, node) tuples. If the local state does not
2090 2107     match the one in the part, a PushRaced exception is raised
2091 2108 """
2092 2109 bookdata = bookmarks.binarydecode(inpart)
2093 2110
2094 2111 msgstandard = (
2095 2112 b'remote repository changed while pushing - please try again '
2096 2113 b'(bookmark "%s" move from %s to %s)'
2097 2114 )
2098 2115 msgmissing = (
2099 2116 b'remote repository changed while pushing - please try again '
2100 2117 b'(bookmark "%s" is missing, expected %s)'
2101 2118 )
2102 2119 msgexist = (
2103 2120 b'remote repository changed while pushing - please try again '
2104 2121 b'(bookmark "%s" set on %s, expected missing)'
2105 2122 )
2106 2123 for book, node in bookdata:
2107 2124 currentnode = op.repo._bookmarks.get(book)
2108 2125 if currentnode != node:
2109 2126 if node is None:
2110 2127 finalmsg = msgexist % (book, nodemod.short(currentnode))
2111 2128 elif currentnode is None:
2112 2129 finalmsg = msgmissing % (book, nodemod.short(node))
2113 2130 else:
2114 2131 finalmsg = msgstandard % (
2115 2132 book,
2116 2133 nodemod.short(node),
2117 2134 nodemod.short(currentnode),
2118 2135 )
2119 2136 raise error.PushRaced(finalmsg)
2120 2137
2121 2138
2122 2139 @parthandler(b'check:heads')
2123 2140 def handlecheckheads(op, inpart):
2124 2141     """check that the heads of the repo did not change
2125 2142
2126 2143 This is used to detect a push race when using unbundle.
2127 2144 This replaces the "heads" argument of unbundle."""
2128 2145 h = inpart.read(20)
2129 2146 heads = []
2130 2147 while len(h) == 20:
2131 2148 heads.append(h)
2132 2149 h = inpart.read(20)
2133 2150 assert not h
2134 2151 # Trigger a transaction so that we are guaranteed to have the lock now.
2135 2152 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2136 2153 op.gettransaction()
2137 2154 if sorted(heads) != sorted(op.repo.heads()):
2138 2155 raise error.PushRaced(
2139 2156 b'remote repository changed while pushing - please try again'
2140 2157 )
2141 2158
2142 2159
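# The payload read above is a bare concatenation of 20-byte binary nodes.
# The pushing side can therefore build the part as simply as (a sketch;
# 'remoteheads' is an assumed list of binary nodes):
#
#     bundler.newpart(b'check:heads', data=iter(remoteheads))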
2143 2160 @parthandler(b'check:updated-heads')
2144 2161 def handlecheckupdatedheads(op, inpart):
2145 2162 """check for race on the heads touched by a push
2146 2163
2147 2164     This is similar to 'check:heads' but focuses on the heads actually updated
2148 2165     during the push. If other activity happens on unrelated heads, it is
2149 2166 ignored.
2150 2167
2151 2168     This allows servers with high traffic to avoid push contention as long as
2152 2169     only unrelated parts of the graph are involved.
2153 2170 h = inpart.read(20)
2154 2171 heads = []
2155 2172 while len(h) == 20:
2156 2173 heads.append(h)
2157 2174 h = inpart.read(20)
2158 2175 assert not h
2159 2176 # trigger a transaction so that we are guaranteed to have the lock now.
2160 2177 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2161 2178 op.gettransaction()
2162 2179
2163 2180 currentheads = set()
2164 2181 for ls in op.repo.branchmap().iterheads():
2165 2182 currentheads.update(ls)
2166 2183
2167 2184 for h in heads:
2168 2185 if h not in currentheads:
2169 2186 raise error.PushRaced(
2170 2187 b'remote repository changed while pushing - '
2171 2188 b'please try again'
2172 2189 )
2173 2190
2174 2191
2175 2192 @parthandler(b'check:phases')
2176 2193 def handlecheckphases(op, inpart):
2177 2194 """check that phase boundaries of the repository did not change
2178 2195
2179 2196 This is used to detect a push race.
2180 2197 """
2181 2198 phasetonodes = phases.binarydecode(inpart)
2182 2199 unfi = op.repo.unfiltered()
2183 2200 cl = unfi.changelog
2184 2201 phasecache = unfi._phasecache
2185 2202 msg = (
2186 2203 b'remote repository changed while pushing - please try again '
2187 2204 b'(%s is %s expected %s)'
2188 2205 )
2189 2206 for expectedphase, nodes in enumerate(phasetonodes):
2190 2207 for n in nodes:
2191 2208 actualphase = phasecache.phase(unfi, cl.rev(n))
2192 2209 if actualphase != expectedphase:
2193 2210 finalmsg = msg % (
2194 2211 nodemod.short(n),
2195 2212 phases.phasenames[actualphase],
2196 2213 phases.phasenames[expectedphase],
2197 2214 )
2198 2215 raise error.PushRaced(finalmsg)
2199 2216
2200 2217
2201 2218 @parthandler(b'output')
2202 2219 def handleoutput(op, inpart):
2203 2220 """forward output captured on the server to the client"""
2204 2221 for line in inpart.read().splitlines():
2205 2222 op.ui.status(_(b'remote: %s\n') % line)
2206 2223
2207 2224
2208 2225 @parthandler(b'replycaps')
2209 2226 def handlereplycaps(op, inpart):
2210 2227 """Notify that a reply bundle should be created
2211 2228
2212 2229 The payload contains the capabilities information for the reply"""
2213 2230 caps = decodecaps(inpart.read())
2214 2231 if op.reply is None:
2215 2232 op.reply = bundle20(op.ui, caps)
2216 2233
2217 2234
2218 2235 class AbortFromPart(error.Abort):
2219 2236 """Sub-class of Abort that denotes an error from a bundle2 part."""
2220 2237
2221 2238
2222 2239 @parthandler(b'error:abort', (b'message', b'hint'))
2223 2240 def handleerrorabort(op, inpart):
2224 2241 """Used to transmit abort error over the wire"""
2225 2242 raise AbortFromPart(
2226 2243 inpart.params[b'message'], hint=inpart.params.get(b'hint')
2227 2244 )
2228 2245
2229 2246
2230 2247 @parthandler(
2231 2248 b'error:pushkey',
2232 2249 (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
2233 2250 )
2234 2251 def handleerrorpushkey(op, inpart):
2235 2252 """Used to transmit failure of a mandatory pushkey over the wire"""
2236 2253 kwargs = {}
2237 2254 for name in (b'namespace', b'key', b'new', b'old', b'ret'):
2238 2255 value = inpart.params.get(name)
2239 2256 if value is not None:
2240 2257 kwargs[name] = value
2241 2258 raise error.PushkeyFailed(
2242 2259 inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
2243 2260 )
2244 2261
2245 2262
2246 2263 @parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
2247 2264 def handleerrorunsupportedcontent(op, inpart):
2248 2265 """Used to transmit unknown content error over the wire"""
2249 2266 kwargs = {}
2250 2267 parttype = inpart.params.get(b'parttype')
2251 2268 if parttype is not None:
2252 2269 kwargs[b'parttype'] = parttype
2253 2270 params = inpart.params.get(b'params')
2254 2271 if params is not None:
2255 2272 kwargs[b'params'] = params.split(b'\0')
2256 2273
2257 2274 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2258 2275
2259 2276
2260 2277 @parthandler(b'error:pushraced', (b'message',))
2261 2278 def handleerrorpushraced(op, inpart):
2262 2279 """Used to transmit push race error over the wire"""
2263 2280 raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])
2264 2281
2265 2282
2266 2283 @parthandler(b'listkeys', (b'namespace',))
2267 2284 def handlelistkeys(op, inpart):
2268 2285 """retrieve pushkey namespace content stored in a bundle2"""
2269 2286 namespace = inpart.params[b'namespace']
2270 2287 r = pushkey.decodekeys(inpart.read())
2271 2288 op.records.add(b'listkeys', (namespace, r))
2272 2289
2273 2290
2274 2291 @parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
2275 2292 def handlepushkey(op, inpart):
2276 2293 """process a pushkey request"""
2277 2294 dec = pushkey.decode
2278 2295 namespace = dec(inpart.params[b'namespace'])
2279 2296 key = dec(inpart.params[b'key'])
2280 2297 old = dec(inpart.params[b'old'])
2281 2298 new = dec(inpart.params[b'new'])
2282 2299 # Grab the transaction to ensure that we have the lock before performing the
2283 2300 # pushkey.
2284 2301 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2285 2302 op.gettransaction()
2286 2303 ret = op.repo.pushkey(namespace, key, old, new)
2287 2304 record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
2288 2305 op.records.add(b'pushkey', record)
2289 2306 if op.reply is not None:
2290 2307 rpart = op.reply.newpart(b'reply:pushkey')
2291 2308 rpart.addparam(
2292 2309 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2293 2310 )
2294 2311 rpart.addparam(b'return', b'%i' % ret, mandatory=False)
2295 2312 if inpart.mandatory and not ret:
2296 2313 kwargs = {}
2297 2314 for key in (b'namespace', b'key', b'new', b'old', b'ret'):
2298 2315 if key in inpart.params:
2299 2316 kwargs[key] = inpart.params[key]
2300 2317 raise error.PushkeyFailed(
2301 2318 partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
2302 2319 )
2303 2320
2304 2321
2305 2322 @parthandler(b'bookmarks')
2306 2323 def handlebookmark(op, inpart):
2307 2324 """transmit bookmark information
2308 2325
2309 2326 The part contains binary encoded bookmark information.
2310 2327
2311 2328 The exact behavior of this part can be controlled by the 'bookmarks' mode
2312 2329 on the bundle operation.
2313 2330
2314 2331 When mode is 'apply' (the default) the bookmark information is applied as
2315 2332 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2316 2333 issued earlier to check for push races in such update. This behavior is
2317 2334 suitable for pushing.
2318 2335
2319 2336 When mode is 'records', the information is recorded into the 'bookmarks'
2320 2337 records of the bundle operation. This behavior is suitable for pulling.
2321 2338 """
2322 2339 changes = bookmarks.binarydecode(inpart)
2323 2340
2324 2341 pushkeycompat = op.repo.ui.configbool(
2325 2342 b'server', b'bookmarks-pushkey-compat'
2326 2343 )
2327 2344 bookmarksmode = op.modes.get(b'bookmarks', b'apply')
2328 2345
2329 2346 if bookmarksmode == b'apply':
2330 2347 tr = op.gettransaction()
2331 2348 bookstore = op.repo._bookmarks
2332 2349 if pushkeycompat:
2333 2350 allhooks = []
2334 2351 for book, node in changes:
2335 2352 hookargs = tr.hookargs.copy()
2336 2353 hookargs[b'pushkeycompat'] = b'1'
2337 2354 hookargs[b'namespace'] = b'bookmarks'
2338 2355 hookargs[b'key'] = book
2339 2356 hookargs[b'old'] = nodemod.hex(bookstore.get(book, b''))
2340 2357 hookargs[b'new'] = nodemod.hex(
2341 2358 node if node is not None else b''
2342 2359 )
2343 2360 allhooks.append(hookargs)
2344 2361
2345 2362 for hookargs in allhooks:
2346 2363 op.repo.hook(
2347 2364 b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
2348 2365 )
2349 2366
2350 2367 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2351 2368
2352 2369 if pushkeycompat:
2353 2370
2354 2371 def runhook():
2355 2372 for hookargs in allhooks:
2356 2373 op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))
2357 2374
2358 2375 op.repo._afterlock(runhook)
2359 2376
2360 2377 elif bookmarksmode == b'records':
2361 2378 for book, node in changes:
2362 2379 record = {b'bookmark': book, b'node': node}
2363 2380 op.records.add(b'bookmarks', record)
2364 2381 else:
2365 2382 raise error.ProgrammingError(
2366 2383             b'unknown bookmark mode: %s' % bookmarksmode
2367 2384 )
2368 2385
2369 2386
2370 2387 @parthandler(b'phase-heads')
2371 2388 def handlephases(op, inpart):
2372 2389 """apply phases from bundle part to repo"""
2373 2390 headsbyphase = phases.binarydecode(inpart)
2374 2391 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2375 2392
2376 2393
2377 2394 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2378 2395 def handlepushkeyreply(op, inpart):
2379 2396 """retrieve the result of a pushkey request"""
2380 2397 ret = int(inpart.params[b'return'])
2381 2398 partid = int(inpart.params[b'in-reply-to'])
2382 2399 op.records.add(b'pushkey', {b'return': ret}, partid)
2383 2400
2384 2401
2385 2402 @parthandler(b'obsmarkers')
2386 2403 def handleobsmarker(op, inpart):
2387 2404 """add a stream of obsmarkers to the repo"""
2388 2405 tr = op.gettransaction()
2389 2406 markerdata = inpart.read()
2390 2407 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2391 2408 op.ui.writenoi18n(
2392 2409 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2393 2410 )
2394 2411 # The mergemarkers call will crash if marker creation is not enabled.
2395 2412 # we want to avoid this if the part is advisory.
2396 2413 if not inpart.mandatory and op.repo.obsstore.readonly:
2397 2414 op.repo.ui.debug(
2398 2415 b'ignoring obsolescence markers, feature not enabled\n'
2399 2416 )
2400 2417 return
2401 2418 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2402 2419 op.repo.invalidatevolatilesets()
2403 2420 op.records.add(b'obsmarkers', {b'new': new})
2404 2421 if op.reply is not None:
2405 2422 rpart = op.reply.newpart(b'reply:obsmarkers')
2406 2423 rpart.addparam(
2407 2424 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2408 2425 )
2409 2426 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2410 2427
2411 2428
2412 2429 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2413 2430 def handleobsmarkerreply(op, inpart):
2414 2431     """retrieve the result of an obsmarkers request"""
2415 2432 ret = int(inpart.params[b'new'])
2416 2433 partid = int(inpart.params[b'in-reply-to'])
2417 2434 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2418 2435
2419 2436
2420 2437 @parthandler(b'hgtagsfnodes')
2421 2438 def handlehgtagsfnodes(op, inpart):
2422 2439 """Applies .hgtags fnodes cache entries to the local repo.
2423 2440
2424 2441 Payload is pairs of 20 byte changeset nodes and filenodes.
2425 2442 """
2426 2443 # Grab the transaction so we ensure that we have the lock at this point.
2427 2444 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2428 2445 op.gettransaction()
2429 2446 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2430 2447
2431 2448 count = 0
2432 2449 while True:
2433 2450 node = inpart.read(20)
2434 2451 fnode = inpart.read(20)
2435 2452 if len(node) < 20 or len(fnode) < 20:
2436 2453 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2437 2454 break
2438 2455 cache.setfnode(node, fnode)
2439 2456 count += 1
2440 2457
2441 2458 cache.write()
2442 2459 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2443 2460
2444 2461
2445 2462 rbcstruct = struct.Struct(b'>III')
2446 2463
2447 2464
2448 2465 @parthandler(b'cache:rev-branch-cache')
2449 2466 def handlerbc(op, inpart):
2450 2467 """receive a rev-branch-cache payload and update the local cache
2451 2468
2452 2469 The payload is a series of data related to each branch
2453 2470
2454 2471 1) branch name length
2455 2472 2) number of open heads
2456 2473 3) number of closed heads
2457 2474 4) open heads nodes
2458 2475 5) closed heads nodes
2459 2476 """
2460 2477 total = 0
2461 2478 rawheader = inpart.read(rbcstruct.size)
2462 2479 cache = op.repo.revbranchcache()
2463 2480 cl = op.repo.unfiltered().changelog
2464 2481 while rawheader:
2465 2482 header = rbcstruct.unpack(rawheader)
2466 2483 total += header[1] + header[2]
2467 2484 utf8branch = inpart.read(header[0])
2468 2485 branch = encoding.tolocal(utf8branch)
2469 2486 for x in pycompat.xrange(header[1]):
2470 2487 node = inpart.read(20)
2471 2488 rev = cl.rev(node)
2472 2489 cache.setdata(branch, rev, node, False)
2473 2490 for x in pycompat.xrange(header[2]):
2474 2491 node = inpart.read(20)
2475 2492 rev = cl.rev(node)
2476 2493 cache.setdata(branch, rev, node, True)
2477 2494 rawheader = inpart.read(rbcstruct.size)
2478 2495 cache.write()
2479 2496
2480 2497
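# Packing a single record of this part could look like (a sketch; node_open
# and node_closed stand for assumed 20-byte binary nodes):
#
#     hdr = rbcstruct.pack(len(b'default'), 1, 1)   # name len, open, closed
#     record = hdr + b'default' + node_open + node_closed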
2481 2498 @parthandler(b'pushvars')
2482 2499 def bundle2getvars(op, part):
2483 2500 '''unbundle a bundle2 containing shellvars on the server'''
2484 2501 # An option to disable unbundling on server-side for security reasons
2485 2502 if op.ui.configbool(b'push', b'pushvars.server'):
2486 2503 hookargs = {}
2487 2504 for key, value in part.advisoryparams:
2488 2505 key = key.upper()
2489 2506 # We want pushed variables to have USERVAR_ prepended so we know
2490 2507 # they came from the --pushvar flag.
2491 2508 key = b"USERVAR_" + key
2492 2509 hookargs[key] = value
2493 2510 op.addhookargs(hookargs)
2494 2511
2495 2512
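# End-to-end illustration (hedged): a client pushing with
#
#     hg push --pushvar DEBUG=1
#
# will, when the server enables push.pushvars.server, expose the value to
# server-side hooks as USERVAR_DEBUG thanks to the prefixing above.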
2496 2513 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2497 2514 def handlestreamv2bundle(op, part):
2498 2515
2499 2516 requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
2500 2517 filecount = int(part.params[b'filecount'])
2501 2518 bytecount = int(part.params[b'bytecount'])
2502 2519
2503 2520 repo = op.repo
2504 2521 if len(repo):
2505 2522 msg = _(b'cannot apply stream clone to non empty repository')
2506 2523 raise error.Abort(msg)
2507 2524
2508 2525 repo.ui.debug(b'applying stream bundle\n')
2509 2526 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2510 2527
2511 2528
2512 2529 def widen_bundle(
2513 2530 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2514 2531 ):
2515 2532 """generates bundle2 for widening a narrow clone
2516 2533
2517 2534 bundler is the bundle to which data should be added
2518 2535 repo is the localrepository instance
2519 2536 oldmatcher matches what the client already has
2520 2537 newmatcher matches what the client needs (including what it already has)
2521 2538 common is set of common heads between server and client
2522 2539 known is a set of revs known on the client side (used in ellipses)
2523 2540 cgversion is the changegroup version to send
2524 2541     ellipses is a boolean telling whether to send ellipses data or not
2525 2542
2526 2543 returns bundle2 of the data required for extending
2527 2544 """
2528 2545 commonnodes = set()
2529 2546 cl = repo.changelog
2530 2547 for r in repo.revs(b"::%ln", common):
2531 2548 commonnodes.add(cl.node(r))
2532 2549 if commonnodes:
2533 2550 # XXX: we should only send the filelogs (and treemanifest). user
2534 2551 # already has the changelog and manifest
2535 2552 packer = changegroup.getbundler(
2536 2553 cgversion,
2537 2554 repo,
2538 2555 oldmatcher=oldmatcher,
2539 2556 matcher=newmatcher,
2540 2557 fullnodes=commonnodes,
2541 2558 )
2542 2559 cgdata = packer.generate(
2543 2560 {nodemod.nullid},
2544 2561 list(commonnodes),
2545 2562 False,
2546 2563 b'narrow_widen',
2547 2564 changelog=False,
2548 2565 )
2549 2566
2550 2567 part = bundler.newpart(b'changegroup', data=cgdata)
2551 2568 part.addparam(b'version', cgversion)
2552 2569 if b'treemanifest' in repo.requirements:
2553 2570 part.addparam(b'treemanifest', b'1')
2571 if b'exp-sidedata-flag' in repo.requirements:
2572 part.addparam(b'exp-sidedata', b'1')
2554 2573
2555 2574 return bundler
@@ -1,3079 +1,3084
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 nullrev,
18 18 )
19 19 from .thirdparty import attr
20 20 from . import (
21 21 bookmarks as bookmod,
22 22 bundle2,
23 23 changegroup,
24 24 discovery,
25 25 error,
26 26 exchangev2,
27 27 lock as lockmod,
28 28 logexchange,
29 29 narrowspec,
30 30 obsolete,
31 31 phases,
32 32 pushkey,
33 33 pycompat,
34 34 scmutil,
35 35 sslutil,
36 36 streamclone,
37 37 url as urlmod,
38 38 util,
39 39 wireprototypes,
40 40 )
41 41 from .interfaces import repository
42 42 from .utils import stringutil
43 43
44 44 urlerr = util.urlerr
45 45 urlreq = util.urlreq
46 46
47 47 _NARROWACL_SECTION = b'narrowacl'
48 48
49 49 # Maps bundle version human names to changegroup versions.
50 50 _bundlespeccgversions = {
51 51 b'v1': b'01',
52 52 b'v2': b'02',
53 53 b'packed1': b's1',
54 54 b'bundle2': b'02', # legacy
55 55 }
56 56
57 57 # Maps bundle version with content opts to choose which part to bundle
58 58 _bundlespeccontentopts = {
59 59 b'v1': {
60 60 b'changegroup': True,
61 61 b'cg.version': b'01',
62 62 b'obsolescence': False,
63 63 b'phases': False,
64 64 b'tagsfnodescache': False,
65 65 b'revbranchcache': False,
66 66 },
67 67 b'v2': {
68 68 b'changegroup': True,
69 69 b'cg.version': b'02',
70 70 b'obsolescence': False,
71 71 b'phases': False,
72 72 b'tagsfnodescache': True,
73 73 b'revbranchcache': True,
74 74 },
75 75 b'packed1': {b'cg.version': b's1'},
76 76 }
77 77 _bundlespeccontentopts[b'bundle2'] = _bundlespeccontentopts[b'v2']
78 78
79 79 _bundlespecvariants = {
80 80 b"streamv2": {
81 81 b"changegroup": False,
82 82 b"streamv2": True,
83 83 b"tagsfnodescache": False,
84 84 b"revbranchcache": False,
85 85 }
86 86 }
87 87
88 88 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
89 89 _bundlespecv1compengines = {b'gzip', b'bzip2', b'none'}
90 90
91 91
92 92 @attr.s
93 93 class bundlespec(object):
94 94 compression = attr.ib()
95 95 wirecompression = attr.ib()
96 96 version = attr.ib()
97 97 wireversion = attr.ib()
98 98 params = attr.ib()
99 99 contentopts = attr.ib()
100 100
101 101
102 102 def parsebundlespec(repo, spec, strict=True):
103 103 """Parse a bundle string specification into parts.
104 104
105 105 Bundle specifications denote a well-defined bundle/exchange format.
106 106 The content of a given specification should not change over time in
107 107 order to ensure that bundles produced by a newer version of Mercurial are
108 108 readable from an older version.
109 109
110 110 The string currently has the form:
111 111
112 112 <compression>-<type>[;<parameter0>[;<parameter1>]]
113 113
114 114 Where <compression> is one of the supported compression formats
115 115 and <type> is (currently) a version string. A ";" can follow the type and
116 116 all text afterwards is interpreted as URI encoded, ";" delimited key=value
117 117 pairs.
118 118
119 119 If ``strict`` is True (the default) <compression> is required. Otherwise,
120 120 it is optional.
121 121
122 122 Returns a bundlespec object of (compression, version, parameters).
123 123 Compression will be ``None`` if not in strict mode and a compression isn't
124 124 defined.
125 125
126 126 An ``InvalidBundleSpecification`` is raised when the specification is
127 127 not syntactically well formed.
128 128
129 129 An ``UnsupportedBundleSpecification`` is raised when the compression or
130 130 bundle type/version is not recognized.
131 131
132 132 Note: this function will likely eventually return a more complex data
133 133 structure, including bundle2 part information.
134 134 """
135 135
136 136 def parseparams(s):
137 137 if b';' not in s:
138 138 return s, {}
139 139
140 140 params = {}
141 141 version, paramstr = s.split(b';', 1)
142 142
143 143 for p in paramstr.split(b';'):
144 144 if b'=' not in p:
145 145 raise error.InvalidBundleSpecification(
146 146 _(
147 147 b'invalid bundle specification: '
148 148 b'missing "=" in parameter: %s'
149 149 )
150 150 % p
151 151 )
152 152
153 153 key, value = p.split(b'=', 1)
154 154 key = urlreq.unquote(key)
155 155 value = urlreq.unquote(value)
156 156 params[key] = value
157 157
158 158 return version, params
159 159
160 160 if strict and b'-' not in spec:
161 161 raise error.InvalidBundleSpecification(
162 162 _(
163 163 b'invalid bundle specification; '
164 164 b'must be prefixed with compression: %s'
165 165 )
166 166 % spec
167 167 )
168 168
169 169 if b'-' in spec:
170 170 compression, version = spec.split(b'-', 1)
171 171
172 172 if compression not in util.compengines.supportedbundlenames:
173 173 raise error.UnsupportedBundleSpecification(
174 174 _(b'%s compression is not supported') % compression
175 175 )
176 176
177 177 version, params = parseparams(version)
178 178
179 179 if version not in _bundlespeccgversions:
180 180 raise error.UnsupportedBundleSpecification(
181 181 _(b'%s is not a recognized bundle version') % version
182 182 )
183 183 else:
184 184 # Value could be just the compression or just the version, in which
185 185 # case some defaults are assumed (but only when not in strict mode).
186 186 assert not strict
187 187
188 188 spec, params = parseparams(spec)
189 189
190 190 if spec in util.compengines.supportedbundlenames:
191 191 compression = spec
192 192 version = b'v1'
193 193 # Generaldelta repos require v2.
194 194 if b'generaldelta' in repo.requirements:
195 195 version = b'v2'
196 196 # Modern compression engines require v2.
197 197 if compression not in _bundlespecv1compengines:
198 198 version = b'v2'
199 199 elif spec in _bundlespeccgversions:
200 200 if spec == b'packed1':
201 201 compression = b'none'
202 202 else:
203 203 compression = b'bzip2'
204 204 version = spec
205 205 else:
206 206 raise error.UnsupportedBundleSpecification(
207 207 _(b'%s is not a recognized bundle specification') % spec
208 208 )
209 209
210 210 # Bundle version 1 only supports a known set of compression engines.
211 211 if version == b'v1' and compression not in _bundlespecv1compengines:
212 212 raise error.UnsupportedBundleSpecification(
213 213 _(b'compression engine %s is not supported on v1 bundles')
214 214 % compression
215 215 )
216 216
217 217 # The specification for packed1 can optionally declare the data formats
218 218 # required to apply it. If we see this metadata, compare against what the
219 219 # repo supports and error if the bundle isn't compatible.
220 220 if version == b'packed1' and b'requirements' in params:
221 221 requirements = set(params[b'requirements'].split(b','))
222 222 missingreqs = requirements - repo.supportedformats
223 223 if missingreqs:
224 224 raise error.UnsupportedBundleSpecification(
225 225 _(b'missing support for repository features: %s')
226 226 % b', '.join(sorted(missingreqs))
227 227 )
228 228
229 229 # Compute contentopts based on the version
230 230 contentopts = _bundlespeccontentopts.get(version, {}).copy()
231 231
232 232 # Process the variants
233 233 if b"stream" in params and params[b"stream"] == b"v2":
234 234 variant = _bundlespecvariants[b"streamv2"]
235 235 contentopts.update(variant)
236 236
237 237 engine = util.compengines.forbundlename(compression)
238 238 compression, wirecompression = engine.bundletype()
239 239 wireversion = _bundlespeccgversions[version]
240 240
241 241 return bundlespec(
242 242 compression, wirecompression, version, wireversion, params, contentopts
243 243 )
244 244
245 245
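# For example (a sketch, assuming the zstd engine is available):
#
#     spec = parsebundlespec(repo, b'zstd-v2;stream=v2')
#     # spec.version == b'v2', spec.params == {b'stream': b'v2'},
#     # and spec.contentopts picks up the streamv2 variant defined above.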
246 246 def readbundle(ui, fh, fname, vfs=None):
247 247 header = changegroup.readexactly(fh, 4)
248 248
249 249 alg = None
250 250 if not fname:
251 251 fname = b"stream"
252 252 if not header.startswith(b'HG') and header.startswith(b'\0'):
253 253 fh = changegroup.headerlessfixup(fh, header)
254 254 header = b"HG10"
255 255 alg = b'UN'
256 256 elif vfs:
257 257 fname = vfs.join(fname)
258 258
259 259 magic, version = header[0:2], header[2:4]
260 260
261 261 if magic != b'HG':
262 262 raise error.Abort(_(b'%s: not a Mercurial bundle') % fname)
263 263 if version == b'10':
264 264 if alg is None:
265 265 alg = changegroup.readexactly(fh, 2)
266 266 return changegroup.cg1unpacker(fh, alg)
267 267 elif version.startswith(b'2'):
268 268 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
269 269 elif version == b'S1':
270 270 return streamclone.streamcloneapplier(fh)
271 271 else:
272 272 raise error.Abort(
273 273 _(b'%s: unknown bundle version %s') % (fname, version)
274 274 )
275 275
276 276
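# Recap of the 4-byte header dispatch implemented above:
#
#     b'HG10' (+ 2-byte alg)  -> changegroup.cg1unpacker (bundle1)
#     b'HG2x'                 -> bundle2.getunbundler
#     b'HGS1'                 -> streamclone.streamcloneapplier (packed1)
#     leading b'\0'           -> headerless bundle1, fixed up as HG10UN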
277 277 def getbundlespec(ui, fh):
278 278 """Infer the bundlespec from a bundle file handle.
279 279
280 280 The input file handle is seeked and the original seek position is not
281 281 restored.
282 282 """
283 283
284 284 def speccompression(alg):
285 285 try:
286 286 return util.compengines.forbundletype(alg).bundletype()[0]
287 287 except KeyError:
288 288 return None
289 289
290 290 b = readbundle(ui, fh, None)
291 291 if isinstance(b, changegroup.cg1unpacker):
292 292 alg = b._type
293 293 if alg == b'_truncatedBZ':
294 294 alg = b'BZ'
295 295 comp = speccompression(alg)
296 296 if not comp:
297 297 raise error.Abort(_(b'unknown compression algorithm: %s') % alg)
298 298 return b'%s-v1' % comp
299 299 elif isinstance(b, bundle2.unbundle20):
300 300 if b'Compression' in b.params:
301 301 comp = speccompression(b.params[b'Compression'])
302 302 if not comp:
303 303 raise error.Abort(
304 304 _(b'unknown compression algorithm: %s') % comp
305 305 )
306 306 else:
307 307 comp = b'none'
308 308
309 309 version = None
310 310 for part in b.iterparts():
311 311 if part.type == b'changegroup':
312 312 version = part.params[b'version']
313 313 if version in (b'01', b'02'):
314 314 version = b'v2'
315 315 else:
316 316 raise error.Abort(
317 317 _(
318 318 b'changegroup version %s does not have '
319 319 b'a known bundlespec'
320 320 )
321 321 % version,
322 322 hint=_(b'try upgrading your Mercurial client'),
323 323 )
324 324 elif part.type == b'stream2' and version is None:
325 325             # A stream2 part must be part of a v2 bundle
326 326 requirements = urlreq.unquote(part.params[b'requirements'])
327 327 splitted = requirements.split()
328 328 params = bundle2._formatrequirementsparams(splitted)
329 329 return b'none-v2;stream=v2;%s' % params
330 330
331 331 if not version:
332 332 raise error.Abort(
333 333 _(b'could not identify changegroup version in bundle')
334 334 )
335 335
336 336 return b'%s-%s' % (comp, version)
337 337 elif isinstance(b, streamclone.streamcloneapplier):
338 338 requirements = streamclone.readbundle1header(fh)[2]
339 339 formatted = bundle2._formatrequirementsparams(requirements)
340 340 return b'none-packed1;%s' % formatted
341 341 else:
342 342 raise error.Abort(_(b'unknown bundle type: %s') % b)
343 343
344 344
345 345 def _computeoutgoing(repo, heads, common):
346 346 """Computes which revs are outgoing given a set of common
347 347 and a set of heads.
348 348
349 349 This is a separate function so extensions can have access to
350 350 the logic.
351 351
352 352 Returns a discovery.outgoing object.
353 353 """
354 354 cl = repo.changelog
355 355 if common:
356 356 hasnode = cl.hasnode
357 357 common = [n for n in common if hasnode(n)]
358 358 else:
359 359 common = [nullid]
360 360 if not heads:
361 361 heads = cl.heads()
362 362 return discovery.outgoing(repo, common, heads)
363 363
364 364
365 365 def _checkpublish(pushop):
366 366 repo = pushop.repo
367 367 ui = repo.ui
368 368 behavior = ui.config(b'experimental', b'auto-publish')
369 369 if pushop.publish or behavior not in (b'warn', b'confirm', b'abort'):
370 370 return
371 371 remotephases = listkeys(pushop.remote, b'phases')
372 372 if not remotephases.get(b'publishing', False):
373 373 return
374 374
375 375 if pushop.revs is None:
376 376 published = repo.filtered(b'served').revs(b'not public()')
377 377 else:
378 378 published = repo.revs(b'::%ln - public()', pushop.revs)
379 379 if published:
380 380 if behavior == b'warn':
381 381 ui.warn(
382 382 _(b'%i changesets about to be published\n') % len(published)
383 383 )
384 384 elif behavior == b'confirm':
385 385 if ui.promptchoice(
386 386 _(b'push and publish %i changesets (yn)?$$ &Yes $$ &No')
387 387 % len(published)
388 388 ):
389 389 raise error.Abort(_(b'user quit'))
390 390 elif behavior == b'abort':
391 391 msg = _(b'push would publish %i changesets') % len(published)
392 392 hint = _(
393 393 b"use --publish or adjust 'experimental.auto-publish'"
394 394 b" config"
395 395 )
396 396 raise error.Abort(msg, hint=hint)
397 397
398 398
399 399 def _forcebundle1(op):
400 400 """return true if a pull/push must use bundle1
401 401
402 402 This function is used to allow testing of the older bundle version"""
403 403 ui = op.repo.ui
404 404     # The goal of this config is to allow developers to choose the bundle
405 405     # version used during exchange. This is especially handy during tests.
406 406     # Value is a list of bundle versions to pick from; the highest version
407 407     # should be used.
408 408 #
409 409 # developer config: devel.legacy.exchange
410 410 exchange = ui.configlist(b'devel', b'legacy.exchange')
411 411 forcebundle1 = b'bundle2' not in exchange and b'bundle1' in exchange
412 412 return forcebundle1 or not op.remote.capable(b'bundle2')
413 413
414 414
415 415 class pushoperation(object):
416 416     """An object that represents a single push operation
417 417
418 418 Its purpose is to carry push related state and very common operations.
419 419
420 420 A new pushoperation should be created at the beginning of each push and
421 421 discarded afterward.
422 422 """
423 423
424 424 def __init__(
425 425 self,
426 426 repo,
427 427 remote,
428 428 force=False,
429 429 revs=None,
430 430 newbranch=False,
431 431 bookmarks=(),
432 432 publish=False,
433 433 pushvars=None,
434 434 ):
435 435 # repo we push from
436 436 self.repo = repo
437 437 self.ui = repo.ui
438 438 # repo we push to
439 439 self.remote = remote
440 440 # force option provided
441 441 self.force = force
442 442 # revs to be pushed (None is "all")
443 443 self.revs = revs
444 444 # bookmark explicitly pushed
445 445 self.bookmarks = bookmarks
446 446 # allow push of new branch
447 447 self.newbranch = newbranch
448 448 # step already performed
449 449 # (used to check what steps have been already performed through bundle2)
450 450 self.stepsdone = set()
451 451 # Integer version of the changegroup push result
452 452 # - None means nothing to push
453 453 # - 0 means HTTP error
454 454 # - 1 means we pushed and remote head count is unchanged *or*
455 455 # we have outgoing changesets but refused to push
456 456 # - other values as described by addchangegroup()
457 457 self.cgresult = None
458 458 # Boolean value for the bookmark push
459 459 self.bkresult = None
460 460 # discover.outgoing object (contains common and outgoing data)
461 461 self.outgoing = None
462 462 # all remote topological heads before the push
463 463 self.remoteheads = None
464 464 # Details of the remote branch pre and post push
465 465 #
466 466 # mapping: {'branch': ([remoteheads],
467 467 # [newheads],
468 468 # [unsyncedheads],
469 469 # [discardedheads])}
470 470 # - branch: the branch name
471 471 # - remoteheads: the list of remote heads known locally
472 472 # None if the branch is new
473 473 # - newheads: the new remote heads (known locally) with outgoing pushed
474 474 # - unsyncedheads: the list of remote heads unknown locally.
475 475 # - discardedheads: the list of remote heads made obsolete by the push
476 476 self.pushbranchmap = None
477 477 # testable as a boolean indicating if any nodes are missing locally.
478 478 self.incoming = None
479 479 # summary of the remote phase situation
480 480 self.remotephases = None
481 481 # phases changes that must be pushed along side the changesets
482 482 self.outdatedphases = None
483 483 # phases changes that must be pushed if changeset push fails
484 484 self.fallbackoutdatedphases = None
485 485 # outgoing obsmarkers
486 486 self.outobsmarkers = set()
487 487 # outgoing bookmarks, list of (bm, oldnode | '', newnode | '')
488 488 self.outbookmarks = []
489 489 # transaction manager
490 490 self.trmanager = None
491 491 # map { pushkey partid -> callback handling failure}
492 492 # used to handle exception from mandatory pushkey part failure
493 493 self.pkfailcb = {}
494 494 # an iterable of pushvars or None
495 495 self.pushvars = pushvars
496 496 # publish pushed changesets
497 497 self.publish = publish
498 498
499 499 @util.propertycache
500 500 def futureheads(self):
501 501 """future remote heads if the changeset push succeeds"""
502 502 return self.outgoing.missingheads
503 503
504 504 @util.propertycache
505 505 def fallbackheads(self):
506 506 """future remote heads if the changeset push fails"""
507 507 if self.revs is None:
508 508             # no revs to push; all common heads are relevant
509 509 return self.outgoing.commonheads
510 510 unfi = self.repo.unfiltered()
511 511 # I want cheads = heads(::missingheads and ::commonheads)
512 512 # (missingheads is revs with secret changeset filtered out)
513 513 #
514 514 # This can be expressed as:
515 515 # cheads = ( (missingheads and ::commonheads)
516 516 # + (commonheads and ::missingheads))"
517 517 # )
518 518 #
519 519 # while trying to push we already computed the following:
520 520 # common = (::commonheads)
521 521 # missing = ((commonheads::missingheads) - commonheads)
522 522 #
523 523 # We can pick:
524 524 # * missingheads part of common (::commonheads)
525 525 common = self.outgoing.common
526 526 nm = self.repo.changelog.nodemap
527 527 cheads = [node for node in self.revs if nm[node] in common]
528 528 # and
529 529 # * commonheads parents on missing
530 530 revset = unfi.set(
531 531 b'%ln and parents(roots(%ln))',
532 532 self.outgoing.commonheads,
533 533 self.outgoing.missing,
534 534 )
535 535 cheads.extend(c.node() for c in revset)
536 536 return cheads
537 537
538 538 @property
539 539 def commonheads(self):
540 540 """set of all common heads after changeset bundle push"""
541 541 if self.cgresult:
542 542 return self.futureheads
543 543 else:
544 544 return self.fallbackheads
545 545
546 546
547 547 # mapping of messages used when pushing bookmarks
548 548 bookmsgmap = {
549 549 b'update': (
550 550 _(b"updating bookmark %s\n"),
551 551 _(b'updating bookmark %s failed!\n'),
552 552 ),
553 553 b'export': (
554 554 _(b"exporting bookmark %s\n"),
555 555 _(b'exporting bookmark %s failed!\n'),
556 556 ),
557 557 b'delete': (
558 558 _(b"deleting remote bookmark %s\n"),
559 559 _(b'deleting remote bookmark %s failed!\n'),
560 560 ),
561 561 }
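# Each entry is a (success message, failure message) pair; callers pick one
# with e.g. bookmsgmap[action][0] % book (see the reply handlers below).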
562 562
563 563
564 564 def push(
565 565 repo,
566 566 remote,
567 567 force=False,
568 568 revs=None,
569 569 newbranch=False,
570 570 bookmarks=(),
571 571 publish=False,
572 572 opargs=None,
573 573 ):
574 574     '''Push outgoing changesets (limited by revs) from a local repository
575 575     to remote. Returns the pushoperation; its ``cgresult`` is an integer:
576 576     - None means nothing to push
577 577     - 0 means HTTP error
578 578     - 1 means we pushed and remote head count is unchanged *or*
579 579       we have outgoing changesets but refused to push
580 580     - other values as described by addchangegroup()
581 581     '''
582 582 if opargs is None:
583 583 opargs = {}
584 584 pushop = pushoperation(
585 585 repo,
586 586 remote,
587 587 force,
588 588 revs,
589 589 newbranch,
590 590 bookmarks,
591 591 publish,
592 592 **pycompat.strkwargs(opargs)
593 593 )
594 594 if pushop.remote.local():
595 595 missing = (
596 596 set(pushop.repo.requirements) - pushop.remote.local().supported
597 597 )
598 598 if missing:
599 599 msg = _(
600 600 b"required features are not"
601 601 b" supported in the destination:"
602 602 b" %s"
603 603 ) % (b', '.join(sorted(missing)))
604 604 raise error.Abort(msg)
605 605
606 606 if not pushop.remote.canpush():
607 607 raise error.Abort(_(b"destination does not support push"))
608 608
609 609 if not pushop.remote.capable(b'unbundle'):
610 610 raise error.Abort(
611 611 _(
612 612 b'cannot push: destination does not support the '
613 613 b'unbundle wire protocol command'
614 614 )
615 615 )
616 616
617 617 # get lock as we might write phase data
618 618 wlock = lock = None
619 619 try:
620 620 # bundle2 push may receive a reply bundle touching bookmarks
621 621 # requiring the wlock. Take it now to ensure proper ordering.
622 622 maypushback = pushop.ui.configbool(b'experimental', b'bundle2.pushback')
623 623 if (
624 624 (not _forcebundle1(pushop))
625 625 and maypushback
626 626 and not bookmod.bookmarksinstore(repo)
627 627 ):
628 628 wlock = pushop.repo.wlock()
629 629 lock = pushop.repo.lock()
630 630 pushop.trmanager = transactionmanager(
631 631 pushop.repo, b'push-response', pushop.remote.url()
632 632 )
633 633 except error.LockUnavailable as err:
634 634 # source repo cannot be locked.
635 635 # We do not abort the push, but just disable the local phase
636 636 # synchronisation.
637 637 msg = b'cannot lock source repository: %s\n' % stringutil.forcebytestr(
638 638 err
639 639 )
640 640 pushop.ui.debug(msg)
641 641
642 642 with wlock or util.nullcontextmanager():
643 643 with lock or util.nullcontextmanager():
644 644 with pushop.trmanager or util.nullcontextmanager():
645 645 pushop.repo.checkpush(pushop)
646 646 _checkpublish(pushop)
647 647 _pushdiscovery(pushop)
648 648 if not _forcebundle1(pushop):
649 649 _pushbundle2(pushop)
650 650 _pushchangeset(pushop)
651 651 _pushsyncphase(pushop)
652 652 _pushobsolete(pushop)
653 653 _pushbookmark(pushop)
654 654
655 655 if repo.ui.configbool(b'experimental', b'remotenames'):
656 656 logexchange.pullremotenames(repo, remote)
657 657
658 658 return pushop
659 659
660 660
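# A minimal sketch of how calling code might drive a push, assuming a ui
# object, a local repo and a destination URL; mercurial.hg.peer is the
# usual way to obtain the peer (imported locally, since this module does
# not import hg). The helper name is purely illustrative:
def _examplepush(ui, repo, dest):
    """illustration only: push everything to ``dest``"""
    from mercurial import hg

    other = hg.peer(ui, {}, dest)
    pushop = push(repo, other, newbranch=True)
    # cgresult is None (nothing to push), 0 (error), or the value
    # described in the push() docstring above
    return pushop.cgresult

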
661 661 # list of steps to perform discovery before push
662 662 pushdiscoveryorder = []
663 663
664 664 # Mapping between step name and function
665 665 #
666 666 # This exists to help extensions wrap steps if necessary
667 667 pushdiscoverymapping = {}
668 668
669 669
670 670 def pushdiscovery(stepname):
671 671 """decorator for function performing discovery before push
672 672
673 673 The function is added to the step -> function mapping and appended to the
674 674 list of steps. Beware that decorated function will be added in order (this
675 675 may matter).
676 676
677 677 You can only use this decorator for a new step, if you want to wrap a step
678 678 from an extension, change the pushdiscovery dictionary directly."""
679 679
680 680 def dec(func):
681 681 assert stepname not in pushdiscoverymapping
682 682 pushdiscoverymapping[stepname] = func
683 683 pushdiscoveryorder.append(stepname)
684 684 return func
685 685
686 686 return dec
687 687
688 688
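# A minimal sketch of wiring an extra discovery step through the decorator
# above; the step name and function are hypothetical, and the function is
# left undecorated here so nothing is actually registered:
def _examplediscoverystep(pushop):
    """illustration only: a discovery step that just logs"""
    pushop.ui.debug(b'example discovery step ran\n')


# An extension would register it with:
#     pushdiscovery(b'example-step')(_examplediscoverystep)

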
689 689 def _pushdiscovery(pushop):
690 690 """Run all discovery steps"""
691 691 for stepname in pushdiscoveryorder:
692 692 step = pushdiscoverymapping[stepname]
693 693 step(pushop)
694 694
695 695
696 696 @pushdiscovery(b'changeset')
697 697 def _pushdiscoverychangeset(pushop):
698 698 """discover the changeset that need to be pushed"""
699 699 fci = discovery.findcommonincoming
700 700 if pushop.revs:
701 701 commoninc = fci(
702 702 pushop.repo,
703 703 pushop.remote,
704 704 force=pushop.force,
705 705 ancestorsof=pushop.revs,
706 706 )
707 707 else:
708 708 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
709 709 common, inc, remoteheads = commoninc
710 710 fco = discovery.findcommonoutgoing
711 711 outgoing = fco(
712 712 pushop.repo,
713 713 pushop.remote,
714 714 onlyheads=pushop.revs,
715 715 commoninc=commoninc,
716 716 force=pushop.force,
717 717 )
718 718 pushop.outgoing = outgoing
719 719 pushop.remoteheads = remoteheads
720 720 pushop.incoming = inc
721 721
722 722
723 723 @pushdiscovery(b'phase')
724 724 def _pushdiscoveryphase(pushop):
725 725 """discover the phase that needs to be pushed
726 726
727 727 (computed for both success and failure case for changesets push)"""
728 728 outgoing = pushop.outgoing
729 729 unfi = pushop.repo.unfiltered()
730 730 remotephases = listkeys(pushop.remote, b'phases')
731 731
732 732 if (
733 733 pushop.ui.configbool(b'ui', b'_usedassubrepo')
734 734 and remotephases # server supports phases
735 735 and not pushop.outgoing.missing # no changesets to be pushed
736 736 and remotephases.get(b'publishing', False)
737 737 ):
738 738         # When:
739 739         # - this is a subrepo push
740 740         # - and the remote supports phases
741 741         # - and no changesets are to be pushed
742 742         # - and the remote is publishing
743 743         # we may be in the issue 3781 case!
744 744         # We drop the phase synchronisation that would otherwise be done
745 745         # as a courtesy, since it could publish changesets that are
746 746         # possibly still draft on the remote.
747 747 pushop.outdatedphases = []
748 748 pushop.fallbackoutdatedphases = []
749 749 return
750 750
751 751 pushop.remotephases = phases.remotephasessummary(
752 752 pushop.repo, pushop.fallbackheads, remotephases
753 753 )
754 754 droots = pushop.remotephases.draftroots
755 755
756 756 extracond = b''
757 757 if not pushop.remotephases.publishing:
758 758 extracond = b' and public()'
759 759 revset = b'heads((%%ln::%%ln) %s)' % extracond
760 760     # Get the list of all revs that are draft on the remote but public here.
761 761     # XXX Beware that the revset breaks if droots is not strictly
762 762     # XXX roots; we may want to ensure it is, but that is costly
763 763 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
764 764 if not pushop.remotephases.publishing and pushop.publish:
765 765 future = list(
766 766 unfi.set(
767 767 b'%ln and (not public() or %ln::)', pushop.futureheads, droots
768 768 )
769 769 )
770 770 elif not outgoing.missing:
771 771 future = fallback
772 772 else:
773 773         # add the changesets we are going to push as draft
774 774         #
775 775         # should not be necessary for a publishing server, but because of an
776 776         # issue fixed in xxxxx we have to do it anyway.
777 777 fdroots = list(
778 778 unfi.set(b'roots(%ln + %ln::)', outgoing.missing, droots)
779 779 )
780 780 fdroots = [f.node() for f in fdroots]
781 781 future = list(unfi.set(revset, fdroots, pushop.futureheads))
782 782 pushop.outdatedphases = future
783 783 pushop.fallbackoutdatedphases = fallback
784 784
785 785
786 786 @pushdiscovery(b'obsmarker')
787 787 def _pushdiscoveryobsmarkers(pushop):
788 788 if not obsolete.isenabled(pushop.repo, obsolete.exchangeopt):
789 789 return
790 790
791 791 if not pushop.repo.obsstore:
792 792 return
793 793
794 794 if b'obsolete' not in listkeys(pushop.remote, b'namespaces'):
795 795 return
796 796
797 797 repo = pushop.repo
798 798     # very naive computation that can be quite expensive on a big repo;
799 799     # however, evolution is currently slow on them anyway.
800 800 nodes = (c.node() for c in repo.set(b'::%ln', pushop.futureheads))
801 801 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
802 802
803 803
804 804 @pushdiscovery(b'bookmarks')
805 805 def _pushdiscoverybookmarks(pushop):
806 806 ui = pushop.ui
807 807 repo = pushop.repo.unfiltered()
808 808 remote = pushop.remote
809 809 ui.debug(b"checking for updated bookmarks\n")
810 810 ancestors = ()
811 811 if pushop.revs:
812 812 revnums = pycompat.maplist(repo.changelog.rev, pushop.revs)
813 813 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
814 814
815 815 remotebookmark = bookmod.unhexlifybookmarks(listkeys(remote, b'bookmarks'))
816 816
817 817 explicit = {
818 818 repo._bookmarks.expandname(bookmark) for bookmark in pushop.bookmarks
819 819 }
820 820
821 821 comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)
822 822 return _processcompared(pushop, ancestors, explicit, remotebookmark, comp)
823 823
824 824
825 825 def _processcompared(pushop, pushed, explicit, remotebms, comp):
826 826 """take decision on bookmarks to push to the remote repo
827 827
828 828 Exists to help extensions alter this behavior.
829 829 """
830 830 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
831 831
832 832 repo = pushop.repo
833 833
834 834 for b, scid, dcid in advsrc:
835 835 if b in explicit:
836 836 explicit.remove(b)
837 837 if not pushed or repo[scid].rev() in pushed:
838 838 pushop.outbookmarks.append((b, dcid, scid))
839 839     # search for added bookmarks
840 840 for b, scid, dcid in addsrc:
841 841 if b in explicit:
842 842 explicit.remove(b)
843 843 pushop.outbookmarks.append((b, b'', scid))
844 844     # search for overwritten bookmarks
845 845 for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
846 846 if b in explicit:
847 847 explicit.remove(b)
848 848 pushop.outbookmarks.append((b, dcid, scid))
849 849     # search for bookmarks to delete
850 850 for b, scid, dcid in adddst:
851 851 if b in explicit:
852 852 explicit.remove(b)
853 853 # treat as "deleted locally"
854 854 pushop.outbookmarks.append((b, dcid, b''))
855 855 # identical bookmarks shouldn't get reported
856 856 for b, scid, dcid in same:
857 857 if b in explicit:
858 858 explicit.remove(b)
859 859
860 860 if explicit:
861 861 explicit = sorted(explicit)
862 862 # we should probably list all of them
863 863 pushop.ui.warn(
864 864 _(
865 865 b'bookmark %s does not exist on the local '
866 866 b'or remote repository!\n'
867 867 )
868 868 % explicit[0]
869 869 )
870 870 pushop.bkresult = 2
871 871
872 872 pushop.outbookmarks.sort()
873 873
874 874
875 875 def _pushcheckoutgoing(pushop):
876 876 outgoing = pushop.outgoing
877 877 unfi = pushop.repo.unfiltered()
878 878 if not outgoing.missing:
879 879 # nothing to push
880 880 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
881 881 return False
882 882 # something to push
883 883 if not pushop.force:
884 884         # if repo.obsstore is empty --> no obsolete changesets;
885 885         # in that case, skip the iteration
886 886 if unfi.obsstore:
887 887             # these messages are here for 80 char limit reasons
888 888 mso = _(b"push includes obsolete changeset: %s!")
889 889 mspd = _(b"push includes phase-divergent changeset: %s!")
890 890 mscd = _(b"push includes content-divergent changeset: %s!")
891 891 mst = {
892 892 b"orphan": _(b"push includes orphan changeset: %s!"),
893 893 b"phase-divergent": mspd,
894 894 b"content-divergent": mscd,
895 895 }
896 896             # If there is at least one obsolete or unstable
897 897             # changeset in missing, at least one of the missing
898 898             # heads will be obsolete or unstable. So checking
899 899             # heads only is ok
900 900 for node in outgoing.missingheads:
901 901 ctx = unfi[node]
902 902 if ctx.obsolete():
903 903 raise error.Abort(mso % ctx)
904 904 elif ctx.isunstable():
905 905 # TODO print more than one instability in the abort
906 906 # message
907 907 raise error.Abort(mst[ctx.instabilities()[0]] % ctx)
908 908
909 909 discovery.checkheads(pushop)
910 910 return True
911 911
912 912
913 913 # List of names of steps to perform for an outgoing bundle2, order matters.
914 914 b2partsgenorder = []
915 915
916 916 # Mapping between step name and function
917 917 #
918 918 # This exists to help extensions wrap steps if necessary
919 919 b2partsgenmapping = {}
920 920
921 921
922 922 def b2partsgenerator(stepname, idx=None):
923 923 """decorator for function generating bundle2 part
924 924
925 925 The function is added to the step -> function mapping and appended to the
926 926 list of steps. Beware that decorated functions will be added in order
927 927 (this may matter).
928 928
929 929     You can only use this decorator for new steps; if you want to wrap a step
930 930     from an extension, modify the b2partsgenmapping dictionary directly."""
931 931
932 932 def dec(func):
933 933 assert stepname not in b2partsgenmapping
934 934 b2partsgenmapping[stepname] = func
935 935 if idx is None:
936 936 b2partsgenorder.append(stepname)
937 937 else:
938 938 b2partsgenorder.insert(idx, stepname)
939 939 return func
940 940
941 941 return dec
942 942
943 943
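# A minimal sketch of a part generator an extension might add; the part
# and step names are hypothetical, and the function is left undecorated
# (it would normally be wrapped with @b2partsgenerator(b'example')) so
# nothing is actually registered:
def _examplepartgen(pushop, bundler):
    """illustration only: add a tiny advisory part, once per push"""
    if b'example' in pushop.stepsdone:
        return
    pushop.stepsdone.add(b'example')
    bundler.newpart(b'example:ping', data=b'', mandatory=False)

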
944 944 def _pushb2ctxcheckheads(pushop, bundler):
945 945 """Generate race condition checking parts
946 946
947 947 Exists as an independent function to aid extensions
948 948 """
949 949     # * 'force' does not check for push races,
950 950     # * if we don't push anything, there is nothing to check.
951 951 if not pushop.force and pushop.outgoing.missingheads:
952 952 allowunrelated = b'related' in bundler.capabilities.get(
953 953 b'checkheads', ()
954 954 )
955 955 emptyremote = pushop.pushbranchmap is None
956 956 if not allowunrelated or emptyremote:
957 957 bundler.newpart(b'check:heads', data=iter(pushop.remoteheads))
958 958 else:
959 959 affected = set()
960 960 for branch, heads in pycompat.iteritems(pushop.pushbranchmap):
961 961 remoteheads, newheads, unsyncedheads, discardedheads = heads
962 962 if remoteheads is not None:
963 963 remote = set(remoteheads)
964 964 affected |= set(discardedheads) & remote
965 965 affected |= remote - set(newheads)
966 966 if affected:
967 967 data = iter(sorted(affected))
968 968 bundler.newpart(b'check:updated-heads', data=data)
969 969
970 970
971 971 def _pushing(pushop):
972 972 """return True if we are pushing anything"""
973 973 return bool(
974 974 pushop.outgoing.missing
975 975 or pushop.outdatedphases
976 976 or pushop.outobsmarkers
977 977 or pushop.outbookmarks
978 978 )
979 979
980 980
981 981 @b2partsgenerator(b'check-bookmarks')
982 982 def _pushb2checkbookmarks(pushop, bundler):
983 983 """insert bookmark move checking"""
984 984 if not _pushing(pushop) or pushop.force:
985 985 return
986 986 b2caps = bundle2.bundle2caps(pushop.remote)
987 987 hasbookmarkcheck = b'bookmarks' in b2caps
988 988 if not (pushop.outbookmarks and hasbookmarkcheck):
989 989 return
990 990 data = []
991 991 for book, old, new in pushop.outbookmarks:
992 992 data.append((book, old))
993 993 checkdata = bookmod.binaryencode(data)
994 994 bundler.newpart(b'check:bookmarks', data=checkdata)
995 995
996 996
997 997 @b2partsgenerator(b'check-phases')
998 998 def _pushb2checkphases(pushop, bundler):
999 999 """insert phase move checking"""
1000 1000 if not _pushing(pushop) or pushop.force:
1001 1001 return
1002 1002 b2caps = bundle2.bundle2caps(pushop.remote)
1003 1003 hasphaseheads = b'heads' in b2caps.get(b'phases', ())
1004 1004 if pushop.remotephases is not None and hasphaseheads:
1005 1005 # check that the remote phase has not changed
1006 1006 checks = [[] for p in phases.allphases]
1007 1007 checks[phases.public].extend(pushop.remotephases.publicheads)
1008 1008 checks[phases.draft].extend(pushop.remotephases.draftroots)
1009 1009 if any(checks):
1010 1010 for nodes in checks:
1011 1011 nodes.sort()
1012 1012 checkdata = phases.binaryencode(checks)
1013 1013 bundler.newpart(b'check:phases', data=checkdata)
1014 1014
1015 1015
1016 1016 @b2partsgenerator(b'changeset')
1017 1017 def _pushb2ctx(pushop, bundler):
1018 1018 """handle changegroup push through bundle2
1019 1019
1020 1020 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
1021 1021 """
1022 1022 if b'changesets' in pushop.stepsdone:
1023 1023 return
1024 1024 pushop.stepsdone.add(b'changesets')
1025 1025 # Send known heads to the server for race detection.
1026 1026 if not _pushcheckoutgoing(pushop):
1027 1027 return
1028 1028 pushop.repo.prepushoutgoinghooks(pushop)
1029 1029
1030 1030 _pushb2ctxcheckheads(pushop, bundler)
1031 1031
1032 1032 b2caps = bundle2.bundle2caps(pushop.remote)
1033 1033 version = b'01'
1034 1034 cgversions = b2caps.get(b'changegroup')
1035 1035 if cgversions: # 3.1 and 3.2 ship with an empty value
1036 1036 cgversions = [
1037 1037 v
1038 1038 for v in cgversions
1039 1039 if v in changegroup.supportedoutgoingversions(pushop.repo)
1040 1040 ]
1041 1041 if not cgversions:
1042 1042 raise error.Abort(_(b'no common changegroup version'))
1043 1043 version = max(cgversions)
1044 1044 cgstream = changegroup.makestream(
1045 1045 pushop.repo, pushop.outgoing, version, b'push'
1046 1046 )
1047 1047 cgpart = bundler.newpart(b'changegroup', data=cgstream)
1048 1048 if cgversions:
1049 1049 cgpart.addparam(b'version', version)
1050 1050 if b'treemanifest' in pushop.repo.requirements:
1051 1051 cgpart.addparam(b'treemanifest', b'1')
1052 if b'exp-sidedata-flag' in pushop.repo.requirements:
1053 cgpart.addparam(b'exp-sidedata', b'1')
1052 1054
1053 1055 def handlereply(op):
1054 1056 """extract addchangegroup returns from server reply"""
1055 1057 cgreplies = op.records.getreplies(cgpart.id)
1056 1058 assert len(cgreplies[b'changegroup']) == 1
1057 1059 pushop.cgresult = cgreplies[b'changegroup'][0][b'return']
1058 1060
1059 1061 return handlereply
1060 1062
1061 1063
1062 1064 @b2partsgenerator(b'phase')
1063 1065 def _pushb2phases(pushop, bundler):
1064 1066 """handle phase push through bundle2"""
1065 1067 if b'phases' in pushop.stepsdone:
1066 1068 return
1067 1069 b2caps = bundle2.bundle2caps(pushop.remote)
1068 1070 ui = pushop.repo.ui
1069 1071
1070 1072 legacyphase = b'phases' in ui.configlist(b'devel', b'legacy.exchange')
1071 1073 haspushkey = b'pushkey' in b2caps
1072 1074 hasphaseheads = b'heads' in b2caps.get(b'phases', ())
1073 1075
1074 1076 if hasphaseheads and not legacyphase:
1075 1077 return _pushb2phaseheads(pushop, bundler)
1076 1078 elif haspushkey:
1077 1079 return _pushb2phasespushkey(pushop, bundler)
1078 1080
1079 1081
1080 1082 def _pushb2phaseheads(pushop, bundler):
1081 1083 """push phase information through a bundle2 - binary part"""
1082 1084 pushop.stepsdone.add(b'phases')
1083 1085 if pushop.outdatedphases:
1084 1086 updates = [[] for p in phases.allphases]
1085 1087 updates[0].extend(h.node() for h in pushop.outdatedphases)
1086 1088 phasedata = phases.binaryencode(updates)
1087 1089 bundler.newpart(b'phase-heads', data=phasedata)
1088 1090
1089 1091
1090 1092 def _pushb2phasespushkey(pushop, bundler):
1091 1093 """push phase information through a bundle2 - pushkey part"""
1092 1094 pushop.stepsdone.add(b'phases')
1093 1095 part2node = []
1094 1096
1095 1097 def handlefailure(pushop, exc):
1096 1098 targetid = int(exc.partid)
1097 1099 for partid, node in part2node:
1098 1100 if partid == targetid:
1099 1101 raise error.Abort(_(b'updating %s to public failed') % node)
1100 1102
1101 1103 enc = pushkey.encode
1102 1104 for newremotehead in pushop.outdatedphases:
1103 1105 part = bundler.newpart(b'pushkey')
1104 1106 part.addparam(b'namespace', enc(b'phases'))
1105 1107 part.addparam(b'key', enc(newremotehead.hex()))
1106 1108 part.addparam(b'old', enc(b'%d' % phases.draft))
1107 1109 part.addparam(b'new', enc(b'%d' % phases.public))
1108 1110 part2node.append((part.id, newremotehead))
1109 1111 pushop.pkfailcb[part.id] = handlefailure
1110 1112
1111 1113 def handlereply(op):
1112 1114 for partid, node in part2node:
1113 1115 partrep = op.records.getreplies(partid)
1114 1116 results = partrep[b'pushkey']
1115 1117 assert len(results) <= 1
1116 1118 msg = None
1117 1119 if not results:
1118 1120 msg = _(b'server ignored update of %s to public!\n') % node
1119 1121 elif not int(results[0][b'return']):
1120 1122 msg = _(b'updating %s to public failed!\n') % node
1121 1123 if msg is not None:
1122 1124 pushop.ui.warn(msg)
1123 1125
1124 1126 return handlereply
1125 1127
1126 1128
1127 1129 @b2partsgenerator(b'obsmarkers')
1128 1130 def _pushb2obsmarkers(pushop, bundler):
1129 1131 if b'obsmarkers' in pushop.stepsdone:
1130 1132 return
1131 1133 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
1132 1134 if obsolete.commonversion(remoteversions) is None:
1133 1135 return
1134 1136 pushop.stepsdone.add(b'obsmarkers')
1135 1137 if pushop.outobsmarkers:
1136 1138 markers = sorted(pushop.outobsmarkers)
1137 1139 bundle2.buildobsmarkerspart(bundler, markers)
1138 1140
1139 1141
1140 1142 @b2partsgenerator(b'bookmarks')
1141 1143 def _pushb2bookmarks(pushop, bundler):
1142 1144 """handle bookmark push through bundle2"""
1143 1145 if b'bookmarks' in pushop.stepsdone:
1144 1146 return
1145 1147 b2caps = bundle2.bundle2caps(pushop.remote)
1146 1148
1147 1149 legacy = pushop.repo.ui.configlist(b'devel', b'legacy.exchange')
1148 1150 legacybooks = b'bookmarks' in legacy
1149 1151
1150 1152 if not legacybooks and b'bookmarks' in b2caps:
1151 1153 return _pushb2bookmarkspart(pushop, bundler)
1152 1154 elif b'pushkey' in b2caps:
1153 1155 return _pushb2bookmarkspushkey(pushop, bundler)
1154 1156
1155 1157
1156 1158 def _bmaction(old, new):
1157 1159 """small utility for bookmark pushing"""
1158 1160 if not old:
1159 1161 return b'export'
1160 1162 elif not new:
1161 1163 return b'delete'
1162 1164 return b'update'
1163 1165
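# For example: _bmaction(b'', node) == b'export' (new bookmark),
# _bmaction(node, b'') == b'delete', _bmaction(node1, node2) == b'update'.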
1164 1166
1165 1167 def _abortonsecretctx(pushop, node, b):
1166 1168 """abort if a given bookmark points to a secret changeset"""
1167 1169 if node and pushop.repo[node].phase() == phases.secret:
1168 1170 raise error.Abort(
1169 1171 _(b'cannot push bookmark %s as it points to a secret changeset') % b
1170 1172 )
1171 1173
1172 1174
1173 1175 def _pushb2bookmarkspart(pushop, bundler):
1174 1176 pushop.stepsdone.add(b'bookmarks')
1175 1177 if not pushop.outbookmarks:
1176 1178 return
1177 1179
1178 1180 allactions = []
1179 1181 data = []
1180 1182 for book, old, new in pushop.outbookmarks:
1181 1183 _abortonsecretctx(pushop, new, book)
1182 1184 data.append((book, new))
1183 1185 allactions.append((book, _bmaction(old, new)))
1184 1186 checkdata = bookmod.binaryencode(data)
1185 1187 bundler.newpart(b'bookmarks', data=checkdata)
1186 1188
1187 1189 def handlereply(op):
1188 1190 ui = pushop.ui
1189 1191 # if success
1190 1192 for book, action in allactions:
1191 1193 ui.status(bookmsgmap[action][0] % book)
1192 1194
1193 1195 return handlereply
1194 1196
1195 1197
1196 1198 def _pushb2bookmarkspushkey(pushop, bundler):
1197 1199 pushop.stepsdone.add(b'bookmarks')
1198 1200 part2book = []
1199 1201 enc = pushkey.encode
1200 1202
1201 1203 def handlefailure(pushop, exc):
1202 1204 targetid = int(exc.partid)
1203 1205 for partid, book, action in part2book:
1204 1206 if partid == targetid:
1205 1207 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
1206 1208         # we should not be called for parts we did not generate
1207 1209 assert False
1208 1210
1209 1211 for book, old, new in pushop.outbookmarks:
1210 1212 _abortonsecretctx(pushop, new, book)
1211 1213 part = bundler.newpart(b'pushkey')
1212 1214 part.addparam(b'namespace', enc(b'bookmarks'))
1213 1215 part.addparam(b'key', enc(book))
1214 1216 part.addparam(b'old', enc(hex(old)))
1215 1217 part.addparam(b'new', enc(hex(new)))
1216 1218 action = b'update'
1217 1219 if not old:
1218 1220 action = b'export'
1219 1221 elif not new:
1220 1222 action = b'delete'
1221 1223 part2book.append((part.id, book, action))
1222 1224 pushop.pkfailcb[part.id] = handlefailure
1223 1225
1224 1226 def handlereply(op):
1225 1227 ui = pushop.ui
1226 1228 for partid, book, action in part2book:
1227 1229 partrep = op.records.getreplies(partid)
1228 1230 results = partrep[b'pushkey']
1229 1231 assert len(results) <= 1
1230 1232 if not results:
1231 1233 pushop.ui.warn(_(b'server ignored bookmark %s update\n') % book)
1232 1234 else:
1233 1235 ret = int(results[0][b'return'])
1234 1236 if ret:
1235 1237 ui.status(bookmsgmap[action][0] % book)
1236 1238 else:
1237 1239 ui.warn(bookmsgmap[action][1] % book)
1238 1240 if pushop.bkresult is not None:
1239 1241 pushop.bkresult = 1
1240 1242
1241 1243 return handlereply
1242 1244
1243 1245
1244 1246 @b2partsgenerator(b'pushvars', idx=0)
1245 1247 def _getbundlesendvars(pushop, bundler):
1246 1248 '''send shellvars via bundle2'''
1247 1249 pushvars = pushop.pushvars
1248 1250 if pushvars:
1249 1251 shellvars = {}
1250 1252 for raw in pushvars:
1251 1253 if b'=' not in raw:
1252 1254 msg = (
1253 1255 b"unable to parse variable '%s', should follow "
1254 1256 b"'KEY=VALUE' or 'KEY=' format"
1255 1257 )
1256 1258 raise error.Abort(msg % raw)
1257 1259 k, v = raw.split(b'=', 1)
1258 1260 shellvars[k] = v
1259 1261
1260 1262 part = bundler.newpart(b'pushvars')
1261 1263
1262 1264 for key, value in pycompat.iteritems(shellvars):
1263 1265 part.addparam(key, value, mandatory=False)
1264 1266
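# For instance, `hg push --pushvars DEBUG=1` travels as a single advisory
# DEBUG=1 parameter on the pushvars part; the receiving side can expose
# such variables to its hooks.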
1265 1267
1266 1268 def _pushbundle2(pushop):
1267 1269 """push data to the remote using bundle2
1268 1270
1269 1271 The only currently supported type of data is changegroup but this will
1270 1272 evolve in the future."""
1271 1273 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
1272 1274 pushback = pushop.trmanager and pushop.ui.configbool(
1273 1275 b'experimental', b'bundle2.pushback'
1274 1276 )
1275 1277
1276 1278 # create reply capability
1277 1279 capsblob = bundle2.encodecaps(
1278 1280 bundle2.getrepocaps(pushop.repo, allowpushback=pushback, role=b'client')
1279 1281 )
1280 1282 bundler.newpart(b'replycaps', data=capsblob)
1281 1283 replyhandlers = []
1282 1284 for partgenname in b2partsgenorder:
1283 1285 partgen = b2partsgenmapping[partgenname]
1284 1286 ret = partgen(pushop, bundler)
1285 1287 if callable(ret):
1286 1288 replyhandlers.append(ret)
1287 1289 # do not push if nothing to push
1288 1290 if bundler.nbparts <= 1:
1289 1291 return
1290 1292 stream = util.chunkbuffer(bundler.getchunks())
1291 1293 try:
1292 1294 try:
1293 1295 with pushop.remote.commandexecutor() as e:
1294 1296 reply = e.callcommand(
1295 1297 b'unbundle',
1296 1298 {
1297 1299 b'bundle': stream,
1298 1300 b'heads': [b'force'],
1299 1301 b'url': pushop.remote.url(),
1300 1302 },
1301 1303 ).result()
1302 1304 except error.BundleValueError as exc:
1303 1305 raise error.Abort(_(b'missing support for %s') % exc)
1304 1306 try:
1305 1307 trgetter = None
1306 1308 if pushback:
1307 1309 trgetter = pushop.trmanager.transaction
1308 1310 op = bundle2.processbundle(pushop.repo, reply, trgetter)
1309 1311 except error.BundleValueError as exc:
1310 1312 raise error.Abort(_(b'missing support for %s') % exc)
1311 1313 except bundle2.AbortFromPart as exc:
1312 1314 pushop.ui.status(_(b'remote: %s\n') % exc)
1313 1315 if exc.hint is not None:
1314 1316 pushop.ui.status(_(b'remote: %s\n') % (b'(%s)' % exc.hint))
1315 1317 raise error.Abort(_(b'push failed on remote'))
1316 1318 except error.PushkeyFailed as exc:
1317 1319 partid = int(exc.partid)
1318 1320 if partid not in pushop.pkfailcb:
1319 1321 raise
1320 1322 pushop.pkfailcb[partid](pushop, exc)
1321 1323 for rephand in replyhandlers:
1322 1324 rephand(op)
1323 1325
1324 1326
1325 1327 def _pushchangeset(pushop):
1326 1328 """Make the actual push of changeset bundle to remote repo"""
1327 1329 if b'changesets' in pushop.stepsdone:
1328 1330 return
1329 1331 pushop.stepsdone.add(b'changesets')
1330 1332 if not _pushcheckoutgoing(pushop):
1331 1333 return
1332 1334
1333 1335 # Should have verified this in push().
1334 1336 assert pushop.remote.capable(b'unbundle')
1335 1337
1336 1338 pushop.repo.prepushoutgoinghooks(pushop)
1337 1339 outgoing = pushop.outgoing
1338 1340 # TODO: get bundlecaps from remote
1339 1341 bundlecaps = None
1340 1342 # create a changegroup from local
1341 1343 if pushop.revs is None and not (
1342 1344 outgoing.excluded or pushop.repo.changelog.filteredrevs
1343 1345 ):
1344 1346 # push everything,
1345 1347 # use the fast path, no race possible on push
1346 1348 cg = changegroup.makechangegroup(
1347 1349 pushop.repo,
1348 1350 outgoing,
1349 1351 b'01',
1350 1352 b'push',
1351 1353 fastpath=True,
1352 1354 bundlecaps=bundlecaps,
1353 1355 )
1354 1356 else:
1355 1357 cg = changegroup.makechangegroup(
1356 1358 pushop.repo, outgoing, b'01', b'push', bundlecaps=bundlecaps
1357 1359 )
1358 1360
1359 1361 # apply changegroup to remote
1360 1362 # local repo finds heads on server, finds out what
1361 1363 # revs it must push. once revs transferred, if server
1362 1364 # finds it has different heads (someone else won
1363 1365 # commit/push race), server aborts.
1364 1366 if pushop.force:
1365 1367 remoteheads = [b'force']
1366 1368 else:
1367 1369 remoteheads = pushop.remoteheads
1368 1370 # ssh: return remote's addchangegroup()
1369 1371 # http: return remote's addchangegroup() or 0 for error
1370 1372 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads, pushop.repo.url())
1371 1373
1372 1374
1373 1375 def _pushsyncphase(pushop):
1374 1376 """synchronise phase information locally and remotely"""
1375 1377 cheads = pushop.commonheads
1376 1378 # even when we don't push, exchanging phase data is useful
1377 1379 remotephases = listkeys(pushop.remote, b'phases')
1378 1380 if (
1379 1381 pushop.ui.configbool(b'ui', b'_usedassubrepo')
1380 1382 and remotephases # server supports phases
1381 1383 and pushop.cgresult is None # nothing was pushed
1382 1384 and remotephases.get(b'publishing', False)
1383 1385 ):
1384 1386         # When:
1385 1387         # - this is a subrepo push
1386 1388         # - and the remote supports phases
1387 1389         # - and no changesets were pushed
1388 1390         # - and the remote is publishing
1389 1391         # we may be in the issue 3871 case!
1390 1392         # We drop the phase synchronisation that would otherwise be done
1391 1393         # as a courtesy, since it could publish changesets that are
1392 1394         # possibly still draft on the remote.
1393 1395 remotephases = {b'publishing': b'True'}
1394 1396     if not remotephases: # old server or public-only reply from non-publishing
1395 1397 _localphasemove(pushop, cheads)
1396 1398 # don't push any phase data as there is nothing to push
1397 1399 else:
1398 1400 ana = phases.analyzeremotephases(pushop.repo, cheads, remotephases)
1399 1401 pheads, droots = ana
1400 1402 ### Apply remote phase on local
1401 1403 if remotephases.get(b'publishing', False):
1402 1404 _localphasemove(pushop, cheads)
1403 1405 else: # publish = False
1404 1406 _localphasemove(pushop, pheads)
1405 1407 _localphasemove(pushop, cheads, phases.draft)
1406 1408 ### Apply local phase on remote
1407 1409
1408 1410 if pushop.cgresult:
1409 1411 if b'phases' in pushop.stepsdone:
1410 1412 # phases already pushed though bundle2
1411 1413 return
1412 1414 outdated = pushop.outdatedphases
1413 1415 else:
1414 1416 outdated = pushop.fallbackoutdatedphases
1415 1417
1416 1418 pushop.stepsdone.add(b'phases')
1417 1419
1418 1420 # filter heads already turned public by the push
1419 1421 outdated = [c for c in outdated if c.node() not in pheads]
1420 1422 # fallback to independent pushkey command
1421 1423 for newremotehead in outdated:
1422 1424 with pushop.remote.commandexecutor() as e:
1423 1425 r = e.callcommand(
1424 1426 b'pushkey',
1425 1427 {
1426 1428 b'namespace': b'phases',
1427 1429 b'key': newremotehead.hex(),
1428 1430 b'old': b'%d' % phases.draft,
1429 1431 b'new': b'%d' % phases.public,
1430 1432 },
1431 1433 ).result()
1432 1434
1433 1435 if not r:
1434 1436 pushop.ui.warn(
1435 1437 _(b'updating %s to public failed!\n') % newremotehead
1436 1438 )
1437 1439
1438 1440
1439 1441 def _localphasemove(pushop, nodes, phase=phases.public):
1440 1442 """move <nodes> to <phase> in the local source repo"""
1441 1443 if pushop.trmanager:
1442 1444 phases.advanceboundary(
1443 1445 pushop.repo, pushop.trmanager.transaction(), phase, nodes
1444 1446 )
1445 1447 else:
1446 1448 # repo is not locked, do not change any phases!
1447 1449         # Inform the user that phases should have been moved when
1448 1450         # applicable.
1449 1451 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
1450 1452 phasestr = phases.phasenames[phase]
1451 1453 if actualmoves:
1452 1454 pushop.ui.status(
1453 1455 _(
1454 1456 b'cannot lock source repo, skipping '
1455 1457 b'local %s phase update\n'
1456 1458 )
1457 1459 % phasestr
1458 1460 )
1459 1461
1460 1462
1461 1463 def _pushobsolete(pushop):
1462 1464 """utility function to push obsolete markers to a remote"""
1463 1465 if b'obsmarkers' in pushop.stepsdone:
1464 1466 return
1465 1467 repo = pushop.repo
1466 1468 remote = pushop.remote
1467 1469 pushop.stepsdone.add(b'obsmarkers')
1468 1470 if pushop.outobsmarkers:
1469 1471 pushop.ui.debug(b'try to push obsolete markers to remote\n')
1470 1472 rslts = []
1471 1473 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
1472 1474 for key in sorted(remotedata, reverse=True):
1473 1475 # reverse sort to ensure we end with dump0
1474 1476 data = remotedata[key]
1475 1477 rslts.append(remote.pushkey(b'obsolete', key, b'', data))
1476 1478 if [r for r in rslts if not r]:
1477 1479 msg = _(b'failed to push some obsolete markers!\n')
1478 1480 repo.ui.warn(msg)
1479 1481
1480 1482
1481 1483 def _pushbookmark(pushop):
1482 1484 """Update bookmark position on remote"""
1483 1485 if pushop.cgresult == 0 or b'bookmarks' in pushop.stepsdone:
1484 1486 return
1485 1487 pushop.stepsdone.add(b'bookmarks')
1486 1488 ui = pushop.ui
1487 1489 remote = pushop.remote
1488 1490
1489 1491 for b, old, new in pushop.outbookmarks:
1490 1492 action = b'update'
1491 1493 if not old:
1492 1494 action = b'export'
1493 1495 elif not new:
1494 1496 action = b'delete'
1495 1497
1496 1498 with remote.commandexecutor() as e:
1497 1499 r = e.callcommand(
1498 1500 b'pushkey',
1499 1501 {
1500 1502 b'namespace': b'bookmarks',
1501 1503 b'key': b,
1502 1504 b'old': hex(old),
1503 1505 b'new': hex(new),
1504 1506 },
1505 1507 ).result()
1506 1508
1507 1509 if r:
1508 1510 ui.status(bookmsgmap[action][0] % b)
1509 1511 else:
1510 1512 ui.warn(bookmsgmap[action][1] % b)
1511 1513     # discovery can have set the value from an invalid entry
1512 1514 if pushop.bkresult is not None:
1513 1515 pushop.bkresult = 1
1514 1516
1515 1517
1516 1518 class pulloperation(object):
1517 1519 """A object that represent a single pull operation
1518 1520
1519 1521 It purpose is to carry pull related state and very common operation.
1520 1522
1521 1523 A new should be created at the beginning of each pull and discarded
1522 1524 afterward.
1523 1525 """
1524 1526
1525 1527 def __init__(
1526 1528 self,
1527 1529 repo,
1528 1530 remote,
1529 1531 heads=None,
1530 1532 force=False,
1531 1533 bookmarks=(),
1532 1534 remotebookmarks=None,
1533 1535 streamclonerequested=None,
1534 1536 includepats=None,
1535 1537 excludepats=None,
1536 1538 depth=None,
1537 1539 ):
1538 1540 # repo we pull into
1539 1541 self.repo = repo
1540 1542 # repo we pull from
1541 1543 self.remote = remote
1542 1544         # revisions we try to pull (None is "all")
1543 1545         self.heads = heads
1544 1546         # bookmarks pulled explicitly
1545 1547 self.explicitbookmarks = [
1546 1548 repo._bookmarks.expandname(bookmark) for bookmark in bookmarks
1547 1549 ]
1548 1550 # do we force pull?
1549 1551 self.force = force
1550 1552 # whether a streaming clone was requested
1551 1553 self.streamclonerequested = streamclonerequested
1552 1554 # transaction manager
1553 1555 self.trmanager = None
1554 1556         # set of common changesets between local and remote before pull
1555 1557         self.common = None
1556 1558         # set of pulled heads
1557 1559         self.rheads = None
1558 1560         # list of missing changesets to fetch remotely
1559 1561         self.fetch = None
1560 1562         # remote bookmarks data
1561 1563         self.remotebookmarks = remotebookmarks
1562 1564         # result of changegroup pulling (used as return code by pull)
1563 1565         self.cgresult = None
1564 1566         # set of steps already performed
1565 1567         self.stepsdone = set()
1566 1568 # Whether we attempted a clone from pre-generated bundles.
1567 1569 self.clonebundleattempted = False
1568 1570 # Set of file patterns to include.
1569 1571 self.includepats = includepats
1570 1572 # Set of file patterns to exclude.
1571 1573 self.excludepats = excludepats
1572 1574 # Number of ancestor changesets to pull from each pulled head.
1573 1575 self.depth = depth
1574 1576
1575 1577 @util.propertycache
1576 1578 def pulledsubset(self):
1577 1579 """heads of the set of changeset target by the pull"""
1578 1580 # compute target subset
1579 1581 if self.heads is None:
1580 1582             # We pulled everything possible
1581 1583 # sync on everything common
1582 1584 c = set(self.common)
1583 1585 ret = list(self.common)
1584 1586 for n in self.rheads:
1585 1587 if n not in c:
1586 1588 ret.append(n)
1587 1589 return ret
1588 1590 else:
1589 1591 # We pulled a specific subset
1590 1592 # sync on this subset
1591 1593 return self.heads
1592 1594
1593 1595 @util.propertycache
1594 1596 def canusebundle2(self):
1595 1597 return not _forcebundle1(self)
1596 1598
1597 1599 @util.propertycache
1598 1600 def remotebundle2caps(self):
1599 1601 return bundle2.bundle2caps(self.remote)
1600 1602
1601 1603 def gettransaction(self):
1602 1604 # deprecated; talk to trmanager directly
1603 1605 return self.trmanager.transaction()
1604 1606
1605 1607
1606 1608 class transactionmanager(util.transactional):
1607 1609 """An object to manage the life cycle of a transaction
1608 1610
1609 1611 It creates the transaction on demand and calls the appropriate hooks when
1610 1612 closing the transaction."""
1611 1613
1612 1614 def __init__(self, repo, source, url):
1613 1615 self.repo = repo
1614 1616 self.source = source
1615 1617 self.url = url
1616 1618 self._tr = None
1617 1619
1618 1620 def transaction(self):
1619 1621 """Return an open transaction object, constructing if necessary"""
1620 1622 if not self._tr:
1621 1623 trname = b'%s\n%s' % (self.source, util.hidepassword(self.url))
1622 1624 self._tr = self.repo.transaction(trname)
1623 1625 self._tr.hookargs[b'source'] = self.source
1624 1626 self._tr.hookargs[b'url'] = self.url
1625 1627 return self._tr
1626 1628
1627 1629 def close(self):
1628 1630 """close transaction if created"""
1629 1631 if self._tr is not None:
1630 1632 self._tr.close()
1631 1633
1632 1634 def release(self):
1633 1635 """release transaction if created"""
1634 1636 if self._tr is not None:
1635 1637 self._tr.release()
1636 1638
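# Typical usage, as in push() and pull() below: create one manager per
# exchange operation and drive it as a context manager so that close()
# and release() fire at the right time:
#
#     trmanager = transactionmanager(repo, b'pull', remote.url())
#     with trmanager:
#         tr = trmanager.transaction()  # opened lazily, reused afterwards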
1637 1639
1638 1640 def listkeys(remote, namespace):
1639 1641 with remote.commandexecutor() as e:
1640 1642 return e.callcommand(b'listkeys', {b'namespace': namespace}).result()
1641 1643
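# For example, listkeys(remote, b'phases') returns a dict such as
# {b'publishing': b'True'} on a publishing server (keys and values are
# bytes).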
1642 1644
1643 1645 def _fullpullbundle2(repo, pullop):
1644 1646 # The server may send a partial reply, i.e. when inlining
1645 1647 # pre-computed bundles. In that case, update the common
1646 1648 # set based on the results and pull another bundle.
1647 1649 #
1648 1650 # There are two indicators that the process is finished:
1649 1651 # - no changeset has been added, or
1650 1652 # - all remote heads are known locally.
1651 1653 # The head check must use the unfiltered view as obsoletion
1652 1654 # markers can hide heads.
1653 1655 unfi = repo.unfiltered()
1654 1656 unficl = unfi.changelog
1655 1657
1656 1658 def headsofdiff(h1, h2):
1657 1659 """Returns heads(h1 % h2)"""
1658 1660 res = unfi.set(b'heads(%ln %% %ln)', h1, h2)
1659 1661 return set(ctx.node() for ctx in res)
1660 1662
1661 1663 def headsofunion(h1, h2):
1662 1664 """Returns heads((h1 + h2) - null)"""
1663 1665 res = unfi.set(b'heads((%ln + %ln - null))', h1, h2)
1664 1666 return set(ctx.node() for ctx in res)
1665 1667
1666 1668 while True:
1667 1669 old_heads = unficl.heads()
1668 1670 clstart = len(unficl)
1669 1671 _pullbundle2(pullop)
1670 1672 if repository.NARROW_REQUIREMENT in repo.requirements:
1671 1673 # XXX narrow clones filter the heads on the server side during
1672 1674 # XXX getbundle and result in partial replies as well.
1673 1675 # XXX Disable pull bundles in this case as band aid to avoid
1674 1676 # XXX extra round trips.
1675 1677 break
1676 1678 if clstart == len(unficl):
1677 1679 break
1678 1680 if all(unficl.hasnode(n) for n in pullop.rheads):
1679 1681 break
1680 1682 new_heads = headsofdiff(unficl.heads(), old_heads)
1681 1683 pullop.common = headsofunion(new_heads, pullop.common)
1682 1684 pullop.rheads = set(pullop.rheads) - pullop.common
1683 1685
1684 1686
1685 1687 def pull(
1686 1688 repo,
1687 1689 remote,
1688 1690 heads=None,
1689 1691 force=False,
1690 1692 bookmarks=(),
1691 1693 opargs=None,
1692 1694 streamclonerequested=None,
1693 1695 includepats=None,
1694 1696 excludepats=None,
1695 1697 depth=None,
1696 1698 ):
1697 1699 """Fetch repository data from a remote.
1698 1700
1699 1701 This is the main function used to retrieve data from a remote repository.
1700 1702
1701 1703 ``repo`` is the local repository to clone into.
1702 1704 ``remote`` is a peer instance.
1703 1705 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
1704 1706 default) means to pull everything from the remote.
1705 1707 ``bookmarks`` is an iterable of bookmarks requesting to be pulled. By
1706 1708 default, all remote bookmarks are pulled.
1707 1709 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
1708 1710 initialization.
1709 1711 ``streamclonerequested`` is a boolean indicating whether a "streaming
1710 1712 clone" is requested. A "streaming clone" is essentially a raw file copy
1711 1713 of revlogs from the server. This only works when the local repository is
1712 1714 empty. The default value of ``None`` means to respect the server
1713 1715 configuration for preferring stream clones.
1714 1716 ``includepats`` and ``excludepats`` define explicit file patterns to
1715 1717 include and exclude in storage, respectively. If not defined, narrow
1716 1718 patterns from the repo instance are used, if available.
1717 1719 ``depth`` is an integer indicating the DAG depth of history we're
1718 1720 interested in. If defined, for each revision specified in ``heads``, we
1719 1721 will fetch up to this many of its ancestors and data associated with them.
1720 1722
1721 1723 Returns the ``pulloperation`` created for this pull.
1722 1724 """
1723 1725 if opargs is None:
1724 1726 opargs = {}
1725 1727
1726 1728 # We allow the narrow patterns to be passed in explicitly to provide more
1727 1729 # flexibility for API consumers.
1728 1730 if includepats or excludepats:
1729 1731 includepats = includepats or set()
1730 1732 excludepats = excludepats or set()
1731 1733 else:
1732 1734 includepats, excludepats = repo.narrowpats
1733 1735
1734 1736 narrowspec.validatepatterns(includepats)
1735 1737 narrowspec.validatepatterns(excludepats)
1736 1738
1737 1739 pullop = pulloperation(
1738 1740 repo,
1739 1741 remote,
1740 1742 heads,
1741 1743 force,
1742 1744 bookmarks=bookmarks,
1743 1745 streamclonerequested=streamclonerequested,
1744 1746 includepats=includepats,
1745 1747 excludepats=excludepats,
1746 1748 depth=depth,
1747 1749 **pycompat.strkwargs(opargs)
1748 1750 )
1749 1751
1750 1752 peerlocal = pullop.remote.local()
1751 1753 if peerlocal:
1752 1754 missing = set(peerlocal.requirements) - pullop.repo.supported
1753 1755 if missing:
1754 1756 msg = _(
1755 1757 b"required features are not"
1756 1758 b" supported in the destination:"
1757 1759 b" %s"
1758 1760 ) % (b', '.join(sorted(missing)))
1759 1761 raise error.Abort(msg)
1760 1762
1761 1763 pullop.trmanager = transactionmanager(repo, b'pull', remote.url())
1762 1764 wlock = util.nullcontextmanager()
1763 1765 if not bookmod.bookmarksinstore(repo):
1764 1766 wlock = repo.wlock()
1765 1767 with wlock, repo.lock(), pullop.trmanager:
1766 1768 # Use the modern wire protocol, if available.
1767 1769 if remote.capable(b'command-changesetdata'):
1768 1770 exchangev2.pull(pullop)
1769 1771 else:
1770 1772 # This should ideally be in _pullbundle2(). However, it needs to run
1771 1773 # before discovery to avoid extra work.
1772 1774 _maybeapplyclonebundle(pullop)
1773 1775 streamclone.maybeperformlegacystreamclone(pullop)
1774 1776 _pulldiscovery(pullop)
1775 1777 if pullop.canusebundle2:
1776 1778 _fullpullbundle2(repo, pullop)
1777 1779 _pullchangeset(pullop)
1778 1780 _pullphase(pullop)
1779 1781 _pullbookmarks(pullop)
1780 1782 _pullobsolete(pullop)
1781 1783
1782 1784 # storing remotenames
1783 1785 if repo.ui.configbool(b'experimental', b'remotenames'):
1784 1786 logexchange.pullremotenames(repo, remote)
1785 1787
1786 1788 return pullop
1787 1789
1788 1790
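# A matching sketch for the pull side, under the same assumptions as the
# _examplepush sketch above (the helper name is purely illustrative):
def _examplepull(ui, repo, source):
    """illustration only: pull everything from ``source``"""
    from mercurial import hg

    other = hg.peer(ui, {}, source)
    pullop = pull(repo, other)
    return pullop.cgresult

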
1789 1791 # list of steps to perform discovery before pull
1790 1792 pulldiscoveryorder = []
1791 1793
1792 1794 # Mapping between step name and function
1793 1795 #
1794 1796 # This exists to help extensions wrap steps if necessary
1795 1797 pulldiscoverymapping = {}
1796 1798
1797 1799
1798 1800 def pulldiscovery(stepname):
1799 1801 """decorator for function performing discovery before pull
1800 1802
1801 1803 The function is added to the step -> function mapping and appended to the
1802 1804 list of steps. Beware that decorated function will be added in order (this
1803 1805 may matter).
1804 1806
1805 1807 You can only use this decorator for a new step, if you want to wrap a step
1806 1808 from an extension, change the pulldiscovery dictionary directly."""
1807 1809
1808 1810 def dec(func):
1809 1811 assert stepname not in pulldiscoverymapping
1810 1812 pulldiscoverymapping[stepname] = func
1811 1813 pulldiscoveryorder.append(stepname)
1812 1814 return func
1813 1815
1814 1816 return dec
1815 1817
1816 1818
1817 1819 def _pulldiscovery(pullop):
1818 1820 """Run all discovery steps"""
1819 1821 for stepname in pulldiscoveryorder:
1820 1822 step = pulldiscoverymapping[stepname]
1821 1823 step(pullop)
1822 1824
1823 1825
1824 1826 @pulldiscovery(b'b1:bookmarks')
1825 1827 def _pullbookmarkbundle1(pullop):
1826 1828 """fetch bookmark data in bundle1 case
1827 1829
1828 1830 If not using bundle2, we have to fetch bookmarks before changeset
1829 1831 discovery to reduce the chance and impact of race conditions."""
1830 1832 if pullop.remotebookmarks is not None:
1831 1833 return
1832 1834 if pullop.canusebundle2 and b'listkeys' in pullop.remotebundle2caps:
1833 1835         # all known bundle2 servers now support listkeys, but let's be nice to
1834 1836         # new implementations.
1835 1837 return
1836 1838 books = listkeys(pullop.remote, b'bookmarks')
1837 1839 pullop.remotebookmarks = bookmod.unhexlifybookmarks(books)
1838 1840
1839 1841
1840 1842 @pulldiscovery(b'changegroup')
1841 1843 def _pulldiscoverychangegroup(pullop):
1842 1844 """discovery phase for the pull
1843 1845
1844 1846     Currently handles changeset discovery only; will change to handle all
1845 1847     discovery at some point."""
1846 1848 tmp = discovery.findcommonincoming(
1847 1849 pullop.repo, pullop.remote, heads=pullop.heads, force=pullop.force
1848 1850 )
1849 1851 common, fetch, rheads = tmp
1850 1852 nm = pullop.repo.unfiltered().changelog.nodemap
1851 1853 if fetch and rheads:
1852 1854         # If a remote head is filtered locally, put it back in common.
1853 1855         #
1854 1856         # This is a hackish solution to catch most of the "common but locally
1855 1857         # hidden" situations. We do not perform discovery on the unfiltered
1856 1858         # repository because it ends up doing a pathological amount of round
1857 1859         # trips for a huge amount of changesets we do not care about.
1858 1860         #
1859 1861         # If a set of such "common but filtered" changesets exists on the server
1860 1862         # but does not include a remote head, we'll not be able to detect it,
1861 1863 scommon = set(common)
1862 1864 for n in rheads:
1863 1865 if n in nm:
1864 1866 if n not in scommon:
1865 1867 common.append(n)
1866 1868 if set(rheads).issubset(set(common)):
1867 1869 fetch = []
1868 1870 pullop.common = common
1869 1871 pullop.fetch = fetch
1870 1872 pullop.rheads = rheads
1871 1873
1872 1874
1873 1875 def _pullbundle2(pullop):
1874 1876 """pull data using bundle2
1875 1877
1876 1878     For now, the only supported data is the changegroup."""
1877 1879 kwargs = {b'bundlecaps': caps20to10(pullop.repo, role=b'client')}
1878 1880
1879 1881 # make ui easier to access
1880 1882 ui = pullop.repo.ui
1881 1883
1882 1884 # At the moment we don't do stream clones over bundle2. If that is
1883 1885 # implemented then here's where the check for that will go.
1884 1886 streaming = streamclone.canperformstreamclone(pullop, bundle2=True)[0]
1885 1887
1886 1888 # declare pull perimeters
1887 1889 kwargs[b'common'] = pullop.common
1888 1890 kwargs[b'heads'] = pullop.heads or pullop.rheads
1889 1891
1890 1892     # check whether the server supports narrow, then add includepats/excludepats
1891 1893 servernarrow = pullop.remote.capable(wireprototypes.NARROWCAP)
1892 1894 if servernarrow and pullop.includepats:
1893 1895 kwargs[b'includepats'] = pullop.includepats
1894 1896 if servernarrow and pullop.excludepats:
1895 1897 kwargs[b'excludepats'] = pullop.excludepats
1896 1898
1897 1899 if streaming:
1898 1900 kwargs[b'cg'] = False
1899 1901 kwargs[b'stream'] = True
1900 1902 pullop.stepsdone.add(b'changegroup')
1901 1903 pullop.stepsdone.add(b'phases')
1902 1904
1903 1905 else:
1904 1906 # pulling changegroup
1905 1907 pullop.stepsdone.add(b'changegroup')
1906 1908
1907 1909 kwargs[b'cg'] = pullop.fetch
1908 1910
1909 1911 legacyphase = b'phases' in ui.configlist(b'devel', b'legacy.exchange')
1910 1912 hasbinaryphase = b'heads' in pullop.remotebundle2caps.get(b'phases', ())
1911 1913 if not legacyphase and hasbinaryphase:
1912 1914 kwargs[b'phases'] = True
1913 1915 pullop.stepsdone.add(b'phases')
1914 1916
1915 1917 if b'listkeys' in pullop.remotebundle2caps:
1916 1918 if b'phases' not in pullop.stepsdone:
1917 1919 kwargs[b'listkeys'] = [b'phases']
1918 1920
1919 1921 bookmarksrequested = False
1920 1922 legacybookmark = b'bookmarks' in ui.configlist(b'devel', b'legacy.exchange')
1921 1923 hasbinarybook = b'bookmarks' in pullop.remotebundle2caps
1922 1924
1923 1925 if pullop.remotebookmarks is not None:
1924 1926 pullop.stepsdone.add(b'request-bookmarks')
1925 1927
1926 1928 if (
1927 1929 b'request-bookmarks' not in pullop.stepsdone
1928 1930 and pullop.remotebookmarks is None
1929 1931 and not legacybookmark
1930 1932 and hasbinarybook
1931 1933 ):
1932 1934 kwargs[b'bookmarks'] = True
1933 1935 bookmarksrequested = True
1934 1936
1935 1937 if b'listkeys' in pullop.remotebundle2caps:
1936 1938 if b'request-bookmarks' not in pullop.stepsdone:
1937 1939             # make sure to always include bookmark data when migrating
1938 1940             # `hg incoming --bundle` to using this function.
1939 1941 pullop.stepsdone.add(b'request-bookmarks')
1940 1942 kwargs.setdefault(b'listkeys', []).append(b'bookmarks')
1941 1943
1942 1944 # If this is a full pull / clone and the server supports the clone bundles
1943 1945 # feature, tell the server whether we attempted a clone bundle. The
1944 1946 # presence of this flag indicates the client supports clone bundles. This
1945 1947 # will enable the server to treat clients that support clone bundles
1946 1948 # differently from those that don't.
1947 1949 if (
1948 1950 pullop.remote.capable(b'clonebundles')
1949 1951 and pullop.heads is None
1950 1952 and list(pullop.common) == [nullid]
1951 1953 ):
1952 1954 kwargs[b'cbattempted'] = pullop.clonebundleattempted
1953 1955
1954 1956 if streaming:
1955 1957 pullop.repo.ui.status(_(b'streaming all changes\n'))
1956 1958 elif not pullop.fetch:
1957 1959 pullop.repo.ui.status(_(b"no changes found\n"))
1958 1960 pullop.cgresult = 0
1959 1961 else:
1960 1962 if pullop.heads is None and list(pullop.common) == [nullid]:
1961 1963 pullop.repo.ui.status(_(b"requesting all changes\n"))
1962 1964 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1963 1965 remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
1964 1966 if obsolete.commonversion(remoteversions) is not None:
1965 1967 kwargs[b'obsmarkers'] = True
1966 1968 pullop.stepsdone.add(b'obsmarkers')
1967 1969 _pullbundle2extraprepare(pullop, kwargs)
1968 1970
1969 1971 with pullop.remote.commandexecutor() as e:
1970 1972 args = dict(kwargs)
1971 1973 args[b'source'] = b'pull'
1972 1974 bundle = e.callcommand(b'getbundle', args).result()
1973 1975
1974 1976 try:
1975 1977 op = bundle2.bundleoperation(
1976 1978 pullop.repo, pullop.gettransaction, source=b'pull'
1977 1979 )
1978 1980 op.modes[b'bookmarks'] = b'records'
1979 1981 bundle2.processbundle(pullop.repo, bundle, op=op)
1980 1982 except bundle2.AbortFromPart as exc:
1981 1983 pullop.repo.ui.status(_(b'remote: abort: %s\n') % exc)
1982 1984 raise error.Abort(_(b'pull failed on remote'), hint=exc.hint)
1983 1985 except error.BundleValueError as exc:
1984 1986 raise error.Abort(_(b'missing support for %s') % exc)
1985 1987
1986 1988 if pullop.fetch:
1987 1989 pullop.cgresult = bundle2.combinechangegroupresults(op)
1988 1990
1989 1991 # processing phases change
1990 1992 for namespace, value in op.records[b'listkeys']:
1991 1993 if namespace == b'phases':
1992 1994 _pullapplyphases(pullop, value)
1993 1995
1994 1996 # processing bookmark update
1995 1997 if bookmarksrequested:
1996 1998 books = {}
1997 1999 for record in op.records[b'bookmarks']:
1998 2000 books[record[b'bookmark']] = record[b"node"]
1999 2001 pullop.remotebookmarks = books
2000 2002 else:
2001 2003 for namespace, value in op.records[b'listkeys']:
2002 2004 if namespace == b'bookmarks':
2003 2005 pullop.remotebookmarks = bookmod.unhexlifybookmarks(value)
2004 2006
2005 2007 # bookmark data were either already there or pulled in the bundle
2006 2008 if pullop.remotebookmarks is not None:
2007 2009 _pullbookmarks(pullop)
2008 2010
2009 2011
2010 2012 def _pullbundle2extraprepare(pullop, kwargs):
2011 2013 """hook function so that extensions can extend the getbundle call"""
2012 2014
2013 2015
2014 2016 def _pullchangeset(pullop):
2015 2017 """pull changeset from unbundle into the local repo"""
2016 2018     # We delay opening the transaction as late as possible so we
2017 2019     # don't open a transaction for nothing and don't break future useful
2018 2020     # rollback calls
2019 2021 if b'changegroup' in pullop.stepsdone:
2020 2022 return
2021 2023 pullop.stepsdone.add(b'changegroup')
2022 2024 if not pullop.fetch:
2023 2025 pullop.repo.ui.status(_(b"no changes found\n"))
2024 2026 pullop.cgresult = 0
2025 2027 return
2026 2028 tr = pullop.gettransaction()
2027 2029 if pullop.heads is None and list(pullop.common) == [nullid]:
2028 2030 pullop.repo.ui.status(_(b"requesting all changes\n"))
2029 2031 elif pullop.heads is None and pullop.remote.capable(b'changegroupsubset'):
2030 2032 # issue1320, avoid a race if remote changed after discovery
2031 2033 pullop.heads = pullop.rheads
2032 2034
2033 2035 if pullop.remote.capable(b'getbundle'):
2034 2036 # TODO: get bundlecaps from remote
2035 2037 cg = pullop.remote.getbundle(
2036 2038 b'pull', common=pullop.common, heads=pullop.heads or pullop.rheads
2037 2039 )
2038 2040 elif pullop.heads is None:
2039 2041 with pullop.remote.commandexecutor() as e:
2040 2042 cg = e.callcommand(
2041 2043 b'changegroup', {b'nodes': pullop.fetch, b'source': b'pull',}
2042 2044 ).result()
2043 2045
2044 2046 elif not pullop.remote.capable(b'changegroupsubset'):
2045 2047 raise error.Abort(
2046 2048 _(
2047 2049 b"partial pull cannot be done because "
2048 2050 b"other repository doesn't support "
2049 2051 b"changegroupsubset."
2050 2052 )
2051 2053 )
2052 2054 else:
2053 2055 with pullop.remote.commandexecutor() as e:
2054 2056 cg = e.callcommand(
2055 2057 b'changegroupsubset',
2056 2058 {
2057 2059 b'bases': pullop.fetch,
2058 2060 b'heads': pullop.heads,
2059 2061 b'source': b'pull',
2060 2062 },
2061 2063 ).result()
2062 2064
2063 2065 bundleop = bundle2.applybundle(
2064 2066 pullop.repo, cg, tr, b'pull', pullop.remote.url()
2065 2067 )
2066 2068 pullop.cgresult = bundle2.combinechangegroupresults(bundleop)
2067 2069
2068 2070
2069 2071 def _pullphase(pullop):
2070 2072 # Get remote phases data from remote
2071 2073 if b'phases' in pullop.stepsdone:
2072 2074 return
2073 2075 remotephases = listkeys(pullop.remote, b'phases')
2074 2076 _pullapplyphases(pullop, remotephases)
2075 2077
2076 2078
2077 2079 def _pullapplyphases(pullop, remotephases):
2078 2080 """apply phase movement from observed remote state"""
2079 2081 if b'phases' in pullop.stepsdone:
2080 2082 return
2081 2083 pullop.stepsdone.add(b'phases')
2082 2084 publishing = bool(remotephases.get(b'publishing', False))
2083 2085 if remotephases and not publishing:
2084 2086 # remote is new and non-publishing
2085 2087 pheads, _dr = phases.analyzeremotephases(
2086 2088 pullop.repo, pullop.pulledsubset, remotephases
2087 2089 )
2088 2090 dheads = pullop.pulledsubset
2089 2091 else:
2090 2092 # Remote is old or publishing all common changesets
2091 2093 # should be seen as public
2092 2094 pheads = pullop.pulledsubset
2093 2095 dheads = []
2094 2096 unfi = pullop.repo.unfiltered()
2095 2097 phase = unfi._phasecache.phase
2096 2098 rev = unfi.changelog.nodemap.get
2097 2099 public = phases.public
2098 2100 draft = phases.draft
2099 2101
2100 2102 # exclude changesets already public locally and update the others
2101 2103 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
2102 2104 if pheads:
2103 2105 tr = pullop.gettransaction()
2104 2106 phases.advanceboundary(pullop.repo, tr, public, pheads)
2105 2107
2106 2108 # exclude changesets already draft locally and update the others
2107 2109 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
2108 2110 if dheads:
2109 2111 tr = pullop.gettransaction()
2110 2112 phases.advanceboundary(pullop.repo, tr, draft, dheads)
2111 2113
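# A worked example of the narrowing above (hypothetical changesets): remote
# phase information only ever advances local phases, because heads already
# public (resp. draft) locally are filtered out before advanceboundary():
#
#     local phases:        {A: public, B: draft, C: draft}
#     remote public heads: [A, B]
#     pheads after filter: [B]        # A is already public locally
#     -> phases.advanceboundary(pullop.repo, tr, public, [B])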
2112 2114
2113 2115 def _pullbookmarks(pullop):
2114 2116 """process the remote bookmark information to update the local one"""
2115 2117 if b'bookmarks' in pullop.stepsdone:
2116 2118 return
2117 2119 pullop.stepsdone.add(b'bookmarks')
2118 2120 repo = pullop.repo
2119 2121 remotebookmarks = pullop.remotebookmarks
2120 2122 bookmod.updatefromremote(
2121 2123 repo.ui,
2122 2124 repo,
2123 2125 remotebookmarks,
2124 2126 pullop.remote.url(),
2125 2127 pullop.gettransaction,
2126 2128 explicit=pullop.explicitbookmarks,
2127 2129 )
2128 2130
2129 2131
2130 2132 def _pullobsolete(pullop):
2131 2133 """utility function to pull obsolete markers from a remote
2132 2134
2133 2135 The `gettransaction` argument is a function that returns the pull
2134 2136 transaction, creating one if necessary. We return the transaction to inform
2135 2137 the calling code that a new transaction has been created (when applicable).
2136 2138 
2137 2139 Exists mostly to allow overriding for experimentation purposes."""
2138 2140 if b'obsmarkers' in pullop.stepsdone:
2139 2141 return
2140 2142 pullop.stepsdone.add(b'obsmarkers')
2141 2143 tr = None
2142 2144 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
2143 2145 pullop.repo.ui.debug(b'fetching remote obsolete markers\n')
2144 2146 remoteobs = listkeys(pullop.remote, b'obsolete')
2145 2147 if b'dump0' in remoteobs:
2146 2148 tr = pullop.gettransaction()
2147 2149 markers = []
2148 2150 for key in sorted(remoteobs, reverse=True):
2149 2151 if key.startswith(b'dump'):
2150 2152 data = util.b85decode(remoteobs[key])
2151 2153 version, newmarks = obsolete._readmarkers(data)
2152 2154 markers += newmarks
2153 2155 if markers:
2154 2156 pullop.repo.obsstore.add(tr, markers)
2155 2157 pullop.repo.invalidatevolatilesets()
2156 2158 return tr
2157 2159
2158 2160
2159 2161 def applynarrowacl(repo, kwargs):
2160 2162 """Apply narrow fetch access control.
2161 2163
2162 2164 This massages the named arguments for getbundle wire protocol commands
2163 2165 so requested data is filtered through access control rules.
2164 2166 """
2165 2167 ui = repo.ui
2166 2168 # TODO this assumes existence of HTTP and is a layering violation.
2167 2169 username = ui.shortuser(ui.environ.get(b'REMOTE_USER') or ui.username())
2168 2170 user_includes = ui.configlist(
2169 2171 _NARROWACL_SECTION,
2170 2172 username + b'.includes',
2171 2173 ui.configlist(_NARROWACL_SECTION, b'default.includes'),
2172 2174 )
2173 2175 user_excludes = ui.configlist(
2174 2176 _NARROWACL_SECTION,
2175 2177 username + b'.excludes',
2176 2178 ui.configlist(_NARROWACL_SECTION, b'default.excludes'),
2177 2179 )
2178 2180 if not user_includes:
2179 2181 raise error.Abort(
2180 2182 _(b"{} configuration for user {} is empty").format(
2181 2183 _NARROWACL_SECTION, username
2182 2184 )
2183 2185 )
2184 2186
2185 2187 user_includes = [
2186 2188 b'path:.' if p == b'*' else b'path:' + p for p in user_includes
2187 2189 ]
2188 2190 user_excludes = [
2189 2191 b'path:.' if p == b'*' else b'path:' + p for p in user_excludes
2190 2192 ]
2191 2193
2192 2194 req_includes = set(kwargs.get(r'includepats', []))
2193 2195 req_excludes = set(kwargs.get(r'excludepats', []))
2194 2196
2195 2197 req_includes, req_excludes, invalid_includes = narrowspec.restrictpatterns(
2196 2198 req_includes, req_excludes, user_includes, user_excludes
2197 2199 )
2198 2200
2199 2201 if invalid_includes:
2200 2202 raise error.Abort(
2201 2203 _(b"The following includes are not accessible for {}: {}").format(
2202 2204 username, invalid_includes
2203 2205 )
2204 2206 )
2205 2207
2206 2208 new_args = {}
2207 2209 new_args.update(kwargs)
2208 2210 new_args[r'narrow'] = True
2209 2211 new_args[r'narrow_acl'] = True
2210 2212 new_args[r'includepats'] = req_includes
2211 2213 if req_excludes:
2212 2214 new_args[r'excludepats'] = req_excludes
2213 2215
2214 2216 return new_args
2215 2217
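# A sketch of the hgrc shape read above, assuming _NARROWACL_SECTION names
# the config section (user names and paths here are hypothetical):
#
#     [<_NARROWACL_SECTION>]
#     default.includes = releases/
#     alice.includes = releases/, projects/alice/
#     alice.excludes = projects/alice/secret/
#
# A bare '*' is rewritten to b'path:.' (match everything); any other pattern
# gets a b'path:' prefix before being intersected with the client's request
# by narrowspec.restrictpatterns().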
2216 2218
2217 2219 def _computeellipsis(repo, common, heads, known, match, depth=None):
2218 2220 """Compute the shape of a narrowed DAG.
2219 2221
2220 2222 Args:
2221 2223 repo: The repository we're transferring.
2222 2224 common: The roots of the DAG range we're transferring.
2223 2225 May be just [nullid], which means all ancestors of heads.
2224 2226 heads: The heads of the DAG range we're transferring.
2225 2227 match: The narrowmatcher that allows us to identify relevant changes.
2226 2228 depth: If not None, only consider nodes to be full nodes if they are at
2227 2229 most depth changesets away from one of heads.
2228 2230
2229 2231 Returns:
2230 2232 A tuple of (visitnodes, relevant_nodes, ellipsisroots) where:
2231 2233
2232 2234 visitnodes: The list of nodes (either full or ellipsis) which
2233 2235 need to be sent to the client.
2234 2236 relevant_nodes: The set of changelog nodes which change a file inside
2235 2237 the narrowspec. The client needs these as non-ellipsis nodes.
2236 2238 ellipsisroots: A dict of {rev: parents} that is used in
2237 2239 narrowchangegroup to produce ellipsis nodes with the
2238 2240 correct parents.
2239 2241 """
2240 2242 cl = repo.changelog
2241 2243 mfl = repo.manifestlog
2242 2244
2243 2245 clrev = cl.rev
2244 2246
2245 2247 commonrevs = {clrev(n) for n in common} | {nullrev}
2246 2248 headsrevs = {clrev(n) for n in heads}
2247 2249
2248 2250 if depth:
2249 2251 revdepth = {h: 0 for h in headsrevs}
2250 2252
2251 2253 ellipsisheads = collections.defaultdict(set)
2252 2254 ellipsisroots = collections.defaultdict(set)
2253 2255
2254 2256 def addroot(head, curchange):
2255 2257 """Add a root to an ellipsis head, splitting heads with 3 roots."""
2256 2258 ellipsisroots[head].add(curchange)
2257 2259 # Recursively split ellipsis heads with 3 roots by finding the
2258 2260 # roots' youngest common descendant which is an elided merge commit.
2259 2261 # That descendant takes 2 of the 3 roots as its own, and becomes a
2260 2262 # root of the head.
2261 2263 while len(ellipsisroots[head]) > 2:
2262 2264 child, roots = splithead(head)
2263 2265 splitroots(head, child, roots)
2264 2266 head = child # Recurse in case we just added a 3rd root
2265 2267
2266 2268 def splitroots(head, child, roots):
2267 2269 ellipsisroots[head].difference_update(roots)
2268 2270 ellipsisroots[head].add(child)
2269 2271 ellipsisroots[child].update(roots)
2270 2272 ellipsisroots[child].discard(child)
2271 2273
2272 2274 def splithead(head):
2273 2275 r1, r2, r3 = sorted(ellipsisroots[head])
2274 2276 for nr1, nr2 in ((r2, r3), (r1, r3), (r1, r2)):
2275 2277 mid = repo.revs(
2276 2278 b'sort(merge() & %d::%d & %d::%d, -rev)', nr1, head, nr2, head
2277 2279 )
2278 2280 for j in mid:
2279 2281 if j == nr2:
2280 2282 return nr2, (nr1, nr2)
2281 2283 if j not in ellipsisroots or len(ellipsisroots[j]) < 2:
2282 2284 return j, (nr1, nr2)
2283 2285 raise error.Abort(
2284 2286 _(
2285 2287 b'Failed to split up ellipsis node! head: %d, '
2286 2288 b'roots: %d %d %d'
2287 2289 )
2288 2290 % (head, r1, r2, r3)
2289 2291 )
2290 2292
2291 2293 missing = list(cl.findmissingrevs(common=commonrevs, heads=headsrevs))
2292 2294 visit = reversed(missing)
2293 2295 relevant_nodes = set()
2294 2296 visitnodes = [cl.node(m) for m in missing]
2295 2297 required = set(headsrevs) | known
2296 2298 for rev in visit:
2297 2299 clrev = cl.changelogrevision(rev)
2298 2300 ps = [prev for prev in cl.parentrevs(rev) if prev != nullrev]
2299 2301 if depth is not None:
2300 2302 curdepth = revdepth[rev]
2301 2303 for p in ps:
2302 2304 revdepth[p] = min(curdepth + 1, revdepth.get(p, depth + 1))
2303 2305 needed = False
2304 2306 shallow_enough = depth is None or revdepth[rev] <= depth
2305 2307 if shallow_enough:
2306 2308 curmf = mfl[clrev.manifest].read()
2307 2309 if ps:
2308 2310 # We choose to not trust the changed files list in
2309 2311 # changesets because it's not always correct. TODO: could
2310 2312 # we trust it for the non-merge case?
2311 2313 p1mf = mfl[cl.changelogrevision(ps[0]).manifest].read()
2312 2314 needed = bool(curmf.diff(p1mf, match))
2313 2315 if not needed and len(ps) > 1:
2314 2316 # For merge changes, the list of changed files is not
2315 2317 # helpful, since we need to emit the merge if a file
2316 2318 # in the narrow spec has changed on either side of the
2317 2319 # merge. As a result, we do a manifest diff to check.
2318 2320 p2mf = mfl[cl.changelogrevision(ps[1]).manifest].read()
2319 2321 needed = bool(curmf.diff(p2mf, match))
2320 2322 else:
2321 2323 # For a root node, we need to include the node if any
2322 2324 # files in the node match the narrowspec.
2323 2325 needed = any(curmf.walk(match))
2324 2326
2325 2327 if needed:
2326 2328 for head in ellipsisheads[rev]:
2327 2329 addroot(head, rev)
2328 2330 for p in ps:
2329 2331 required.add(p)
2330 2332 relevant_nodes.add(cl.node(rev))
2331 2333 else:
2332 2334 if not ps:
2333 2335 ps = [nullrev]
2334 2336 if rev in required:
2335 2337 for head in ellipsisheads[rev]:
2336 2338 addroot(head, rev)
2337 2339 for p in ps:
2338 2340 ellipsisheads[p].add(rev)
2339 2341 else:
2340 2342 for p in ps:
2341 2343 ellipsisheads[p] |= ellipsisheads[rev]
2342 2344
2343 2345 # add common changesets as roots of their reachable ellipsis heads
2344 2346 for c in commonrevs:
2345 2347 for head in ellipsisheads[c]:
2346 2348 addroot(head, c)
2347 2349 return visitnodes, relevant_nodes, ellipsisroots
2348 2350
2349 2351
2350 2352 def caps20to10(repo, role):
2351 2353 """return a set with appropriate options to use bundle20 during getbundle"""
2352 2354 caps = {b'HG20'}
2353 2355 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role=role))
2354 2356 caps.add(b'bundle2=' + urlreq.quote(capsblob))
2355 2357 return caps
2356 2358
2357 2359
2358 2360 # List of names of steps to perform for a bundle2 for getbundle, order matters.
2359 2361 getbundle2partsorder = []
2360 2362
2361 2363 # Mapping between step name and function
2362 2364 #
2363 2365 # This exists to help extensions wrap steps if necessary
2364 2366 getbundle2partsmapping = {}
2365 2367
2366 2368
2367 2369 def getbundle2partsgenerator(stepname, idx=None):
2368 2370 """decorator for function generating bundle2 part for getbundle
2369 2371
2370 2372 The function is added to the step -> function mapping and appended to the
2371 2373 list of steps. Beware that decorated functions will be added in order
2372 2374 (this may matter).
2373 2375
2374 2376 You can only use this decorator for new steps; if you want to wrap a step
2375 2377 from an extension, patch the getbundle2partsmapping dictionary directly."""
2376 2378
2377 2379 def dec(func):
2378 2380 assert stepname not in getbundle2partsmapping
2379 2381 getbundle2partsmapping[stepname] = func
2380 2382 if idx is None:
2381 2383 getbundle2partsorder.append(stepname)
2382 2384 else:
2383 2385 getbundle2partsorder.insert(idx, stepname)
2384 2386 return func
2385 2387
2386 2388 return dec
2387 2389
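# A hedged usage sketch: an extension registering an additional getbundle
# part with the decorator above. The part name b'myext:ping' and the request
# flag b'myext_ping' are hypothetical; the signature mirrors the generators
# defined later in this module.
#
#     @getbundle2partsgenerator(b'myext:ping')
#     def _getbundlepingpart(
#         bundler, repo, source, bundlecaps=None, b2caps=None, **kwargs
#     ):
#         if not kwargs.get(r'myext_ping', False):
#             return
#         bundler.newpart(b'myext:ping', data=b'pong', mandatory=False)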
2388 2390
2389 2391 def bundle2requested(bundlecaps):
2390 2392 if bundlecaps is not None:
2391 2393 return any(cap.startswith(b'HG2') for cap in bundlecaps)
2392 2394 return False
2393 2395
2394 2396
2395 2397 def getbundlechunks(
2396 2398 repo, source, heads=None, common=None, bundlecaps=None, **kwargs
2397 2399 ):
2398 2400 """Return chunks constituting a bundle's raw data.
2399 2401
2400 2402 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
2401 2403 passed.
2402 2404
2403 2405 Returns a 2-tuple of a dict with metadata about the generated bundle
2404 2406 and an iterator over raw chunks (of varying sizes).
2405 2407 """
2406 2408 kwargs = pycompat.byteskwargs(kwargs)
2407 2409 info = {}
2408 2410 usebundle2 = bundle2requested(bundlecaps)
2409 2411 # bundle10 case
2410 2412 if not usebundle2:
2411 2413 if bundlecaps and not kwargs.get(b'cg', True):
2412 2414 raise ValueError(
2413 2415 _(b'request for bundle10 must include changegroup')
2414 2416 )
2415 2417
2416 2418 if kwargs:
2417 2419 raise ValueError(
2418 2420 _(b'unsupported getbundle arguments: %s')
2419 2421 % b', '.join(sorted(kwargs.keys()))
2420 2422 )
2421 2423 outgoing = _computeoutgoing(repo, heads, common)
2422 2424 info[b'bundleversion'] = 1
2423 2425 return (
2424 2426 info,
2425 2427 changegroup.makestream(
2426 2428 repo, outgoing, b'01', source, bundlecaps=bundlecaps
2427 2429 ),
2428 2430 )
2429 2431
2430 2432 # bundle20 case
2431 2433 info[b'bundleversion'] = 2
2432 2434 b2caps = {}
2433 2435 for bcaps in bundlecaps:
2434 2436 if bcaps.startswith(b'bundle2='):
2435 2437 blob = urlreq.unquote(bcaps[len(b'bundle2=') :])
2436 2438 b2caps.update(bundle2.decodecaps(blob))
2437 2439 bundler = bundle2.bundle20(repo.ui, b2caps)
2438 2440
2439 2441 kwargs[b'heads'] = heads
2440 2442 kwargs[b'common'] = common
2441 2443
2442 2444 for name in getbundle2partsorder:
2443 2445 func = getbundle2partsmapping[name]
2444 2446 func(
2445 2447 bundler,
2446 2448 repo,
2447 2449 source,
2448 2450 bundlecaps=bundlecaps,
2449 2451 b2caps=b2caps,
2450 2452 **pycompat.strkwargs(kwargs)
2451 2453 )
2452 2454
2453 2455 info[b'prefercompressed'] = bundler.prefercompressed
2454 2456
2455 2457 return info, bundler.getchunks()
2456 2458
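# A hedged usage sketch (repo is any local repository object; nothing is
# requested beyond the changegroup default):
#
#     info, gen = getbundlechunks(repo, b'pull', bundlecaps={b'HG20'})
#     assert info[b'bundleversion'] == 2
#     raw = b''.join(gen)   # concatenate the bundle2 stream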
2457 2459
2458 2460 @getbundle2partsgenerator(b'stream2')
2459 2461 def _getbundlestream2(bundler, repo, *args, **kwargs):
2460 2462 return bundle2.addpartbundlestream2(bundler, repo, **kwargs)
2461 2463
2462 2464
2463 2465 @getbundle2partsgenerator(b'changegroup')
2464 2466 def _getbundlechangegrouppart(
2465 2467 bundler,
2466 2468 repo,
2467 2469 source,
2468 2470 bundlecaps=None,
2469 2471 b2caps=None,
2470 2472 heads=None,
2471 2473 common=None,
2472 2474 **kwargs
2473 2475 ):
2474 2476 """add a changegroup part to the requested bundle"""
2475 2477 if not kwargs.get(r'cg', True):
2476 2478 return
2477 2479
2478 2480 version = b'01'
2479 2481 cgversions = b2caps.get(b'changegroup')
2480 2482 if cgversions: # 3.1 and 3.2 ship with an empty value
2481 2483 cgversions = [
2482 2484 v
2483 2485 for v in cgversions
2484 2486 if v in changegroup.supportedoutgoingversions(repo)
2485 2487 ]
2486 2488 if not cgversions:
2487 2489 raise error.Abort(_(b'no common changegroup version'))
2488 2490 version = max(cgversions)
2489 2491
2490 2492 outgoing = _computeoutgoing(repo, heads, common)
2491 2493 if not outgoing.missing:
2492 2494 return
2493 2495
2494 2496 if kwargs.get(r'narrow', False):
2495 2497 include = sorted(filter(bool, kwargs.get(r'includepats', [])))
2496 2498 exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
2497 2499 matcher = narrowspec.match(repo.root, include=include, exclude=exclude)
2498 2500 else:
2499 2501 matcher = None
2500 2502
2501 2503 cgstream = changegroup.makestream(
2502 2504 repo, outgoing, version, source, bundlecaps=bundlecaps, matcher=matcher
2503 2505 )
2504 2506
2505 2507 part = bundler.newpart(b'changegroup', data=cgstream)
2506 2508 if cgversions:
2507 2509 part.addparam(b'version', version)
2508 2510
2509 2511 part.addparam(b'nbchanges', b'%d' % len(outgoing.missing), mandatory=False)
2510 2512
2511 2513 if b'treemanifest' in repo.requirements:
2512 2514 part.addparam(b'treemanifest', b'1')
2513 2515
2516 if b'exp-sidedata-flag' in repo.requirements:
2517 part.addparam(b'exp-sidedata', b'1')
2518
2514 2519 if (
2515 2520 kwargs.get(r'narrow', False)
2516 2521 and kwargs.get(r'narrow_acl', False)
2517 2522 and (include or exclude)
2518 2523 ):
2519 2524 # this is mandatory because otherwise ACL clients won't work
2520 2525 narrowspecpart = bundler.newpart(b'Narrow:responsespec')
2521 2526 narrowspecpart.data = b'%s\0%s' % (
2522 2527 b'\n'.join(include),
2523 2528 b'\n'.join(exclude),
2524 2529 )
2525 2530
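# The version negotiation above in miniature: the versions advertised by the
# client are intersected with what this repo can emit and the highest
# survives (toy values only):
_toy_client = [b'01', b'02', b'03']
_toy_server = {b'01', b'02'}
assert max(v for v in _toy_client if v in _toy_server) == b'02'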
2526 2531
2527 2532 @getbundle2partsgenerator(b'bookmarks')
2528 2533 def _getbundlebookmarkpart(
2529 2534 bundler, repo, source, bundlecaps=None, b2caps=None, **kwargs
2530 2535 ):
2531 2536 """add a bookmark part to the requested bundle"""
2532 2537 if not kwargs.get(r'bookmarks', False):
2533 2538 return
2534 2539 if b'bookmarks' not in b2caps:
2535 2540 raise error.Abort(_(b'no common bookmarks exchange method'))
2536 2541 books = bookmod.listbinbookmarks(repo)
2537 2542 data = bookmod.binaryencode(books)
2538 2543 if data:
2539 2544 bundler.newpart(b'bookmarks', data=data)
2540 2545
2541 2546
2542 2547 @getbundle2partsgenerator(b'listkeys')
2543 2548 def _getbundlelistkeysparts(
2544 2549 bundler, repo, source, bundlecaps=None, b2caps=None, **kwargs
2545 2550 ):
2546 2551 """add parts containing listkeys namespaces to the requested bundle"""
2547 2552 listkeys = kwargs.get(r'listkeys', ())
2548 2553 for namespace in listkeys:
2549 2554 part = bundler.newpart(b'listkeys')
2550 2555 part.addparam(b'namespace', namespace)
2551 2556 keys = repo.listkeys(namespace).items()
2552 2557 part.data = pushkey.encodekeys(keys)
2553 2558
2554 2559
2555 2560 @getbundle2partsgenerator(b'obsmarkers')
2556 2561 def _getbundleobsmarkerpart(
2557 2562 bundler, repo, source, bundlecaps=None, b2caps=None, heads=None, **kwargs
2558 2563 ):
2559 2564 """add an obsolescence markers part to the requested bundle"""
2560 2565 if kwargs.get(r'obsmarkers', False):
2561 2566 if heads is None:
2562 2567 heads = repo.heads()
2563 2568 subset = [c.node() for c in repo.set(b'::%ln', heads)]
2564 2569 markers = repo.obsstore.relevantmarkers(subset)
2565 2570 markers = sorted(markers)
2566 2571 bundle2.buildobsmarkerspart(bundler, markers)
2567 2572
2568 2573
2569 2574 @getbundle2partsgenerator(b'phases')
2570 2575 def _getbundlephasespart(
2571 2576 bundler, repo, source, bundlecaps=None, b2caps=None, heads=None, **kwargs
2572 2577 ):
2573 2578 """add phase heads part to the requested bundle"""
2574 2579 if kwargs.get(r'phases', False):
2575 2580 if b'heads' not in b2caps.get(b'phases', ()):
2576 2581 raise error.Abort(_(b'no common phases exchange method'))
2577 2582 if heads is None:
2578 2583 heads = repo.heads()
2579 2584
2580 2585 headsbyphase = collections.defaultdict(set)
2581 2586 if repo.publishing():
2582 2587 headsbyphase[phases.public] = heads
2583 2588 else:
2584 2589 # find the appropriate heads to move
2585 2590
2586 2591 phase = repo._phasecache.phase
2587 2592 node = repo.changelog.node
2588 2593 rev = repo.changelog.rev
2589 2594 for h in heads:
2590 2595 headsbyphase[phase(repo, rev(h))].add(h)
2591 2596 seenphases = list(headsbyphase.keys())
2592 2597
2593 2598 # We do not handle anything but public and draft phases for now
2594 2599 if seenphases:
2595 2600 assert max(seenphases) <= phases.draft
2596 2601
2597 2602 # if client is pulling non-public changesets, we need to find
2598 2603 # intermediate public heads.
2599 2604 draftheads = headsbyphase.get(phases.draft, set())
2600 2605 if draftheads:
2601 2606 publicheads = headsbyphase.get(phases.public, set())
2602 2607
2603 2608 revset = b'heads(only(%ln, %ln) and public())'
2604 2609 extraheads = repo.revs(revset, draftheads, publicheads)
2605 2610 for r in extraheads:
2606 2611 headsbyphase[phases.public].add(node(r))
2607 2612
2608 2613 # transform data in a format used by the encoding function
2609 2614 phasemapping = []
2610 2615 for phase in phases.allphases:
2611 2616 phasemapping.append(sorted(headsbyphase[phase]))
2612 2617
2613 2618 # generate the actual part
2614 2619 phasedata = phases.binaryencode(phasemapping)
2615 2620 bundler.newpart(b'phase-heads', data=phasedata)
2616 2621
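# Shape handed to phases.binaryencode above, with hypothetical nodes and
# assuming phases.allphases orders public < draft < ...: a list indexed by
# phase number, each entry holding the sorted heads in that phase, e.g.
#
#     [[pub_head_1, pub_head_2], [draft_head], [], ...]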
2617 2622
2618 2623 @getbundle2partsgenerator(b'hgtagsfnodes')
2619 2624 def _getbundletagsfnodes(
2620 2625 bundler,
2621 2626 repo,
2622 2627 source,
2623 2628 bundlecaps=None,
2624 2629 b2caps=None,
2625 2630 heads=None,
2626 2631 common=None,
2627 2632 **kwargs
2628 2633 ):
2629 2634 """Transfer the .hgtags filenodes mapping.
2630 2635
2631 2636 Only values for heads in this bundle will be transferred.
2632 2637
2633 2638 The part data consists of pairs of 20 byte changeset node and .hgtags
2634 2639 filenodes raw values.
2635 2640 """
2636 2641 # Don't send unless:
2637 2642 # - changesets are being exchanged,
2638 2643 # - the client supports it.
2639 2644 if not (kwargs.get(r'cg', True) and b'hgtagsfnodes' in b2caps):
2640 2645 return
2641 2646
2642 2647 outgoing = _computeoutgoing(repo, heads, common)
2643 2648 bundle2.addparttagsfnodescache(repo, bundler, outgoing)
2644 2649
2645 2650
2646 2651 @getbundle2partsgenerator(b'cache:rev-branch-cache')
2647 2652 def _getbundlerevbranchcache(
2648 2653 bundler,
2649 2654 repo,
2650 2655 source,
2651 2656 bundlecaps=None,
2652 2657 b2caps=None,
2653 2658 heads=None,
2654 2659 common=None,
2655 2660 **kwargs
2656 2661 ):
2657 2662 """Transfer the rev-branch-cache mapping
2658 2663
2659 2664 The payload is a series of records, one per branch:
2660 2665
2661 2666 1) branch name length
2662 2667 2) number of open heads
2663 2668 3) number of closed heads
2664 2669 4) open heads nodes
2665 2670 5) closed heads nodes
2666 2671 """
2667 2672 # Don't send unless:
2668 2673 # - changesets are being exchanged,
2669 2674 # - the client supports it,
2670 2675 # - narrow bundle isn't in play (not currently compatible).
2671 2676 if (
2672 2677 not kwargs.get(r'cg', True)
2673 2678 or b'rev-branch-cache' not in b2caps
2674 2679 or kwargs.get(r'narrow', False)
2675 2680 or repo.ui.has_section(_NARROWACL_SECTION)
2676 2681 ):
2677 2682 return
2678 2683
2679 2684 outgoing = _computeoutgoing(repo, heads, common)
2680 2685 bundle2.addpartrevbranchcache(repo, bundler, outgoing)
2681 2686
2682 2687
2683 2688 def check_heads(repo, their_heads, context):
2684 2689 """check if the heads of a repo have been modified
2685 2690
2686 2691 Used by peer for unbundling.
2687 2692 """
2688 2693 heads = repo.heads()
2689 2694 heads_hash = hashlib.sha1(b''.join(sorted(heads))).digest()
2690 2695 if not (
2691 2696 their_heads == [b'force']
2692 2697 or their_heads == heads
2693 2698 or their_heads == [b'hashed', heads_hash]
2694 2699 ):
2695 2700 # someone else committed/pushed/unbundled while we
2696 2701 # were transferring data
2697 2702 raise error.PushRaced(
2698 2703 b'repository changed while %s - please try again' % context
2699 2704 )
2700 2705
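# A standalone model of the race check above: the client sends either the
# literal heads it observed, or b'hashed' plus a sha1 over the sorted heads,
# and the server recomputes the digest (hashlib is already imported near the
# top of this module). The sort makes ordering irrelevant; toy nodes only:
_toy_heads = [b'\x11' * 20, b'\x22' * 20]
_digest = hashlib.sha1(b''.join(sorted(_toy_heads))).digest()
assert _digest == hashlib.sha1(
    b''.join(sorted(reversed(_toy_heads)))
).digest()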
2701 2706
2702 2707 def unbundle(repo, cg, heads, source, url):
2703 2708 """Apply a bundle to a repo.
2704 2709
2705 2710 This function makes sure the repo is locked during the application and has
2706 2711 a mechanism to check that no push race occurred between the creation of the
2707 2712 bundle and its application.
2708 2713 
2709 2714 If the push was raced, a PushRaced exception is raised."""
2710 2715 r = 0
2711 2716 # need a transaction when processing a bundle2 stream
2712 2717 # [wlock, lock, tr] - needs to be an array so nested functions can modify it
2713 2718 lockandtr = [None, None, None]
2714 2719 recordout = None
2715 2720 # quick fix for output mismatch with bundle2 in 3.4
2716 2721 captureoutput = repo.ui.configbool(
2717 2722 b'experimental', b'bundle2-output-capture'
2718 2723 )
2719 2724 if url.startswith(b'remote:http:') or url.startswith(b'remote:https:'):
2720 2725 captureoutput = True
2721 2726 try:
2722 2727 # note: outside bundle1, 'heads' is expected to be empty and this
2723 2728 # 'check_heads' call will be a no-op
2724 2729 check_heads(repo, heads, b'uploading changes')
2725 2730 # push can proceed
2726 2731 if not isinstance(cg, bundle2.unbundle20):
2727 2732 # legacy case: bundle1 (changegroup 01)
2728 2733 txnname = b"\n".join([source, util.hidepassword(url)])
2729 2734 with repo.lock(), repo.transaction(txnname) as tr:
2730 2735 op = bundle2.applybundle(repo, cg, tr, source, url)
2731 2736 r = bundle2.combinechangegroupresults(op)
2732 2737 else:
2733 2738 r = None
2734 2739 try:
2735 2740
2736 2741 def gettransaction():
2737 2742 if not lockandtr[2]:
2738 2743 if not bookmod.bookmarksinstore(repo):
2739 2744 lockandtr[0] = repo.wlock()
2740 2745 lockandtr[1] = repo.lock()
2741 2746 lockandtr[2] = repo.transaction(source)
2742 2747 lockandtr[2].hookargs[b'source'] = source
2743 2748 lockandtr[2].hookargs[b'url'] = url
2744 2749 lockandtr[2].hookargs[b'bundle2'] = b'1'
2745 2750 return lockandtr[2]
2746 2751
2747 2752 # Do greedy locking by default until we're satisfied with lazy
2748 2753 # locking.
2749 2754 if not repo.ui.configbool(
2750 2755 b'experimental', b'bundle2lazylocking'
2751 2756 ):
2752 2757 gettransaction()
2753 2758
2754 2759 op = bundle2.bundleoperation(
2755 2760 repo,
2756 2761 gettransaction,
2757 2762 captureoutput=captureoutput,
2758 2763 source=b'push',
2759 2764 )
2760 2765 try:
2761 2766 op = bundle2.processbundle(repo, cg, op=op)
2762 2767 finally:
2763 2768 r = op.reply
2764 2769 if captureoutput and r is not None:
2765 2770 repo.ui.pushbuffer(error=True, subproc=True)
2766 2771
2767 2772 def recordout(output):
2768 2773 r.newpart(b'output', data=output, mandatory=False)
2769 2774
2770 2775 if lockandtr[2] is not None:
2771 2776 lockandtr[2].close()
2772 2777 except BaseException as exc:
2773 2778 exc.duringunbundle2 = True
2774 2779 if captureoutput and r is not None:
2775 2780 parts = exc._bundle2salvagedoutput = r.salvageoutput()
2776 2781
2777 2782 def recordout(output):
2778 2783 part = bundle2.bundlepart(
2779 2784 b'output', data=output, mandatory=False
2780 2785 )
2781 2786 parts.append(part)
2782 2787
2783 2788 raise
2784 2789 finally:
2785 2790 lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
2786 2791 if recordout is not None:
2787 2792 recordout(repo.ui.popbuffer())
2788 2793 return r
2789 2794
2790 2795
2791 2796 def _maybeapplyclonebundle(pullop):
2792 2797 """Apply a clone bundle from a remote, if possible."""
2793 2798
2794 2799 repo = pullop.repo
2795 2800 remote = pullop.remote
2796 2801
2797 2802 if not repo.ui.configbool(b'ui', b'clonebundles'):
2798 2803 return
2799 2804
2800 2805 # Only run if local repo is empty.
2801 2806 if len(repo):
2802 2807 return
2803 2808
2804 2809 if pullop.heads:
2805 2810 return
2806 2811
2807 2812 if not remote.capable(b'clonebundles'):
2808 2813 return
2809 2814
2810 2815 with remote.commandexecutor() as e:
2811 2816 res = e.callcommand(b'clonebundles', {}).result()
2812 2817
2813 2818 # If we call the wire protocol command, that's good enough to record the
2814 2819 # attempt.
2815 2820 pullop.clonebundleattempted = True
2816 2821
2817 2822 entries = parseclonebundlesmanifest(repo, res)
2818 2823 if not entries:
2819 2824 repo.ui.note(
2820 2825 _(
2821 2826 b'no clone bundles available on remote; '
2822 2827 b'falling back to regular clone\n'
2823 2828 )
2824 2829 )
2825 2830 return
2826 2831
2827 2832 entries = filterclonebundleentries(
2828 2833 repo, entries, streamclonerequested=pullop.streamclonerequested
2829 2834 )
2830 2835
2831 2836 if not entries:
2832 2837 # There is a thundering herd concern here. However, if a server
2833 2838 # operator doesn't advertise bundles appropriate for its clients,
2834 2839 # they deserve what's coming. Furthermore, from a client's
2835 2840 # perspective, no automatic fallback would mean not being able to
2836 2841 # clone!
2837 2842 repo.ui.warn(
2838 2843 _(
2839 2844 b'no compatible clone bundles available on server; '
2840 2845 b'falling back to regular clone\n'
2841 2846 )
2842 2847 )
2843 2848 repo.ui.warn(
2844 2849 _(b'(you may want to report this to the server operator)\n')
2845 2850 )
2846 2851 return
2847 2852
2848 2853 entries = sortclonebundleentries(repo.ui, entries)
2849 2854
2850 2855 url = entries[0][b'URL']
2851 2856 repo.ui.status(_(b'applying clone bundle from %s\n') % url)
2852 2857 if trypullbundlefromurl(repo.ui, repo, url):
2853 2858 repo.ui.status(_(b'finished applying clone bundle\n'))
2854 2859 # Bundle failed.
2855 2860 #
2856 2861 # We abort by default to avoid the thundering herd of
2857 2862 # clients flooding a server that was expecting expensive
2858 2863 # clone load to be offloaded.
2859 2864 elif repo.ui.configbool(b'ui', b'clonebundlefallback'):
2860 2865 repo.ui.warn(_(b'falling back to normal clone\n'))
2861 2866 else:
2862 2867 raise error.Abort(
2863 2868 _(b'error applying bundle'),
2864 2869 hint=_(
2865 2870 b'if this error persists, consider contacting '
2866 2871 b'the server operator or disable clone '
2867 2872 b'bundles via '
2868 2873 b'"--config ui.clonebundles=false"'
2869 2874 ),
2870 2875 )
2871 2876
2872 2877
2873 2878 def parseclonebundlesmanifest(repo, s):
2874 2879 """Parses the raw text of a clone bundles manifest.
2875 2880
2876 2881 Returns a list of dicts. The dicts have a ``URL`` key corresponding
2877 2882 to the URL and other keys are the attributes for the entry.
2878 2883 """
2879 2884 m = []
2880 2885 for line in s.splitlines():
2881 2886 fields = line.split()
2882 2887 if not fields:
2883 2888 continue
2884 2889 attrs = {b'URL': fields[0]}
2885 2890 for rawattr in fields[1:]:
2886 2891 key, value = rawattr.split(b'=', 1)
2887 2892 key = urlreq.unquote(key)
2888 2893 value = urlreq.unquote(value)
2889 2894 attrs[key] = value
2890 2895
2891 2896 # Parse BUNDLESPEC into components. This makes client-side
2892 2897 # preferences easier to specify since you can prefer a single
2893 2898 # component of the BUNDLESPEC.
2894 2899 if key == b'BUNDLESPEC':
2895 2900 try:
2896 2901 bundlespec = parsebundlespec(repo, value)
2897 2902 attrs[b'COMPRESSION'] = bundlespec.compression
2898 2903 attrs[b'VERSION'] = bundlespec.version
2899 2904 except error.InvalidBundleSpecification:
2900 2905 pass
2901 2906 except error.UnsupportedBundleSpecification:
2902 2907 pass
2903 2908
2904 2909 m.append(attrs)
2905 2910
2906 2911 return m
2907 2912
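# A hedged illustration of the format parsed above: one entry per line, a
# URL followed by space-separated, url-quoted KEY=VALUE attributes (the URL
# below is made up). An entry such as
#
#     https://example.com/full.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true
#
# yields, plus COMPRESSION/VERSION derived from the BUNDLESPEC:
#
#     {b'URL': b'https://example.com/full.hg',
#      b'BUNDLESPEC': b'gzip-v2', b'REQUIRESNI': b'true'}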
2908 2913
2909 2914 def isstreamclonespec(bundlespec):
2910 2915 # Stream clone v1
2911 2916 if bundlespec.wirecompression == b'UN' and bundlespec.wireversion == b's1':
2912 2917 return True
2913 2918
2914 2919 # Stream clone v2
2915 2920 if (
2916 2921 bundlespec.wirecompression == b'UN'
2917 2922 and bundlespec.wireversion == b'02'
2918 2923 and bundlespec.contentopts.get(b'streamv2')
2919 2924 ):
2920 2925 return True
2921 2926
2922 2927 return False
2923 2928
2924 2929
2925 2930 def filterclonebundleentries(repo, entries, streamclonerequested=False):
2926 2931 """Remove incompatible clone bundle manifest entries.
2927 2932
2928 2933 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
2929 2934 and returns a new list consisting of only the entries that this client
2930 2935 should be able to apply.
2931 2936
2932 2937 There is no guarantee we'll be able to apply all returned entries because
2933 2938 the metadata we use to filter on may be missing or wrong.
2934 2939 """
2935 2940 newentries = []
2936 2941 for entry in entries:
2937 2942 spec = entry.get(b'BUNDLESPEC')
2938 2943 if spec:
2939 2944 try:
2940 2945 bundlespec = parsebundlespec(repo, spec, strict=True)
2941 2946
2942 2947 # If a stream clone was requested, filter out non-streamclone
2943 2948 # entries.
2944 2949 if streamclonerequested and not isstreamclonespec(bundlespec):
2945 2950 repo.ui.debug(
2946 2951 b'filtering %s because not a stream clone\n'
2947 2952 % entry[b'URL']
2948 2953 )
2949 2954 continue
2950 2955
2951 2956 except error.InvalidBundleSpecification as e:
2952 2957 repo.ui.debug(stringutil.forcebytestr(e) + b'\n')
2953 2958 continue
2954 2959 except error.UnsupportedBundleSpecification as e:
2955 2960 repo.ui.debug(
2956 2961 b'filtering %s because unsupported bundle '
2957 2962 b'spec: %s\n' % (entry[b'URL'], stringutil.forcebytestr(e))
2958 2963 )
2959 2964 continue
2960 2965 # If we don't have a spec and requested a stream clone, we don't know
2961 2966 # what the entry is so don't attempt to apply it.
2962 2967 elif streamclonerequested:
2963 2968 repo.ui.debug(
2964 2969 b'filtering %s because cannot determine if a stream '
2965 2970 b'clone bundle\n' % entry[b'URL']
2966 2971 )
2967 2972 continue
2968 2973
2969 2974 if b'REQUIRESNI' in entry and not sslutil.hassni:
2970 2975 repo.ui.debug(
2971 2976 b'filtering %s because SNI not supported\n' % entry[b'URL']
2972 2977 )
2973 2978 continue
2974 2979
2975 2980 newentries.append(entry)
2976 2981
2977 2982 return newentries
2978 2983
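# A hedged example of the stream-clone filtering above (URL made up): when a
# stream clone was requested, an entry without a BUNDLESPEC cannot be
# classified, so it is dropped rather than risk applying the wrong thing:
#
#     filterclonebundleentries(
#         repo, [{b'URL': b'https://example.com/x.hg'}],
#         streamclonerequested=True,
#     )  # -> []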
2979 2984
2980 2985 class clonebundleentry(object):
2981 2986 """Represents an item in a clone bundles manifest.
2982 2987
2983 2988 This rich class is needed to support sorting since sorted() in Python 3
2984 2989 doesn't support ``cmp`` and our comparison is complex enough that ``key=``
2985 2990 won't work.
2986 2991 """
2987 2992
2988 2993 def __init__(self, value, prefers):
2989 2994 self.value = value
2990 2995 self.prefers = prefers
2991 2996
2992 2997 def _cmp(self, other):
2993 2998 for prefkey, prefvalue in self.prefers:
2994 2999 avalue = self.value.get(prefkey)
2995 3000 bvalue = other.value.get(prefkey)
2996 3001
2997 3002 # Special case: b is missing the attribute and a matches exactly.
2998 3003 if avalue is not None and bvalue is None and avalue == prefvalue:
2999 3004 return -1
3000 3005
3001 3006 # Special case: a is missing the attribute and b matches exactly.
3002 3007 if bvalue is not None and avalue is None and bvalue == prefvalue:
3003 3008 return 1
3004 3009
3005 3010 # We can't compare unless the attribute is present on both.
3006 3011 if avalue is None or bvalue is None:
3007 3012 continue
3008 3013
3009 3014 # Same values should fall back to next attribute.
3010 3015 if avalue == bvalue:
3011 3016 continue
3012 3017
3013 3018 # Exact matches come first.
3014 3019 if avalue == prefvalue:
3015 3020 return -1
3016 3021 if bvalue == prefvalue:
3017 3022 return 1
3018 3023
3019 3024 # Fall back to next attribute.
3020 3025 continue
3021 3026
3022 3027 # If we got here we couldn't sort by attributes and prefers. Fall
3023 3028 # back to index order.
3024 3029 return 0
3025 3030
3026 3031 def __lt__(self, other):
3027 3032 return self._cmp(other) < 0
3028 3033
3029 3034 def __gt__(self, other):
3030 3035 return self._cmp(other) > 0
3031 3036
3032 3037 def __eq__(self, other):
3033 3038 return self._cmp(other) == 0
3034 3039
3035 3040 def __le__(self, other):
3036 3041 return self._cmp(other) <= 0
3037 3042
3038 3043 def __ge__(self, other):
3039 3044 return self._cmp(other) >= 0
3040 3045
3041 3046 def __ne__(self, other):
3042 3047 return self._cmp(other) != 0
3043 3048
3044 3049
3045 3050 def sortclonebundleentries(ui, entries):
3046 3051 prefers = ui.configlist(b'ui', b'clonebundleprefers')
3047 3052 if not prefers:
3048 3053 return list(entries)
3049 3054
3050 3055 prefers = [p.split(b'=', 1) for p in prefers]
3051 3056
3052 3057 items = sorted(clonebundleentry(v, prefers) for v in entries)
3053 3058 return [i.value for i in items]
3054 3059
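# A self-checking sketch of the preference sorting above (URLs made up):
# with ui.clonebundleprefers set to COMPRESSION=zstd, the zstd entry wins,
# and ties would fall back to manifest order:
_toy_entries = [
    {b'URL': b'https://example.com/gzip.hg', b'COMPRESSION': b'gzip'},
    {b'URL': b'https://example.com/zstd.hg', b'COMPRESSION': b'zstd'},
]
_toy_prefers = [(b'COMPRESSION', b'zstd')]
_ranked = sorted(clonebundleentry(v, _toy_prefers) for v in _toy_entries)
assert _ranked[0].value[b'URL'] == b'https://example.com/zstd.hg'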
3055 3060
3056 3061 def trypullbundlefromurl(ui, repo, url):
3057 3062 """Attempt to apply a bundle from a URL."""
3058 3063 with repo.lock(), repo.transaction(b'bundleurl') as tr:
3059 3064 try:
3060 3065 fh = urlmod.open(ui, url)
3061 3066 cg = readbundle(ui, fh, b'stream')
3062 3067
3063 3068 if isinstance(cg, streamclone.streamcloneapplier):
3064 3069 cg.apply(repo)
3065 3070 else:
3066 3071 bundle2.applybundle(repo, cg, tr, b'clonebundles', url)
3067 3072 return True
3068 3073 except urlerr.httperror as e:
3069 3074 ui.warn(
3070 3075 _(b'HTTP error fetching bundle: %s\n')
3071 3076 % stringutil.forcebytestr(e)
3072 3077 )
3073 3078 except urlerr.urlerror as e:
3074 3079 ui.warn(
3075 3080 _(b'error fetching bundle: %s\n')
3076 3081 % stringutil.forcebytestr(e.reason)
3077 3082 )
3078 3083
3079 3084 return False