bundle2: remove restriction around sidedata...
Raphaël Gomès -
r47845:1680c947 default
@@ -1,2594 +1,2587 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple, and we want to discourage
56 56 any overly complex usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
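As an illustration, the parameter blob described above can be built and parsed with a few lines of standalone Python. `encode_stream_params` and `decode_stream_params` are hypothetical helper names, not Mercurial API:

```python
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Encode (name, value) pairs into a space-separated blob.

    A value of None means a valueless parameter; names and values are
    url-quoted as the format requires.
    """
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks).encode('ascii')

def decode_stream_params(blob):
    """Parse the blob back into a dict of name -> value (or None)."""
    params = {}
    for item in blob.decode('ascii').split(' '):
        if '=' in item:
            name, value = item.split('=', 1)
            params[unquote(name)] = unquote(value)
        else:
            params[unquote(item)] = None
    return params
```

Round-tripping preserves both valued and valueless parameters, matching the `<name>=<value>` form above.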
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
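The header layout above can be sketched as a standalone reader. `parse_part_header` is a hypothetical helper (not the module's own parser) and assumes the header blob has already been extracted from the stream:

```python
import struct

def parse_part_header(header):
    """Parse a bundle2 part header blob.

    Layout: <typesize:B><parttype><partid:I>
            <mandatory-count:B><advisory-count:B><param-sizes><param-data>
    Returns (parttype, partid, params) where each param is a
    (key, value, mandatory) tuple, mandatory ones first.
    """
    offset = 0
    (typesize,) = struct.unpack_from('>B', header, offset)
    offset += 1
    parttype = header[offset:offset + typesize]
    offset += typesize
    (partid,) = struct.unpack_from('>I', header, offset)
    offset += 4
    mancount, advcount = struct.unpack_from('>BB', header, offset)
    offset += 2
    total = mancount + advcount
    # One (key size, value size) byte pair per parameter.
    sizes = struct.unpack_from('>' + 'BB' * total, header, offset)
    offset += 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = header[offset:offset + ksize]
        offset += ksize
        value = header[offset:offset + vsize]
        offset += vsize
        params.append((key, value, i < mancount))
    return parttype, partid, params
```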
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
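A minimal sketch of a reader for this chunk framing (`read_payload` is a hypothetical helper; it rejects negative sizes since no special-case processing exists yet):

```python
import io
import struct

def read_payload(fp):
    """Collect a part payload from <chunksize><chunkdata> framing.

    Reads int32-prefixed chunks until the terminating zero-size chunk
    and returns the concatenated payload bytes.
    """
    chunks = []
    while True:
        (size,) = struct.unpack('>i', fp.read(4))
        if size == 0:
            return b''.join(chunks)
        if size < 0:
            raise ValueError('special chunk size not supported: %d' % size)
        chunks.append(fp.read(size))
```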
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
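The case-sensitivity rule can be sketched as a tiny dispatch helper (hypothetical names, not the module's actual `parthandlermapping` machinery):

```python
handlers = {}  # lower-cased part type -> handler callable

def dispatch(parttype):
    """Look up a part handler case-insensitively.

    Any uppercase character in the part type marks the part mandatory,
    so a missing handler is then a hard error; advisory parts with no
    handler are simply skipped (None is returned).
    """
    handler = handlers.get(parttype.lower())
    if handler is None and parttype != parttype.lower():
        raise KeyError('unsupported mandatory part: %r' % parttype)
    return handler
```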
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import collections
151 151 import errno
152 152 import os
153 153 import re
154 154 import string
155 155 import struct
156 156 import sys
157 157
158 158 from .i18n import _
159 159 from .node import (
160 160 hex,
161 161 short,
162 162 )
163 163 from . import (
164 164 bookmarks,
165 165 changegroup,
166 166 encoding,
167 167 error,
168 168 obsolete,
169 169 phases,
170 170 pushkey,
171 171 pycompat,
172 172 requirements,
173 173 scmutil,
174 174 streamclone,
175 175 tags,
176 176 url,
177 177 util,
178 178 )
179 179 from .utils import (
180 180 stringutil,
181 181 urlutil,
182 182 )
183 183
184 184 urlerr = util.urlerr
185 185 urlreq = util.urlreq
186 186
187 187 _pack = struct.pack
188 188 _unpack = struct.unpack
189 189
190 190 _fstreamparamsize = b'>i'
191 191 _fpartheadersize = b'>i'
192 192 _fparttypesize = b'>B'
193 193 _fpartid = b'>I'
194 194 _fpayloadsize = b'>i'
195 195 _fpartparamcount = b'>BB'
196 196
197 197 preferedchunksize = 32768
198 198
199 199 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
200 200
201 201
202 202 def outdebug(ui, message):
203 203 """debug regarding output stream (bundling)"""
204 204 if ui.configbool(b'devel', b'bundle2.debug'):
205 205 ui.debug(b'bundle2-output: %s\n' % message)
206 206
207 207
208 208 def indebug(ui, message):
209 209 """debug on input stream (unbundling)"""
210 210 if ui.configbool(b'devel', b'bundle2.debug'):
211 211 ui.debug(b'bundle2-input: %s\n' % message)
212 212
213 213
214 214 def validateparttype(parttype):
215 215 """raise ValueError if a parttype contains invalid character"""
216 216 if _parttypeforbidden.search(parttype):
217 217 raise ValueError(parttype)
218 218
219 219
220 220 def _makefpartparamsizes(nbparams):
221 221 """return a struct format to read part parameter sizes
222 222
223 223 The number of parameters is variable, so we need to build that format
224 224 dynamically.
225 225 """
226 226 return b'>' + (b'BB' * nbparams)
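For example, two parameters yield the format `b'>BBBB'`, which `struct.unpack` turns into alternating key/value sizes:

```python
import struct

# Two parameters produce b'>BBBB': one (key-size, value-size) byte pair
# per parameter, mandatory ones first.
fmt = b'>' + (b'BB' * 2)
sizes = struct.unpack(fmt, bytes([11, 1, 4, 0]))
# sizes == (11, 1, 4, 0): an 11-byte key with a 1-byte value, then a
# 4-byte key whose value is empty.
```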
227 227
228 228
229 229 parthandlermapping = {}
230 230
231 231
232 232 def parthandler(parttype, params=()):
233 233 decorator that registers a function as a bundle2 part handler
234 234
235 235 eg::
236 236
237 237 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
238 238 def myparttypehandler(...):
239 239 '''process a part of type "my part".'''
240 240 ...
241 241 """
242 242 validateparttype(parttype)
243 243
244 244 def _decorator(func):
245 245 lparttype = parttype.lower() # enforce lower case matching.
246 246 assert lparttype not in parthandlermapping
247 247 parthandlermapping[lparttype] = func
248 248 func.params = frozenset(params)
249 249 return func
250 250
251 251 return _decorator
252 252
253 253
254 254 class unbundlerecords(object):
255 255 """keep record of what happens during and unbundle
256 256
257 257 New records are added using `records.add('cat', obj)`. Where 'cat' is a
258 258 category of record and obj is an arbitrary object.
259 259
260 260 `records['cat']` will return all entries of this category 'cat'.
261 261
262 262 Iterating on the object itself will yield `('category', obj)` tuples
263 263 for all entries.
264 264
265 265 All iterations happen in chronological order.
266 266 """
267 267
268 268 def __init__(self):
269 269 self._categories = {}
270 270 self._sequences = []
271 271 self._replies = {}
272 272
273 273 def add(self, category, entry, inreplyto=None):
274 274 """add a new record of a given category.
275 275
276 276 The entry can then be retrieved in the list returned by
277 277 self['category']."""
278 278 self._categories.setdefault(category, []).append(entry)
279 279 self._sequences.append((category, entry))
280 280 if inreplyto is not None:
281 281 self.getreplies(inreplyto).add(category, entry)
282 282
283 283 def getreplies(self, partid):
284 284 """get the records that are replies to a specific part"""
285 285 return self._replies.setdefault(partid, unbundlerecords())
286 286
287 287 def __getitem__(self, cat):
288 288 return tuple(self._categories.get(cat, ()))
289 289
290 290 def __iter__(self):
291 291 return iter(self._sequences)
292 292
293 293 def __len__(self):
294 294 return len(self._sequences)
295 295
296 296 def __nonzero__(self):
297 297 return bool(self._sequences)
298 298
299 299 __bool__ = __nonzero__
300 300
301 301
302 302 class bundleoperation(object):
303 303 """an object that represents a single bundling process
304 304
305 305 Its purpose is to carry unbundle-related objects and states.
306 306
307 307 A new object should be created at the beginning of each bundle processing.
308 308 The object is to be returned by the processing function.
309 309
310 310 The object has very little content now; it will ultimately contain:
311 311 * an access to the repo the bundle is applied to,
312 312 * a ui object,
313 313 * a way to retrieve a transaction to add changes to the repo,
314 314 * a way to record the result of processing each part,
315 315 * a way to construct a bundle response when applicable.
316 316 """
317 317
318 318 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
319 319 self.repo = repo
320 320 self.ui = repo.ui
321 321 self.records = unbundlerecords()
322 322 self.reply = None
323 323 self.captureoutput = captureoutput
324 324 self.hookargs = {}
325 325 self._gettransaction = transactiongetter
326 326 # carries value that can modify part behavior
327 327 self.modes = {}
328 328 self.source = source
329 329
330 330 def gettransaction(self):
331 331 transaction = self._gettransaction()
332 332
333 333 if self.hookargs:
334 334 # the ones added to the transaction supersede those added
335 335 # to the operation.
336 336 self.hookargs.update(transaction.hookargs)
337 337 transaction.hookargs = self.hookargs
338 338
339 339 # mark the hookargs as flushed. further attempts to add to
340 340 # hookargs will result in an abort.
341 341 self.hookargs = None
342 342
343 343 return transaction
344 344
345 345 def addhookargs(self, hookargs):
346 346 if self.hookargs is None:
347 347 raise error.ProgrammingError(
348 348 b'attempted to add hookargs to '
349 349 b'operation after transaction started'
350 350 )
351 351 self.hookargs.update(hookargs)
352 352
353 353
354 354 class TransactionUnavailable(RuntimeError):
355 355 pass
356 356
357 357
358 358 def _notransaction():
359 359 """default method to get a transaction while processing a bundle
360 360
361 361 Raise an exception to highlight the fact that no transaction was expected
362 362 to be created"""
363 363 raise TransactionUnavailable()
364 364
365 365
366 366 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
367 367 # transform me into unbundler.apply() as soon as the freeze is lifted
368 368 if isinstance(unbundler, unbundle20):
369 369 tr.hookargs[b'bundle2'] = b'1'
370 370 if source is not None and b'source' not in tr.hookargs:
371 371 tr.hookargs[b'source'] = source
372 372 if url is not None and b'url' not in tr.hookargs:
373 373 tr.hookargs[b'url'] = url
374 374 return processbundle(repo, unbundler, lambda: tr, source=source)
375 375 else:
376 376 # the transactiongetter won't be used, but we might as well set it
377 377 op = bundleoperation(repo, lambda: tr, source=source)
378 378 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
379 379 return op
380 380
381 381
382 382 class partiterator(object):
383 383 def __init__(self, repo, op, unbundler):
384 384 self.repo = repo
385 385 self.op = op
386 386 self.unbundler = unbundler
387 387 self.iterator = None
388 388 self.count = 0
389 389 self.current = None
390 390
391 391 def __enter__(self):
392 392 def func():
393 393 itr = enumerate(self.unbundler.iterparts(), 1)
394 394 for count, p in itr:
395 395 self.count = count
396 396 self.current = p
397 397 yield p
398 398 p.consume()
399 399 self.current = None
400 400
401 401 self.iterator = func()
402 402 return self.iterator
403 403
404 404 def __exit__(self, type, exc, tb):
405 405 if not self.iterator:
406 406 return
407 407
408 408 # Only gracefully abort in a normal exception situation. User aborts
409 409 # like Ctrl+C throw a KeyboardInterrupt, which does not derive from
410 410 # Exception and should not be gracefully cleaned up.
411 411 if isinstance(exc, Exception):
412 412 # Any exceptions seeking to the end of the bundle at this point are
413 413 # almost certainly related to the underlying stream being bad.
414 414 # And, chances are that the exception we're handling is related to
415 415 # getting in that bad state. So, we swallow the seeking error and
416 416 # re-raise the original error.
417 417 seekerror = False
418 418 try:
419 419 if self.current:
420 420 # consume the part content to not corrupt the stream.
421 421 self.current.consume()
422 422
423 423 for part in self.iterator:
424 424 # consume the bundle content
425 425 part.consume()
426 426 except Exception:
427 427 seekerror = True
428 428
429 429 # Small hack to let caller code distinguish exceptions raised during
430 430 # bundle2 processing from old-format processing. This is mostly needed
431 431 # to handle different return codes to unbundle according to the type
432 432 # of bundle. We should probably clean up or drop this return code
433 433 # craziness in a future version.
434 434 exc.duringunbundle2 = True
435 435 salvaged = []
436 436 replycaps = None
437 437 if self.op.reply is not None:
438 438 salvaged = self.op.reply.salvageoutput()
439 439 replycaps = self.op.reply.capabilities
440 440 exc._replycaps = replycaps
441 441 exc._bundle2salvagedoutput = salvaged
442 442
443 443 # Re-raising from a variable loses the original stack. So only use
444 444 # that form if we need to.
445 445 if seekerror:
446 446 raise exc
447 447
448 448 self.repo.ui.debug(
449 449 b'bundle2-input-bundle: %i parts total\n' % self.count
450 450 )
451 451
452 452
453 453 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
454 454 """This function process a bundle, apply effect to/from a repo
455 455
456 456 It iterates over each part then searches for and uses the proper handling
457 457 code to process the part. Parts are processed in order.
458 458
459 459 An unknown mandatory part will abort the process.
460 460
461 461 It is temporarily possible to provide a prebuilt bundleoperation to the
462 462 function. This is used to ensure output is properly propagated in case of
463 463 an error during the unbundling. This output capturing part will likely be
464 464 reworked and this ability will probably go away in the process.
465 465 """
466 466 if op is None:
467 467 if transactiongetter is None:
468 468 transactiongetter = _notransaction
469 469 op = bundleoperation(repo, transactiongetter, source=source)
470 470 # todo:
471 471 # - replace this with an init function soon.
472 472 # - exception catching
473 473 unbundler.params
474 474 if repo.ui.debugflag:
475 475 msg = [b'bundle2-input-bundle:']
476 476 if unbundler.params:
477 477 msg.append(b' %i params' % len(unbundler.params))
478 478 if op._gettransaction is None or op._gettransaction is _notransaction:
479 479 msg.append(b' no-transaction')
480 480 else:
481 481 msg.append(b' with-transaction')
482 482 msg.append(b'\n')
483 483 repo.ui.debug(b''.join(msg))
484 484
485 485 processparts(repo, op, unbundler)
486 486
487 487 return op
488 488
489 489
490 490 def processparts(repo, op, unbundler):
491 491 with partiterator(repo, op, unbundler) as parts:
492 492 for part in parts:
493 493 _processpart(op, part)
494 494
495 495
496 496 def _processchangegroup(op, cg, tr, source, url, **kwargs):
497 497 ret = cg.apply(op.repo, tr, source, url, **kwargs)
498 498 op.records.add(
499 499 b'changegroup',
500 500 {
501 501 b'return': ret,
502 502 },
503 503 )
504 504 return ret
505 505
506 506
507 507 def _gethandler(op, part):
508 508 status = b'unknown' # used by debug output
509 509 try:
510 510 handler = parthandlermapping.get(part.type)
511 511 if handler is None:
512 512 status = b'unsupported-type'
513 513 raise error.BundleUnknownFeatureError(parttype=part.type)
514 514 indebug(op.ui, b'found a handler for part %s' % part.type)
515 515 unknownparams = part.mandatorykeys - handler.params
516 516 if unknownparams:
517 517 unknownparams = list(unknownparams)
518 518 unknownparams.sort()
519 519 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
520 520 raise error.BundleUnknownFeatureError(
521 521 parttype=part.type, params=unknownparams
522 522 )
523 523 status = b'supported'
524 524 except error.BundleUnknownFeatureError as exc:
525 525 if part.mandatory: # mandatory parts
526 526 raise
527 527 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
528 528 return # skip to part processing
529 529 finally:
530 530 if op.ui.debugflag:
531 531 msg = [b'bundle2-input-part: "%s"' % part.type]
532 532 if not part.mandatory:
533 533 msg.append(b' (advisory)')
534 534 nbmp = len(part.mandatorykeys)
535 535 nbap = len(part.params) - nbmp
536 536 if nbmp or nbap:
537 537 msg.append(b' (params:')
538 538 if nbmp:
539 539 msg.append(b' %i mandatory' % nbmp)
540 540 if nbap:
541 541 msg.append(b' %i advisory' % nbap)
542 542 msg.append(b')')
543 543 msg.append(b' %s\n' % status)
544 544 op.ui.debug(b''.join(msg))
545 545
546 546 return handler
547 547
548 548
549 549 def _processpart(op, part):
550 550 """process a single part from a bundle
551 551
552 552 The part is guaranteed to have been fully consumed when the function exits
553 553 (even if an exception is raised)."""
554 554 handler = _gethandler(op, part)
555 555 if handler is None:
556 556 return
557 557
558 558 # handler is called outside the above try block so that we don't
559 559 # risk catching KeyErrors from anything other than the
560 560 # parthandlermapping lookup (any KeyError raised by handler()
561 561 # itself represents a defect of a different variety).
562 562 output = None
563 563 if op.captureoutput and op.reply is not None:
564 564 op.ui.pushbuffer(error=True, subproc=True)
565 565 output = b''
566 566 try:
567 567 handler(op, part)
568 568 finally:
569 569 if output is not None:
570 570 output = op.ui.popbuffer()
571 571 if output:
572 572 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
573 573 outpart.addparam(
574 574 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
575 575 )
576 576
577 577
578 578 def decodecaps(blob):
579 579 """decode a bundle2 caps bytes blob into a dictionary
580 580
581 581 The blob is a list of capabilities (one per line)
582 582 Capabilities may have values using a line of the form::
583 583
584 584 capability=value1,value2,value3
585 585
586 586 The values are always a list."""
587 587 caps = {}
588 588 for line in blob.splitlines():
589 589 if not line:
590 590 continue
591 591 if b'=' not in line:
592 592 key, vals = line, ()
593 593 else:
594 594 key, vals = line.split(b'=', 1)
595 595 vals = vals.split(b',')
596 596 key = urlreq.unquote(key)
597 597 vals = [urlreq.unquote(v) for v in vals]
598 598 caps[key] = vals
599 599 return caps
600 600
601 601
602 602 def encodecaps(caps):
603 603 """encode a bundle2 caps dictionary into a bytes blob"""
604 604 chunks = []
605 605 for ca in sorted(caps):
606 606 vals = caps[ca]
607 607 ca = urlreq.quote(ca)
608 608 vals = [urlreq.quote(v) for v in vals]
609 609 if vals:
610 610 ca = b"%s=%s" % (ca, b','.join(vals))
611 611 chunks.append(ca)
612 612 return b'\n'.join(chunks)
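A simplified standalone round-trip of the two helpers above (str-based for brevity; the module itself works on bytes):

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    """Mirror of encodecaps: one capability per line, sorted by name,
    values comma-separated after '=', everything url-quoted."""
    chunks = []
    for name in sorted(caps):
        vals = caps[name]
        entry = quote(name)
        if vals:
            entry = '%s=%s' % (entry, ','.join(quote(v) for v in vals))
        chunks.append(entry)
    return '\n'.join(chunks)

def decode_caps(blob):
    """Mirror of decodecaps: parse the blob back into a dict of lists."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' in line:
            name, vals = line.split('=', 1)
            vals = [unquote(v) for v in vals.split(',')]
        else:
            name, vals = line, []
        caps[unquote(name)] = vals
    return caps
```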
613 613
614 614
615 615 bundletypes = {
616 616 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
617 617 # since the unification ssh accepts a header but there
618 618 # is no capability signaling it.
619 619 b"HG20": (), # special-cased below
620 620 b"HG10UN": (b"HG10UN", b'UN'),
621 621 b"HG10BZ": (b"HG10", b'BZ'),
622 622 b"HG10GZ": (b"HG10GZ", b'GZ'),
623 623 }
624 624
625 625 # hgweb uses this list to communicate its preferred type
626 626 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
627 627
628 628
629 629 class bundle20(object):
630 630 """represent an outgoing bundle2 container
631 631
632 632 Use the `addparam` method to add a stream level parameter, and `newpart` to
633 633 populate it with parts. Then call `getchunks` to retrieve all the binary
634 634 chunks of data that compose the bundle2 container."""
635 635
636 636 _magicstring = b'HG20'
637 637
638 638 def __init__(self, ui, capabilities=()):
639 639 self.ui = ui
640 640 self._params = []
641 641 self._parts = []
642 642 self.capabilities = dict(capabilities)
643 643 self._compengine = util.compengines.forbundletype(b'UN')
644 644 self._compopts = None
645 645 # If compression is being handled by a consumer of the raw
646 646 # data (e.g. the wire protocol), unsetting this flag tells
647 647 # consumers that the bundle is best left uncompressed.
648 648 self.prefercompressed = True
649 649
650 650 def setcompression(self, alg, compopts=None):
651 651 """setup core part compression to <alg>"""
652 652 if alg in (None, b'UN'):
653 653 return
654 654 assert not any(n.lower() == b'compression' for n, v in self._params)
655 655 self.addparam(b'Compression', alg)
656 656 self._compengine = util.compengines.forbundletype(alg)
657 657 self._compopts = compopts
658 658
659 659 @property
660 660 def nbparts(self):
661 661 """total number of parts added to the bundler"""
662 662 return len(self._parts)
663 663
664 664 # methods used to define the bundle2 content
665 665 def addparam(self, name, value=None):
666 666 """add a stream level parameter"""
667 667 if not name:
668 668 raise error.ProgrammingError(b'empty parameter name')
669 669 if name[0:1] not in pycompat.bytestr(
670 670 string.ascii_letters # pytype: disable=wrong-arg-types
671 671 ):
672 672 raise error.ProgrammingError(
673 673 b'non letter first character: %s' % name
674 674 )
675 675 self._params.append((name, value))
676 676
677 677 def addpart(self, part):
678 678 """add a new part to the bundle2 container
679 679
680 680 Parts contain the actual applicative payload."""
681 681 assert part.id is None
682 682 part.id = len(self._parts) # very cheap counter
683 683 self._parts.append(part)
684 684
685 685 def newpart(self, typeid, *args, **kwargs):
686 686 """create a new part and add it to the containers
687 687
688 688 As the part is directly added to the containers. For now, this means
689 689 that any failure to properly initialize the part after calling
690 690 ``newpart`` should result in a failure of the whole bundling process.
691 691
692 692 You can still fall back to manually create and add if you need better
693 693 control."""
694 694 part = bundlepart(typeid, *args, **kwargs)
695 695 self.addpart(part)
696 696 return part
697 697
698 698 # methods used to generate the bundle2 stream
699 699 def getchunks(self):
700 700 if self.ui.debugflag:
701 701 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
702 702 if self._params:
703 703 msg.append(b' (%i params)' % len(self._params))
704 704 msg.append(b' %i parts total\n' % len(self._parts))
705 705 self.ui.debug(b''.join(msg))
706 706 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
707 707 yield self._magicstring
708 708 param = self._paramchunk()
709 709 outdebug(self.ui, b'bundle parameter: %s' % param)
710 710 yield _pack(_fstreamparamsize, len(param))
711 711 if param:
712 712 yield param
713 713 for chunk in self._compengine.compressstream(
714 714 self._getcorechunk(), self._compopts
715 715 ):
716 716 yield chunk
717 717
718 718 def _paramchunk(self):
719 719 """return a encoded version of all stream parameters"""
720 720 blocks = []
721 721 for par, value in self._params:
722 722 par = urlreq.quote(par)
723 723 if value is not None:
724 724 value = urlreq.quote(value)
725 725 par = b'%s=%s' % (par, value)
726 726 blocks.append(par)
727 727 return b' '.join(blocks)
728 728
729 729 def _getcorechunk(self):
730 730 """yield chunk for the core part of the bundle
731 731
732 732 (all but headers and parameters)"""
733 733 outdebug(self.ui, b'start of parts')
734 734 for part in self._parts:
735 735 outdebug(self.ui, b'bundle part: "%s"' % part.type)
736 736 for chunk in part.getchunks(ui=self.ui):
737 737 yield chunk
738 738 outdebug(self.ui, b'end of bundle')
739 739 yield _pack(_fpartheadersize, 0)
740 740
741 741 def salvageoutput(self):
742 742 """return a list with a copy of all output parts in the bundle
743 743
744 744 This is meant to be used during error handling to make sure we preserve
745 745 server output"""
746 746 salvaged = []
747 747 for part in self._parts:
748 748 if part.type.startswith(b'output'):
749 749 salvaged.append(part.copy())
750 750 return salvaged
751 751
752 752
753 753 class unpackermixin(object):
754 754 """A mixin to extract bytes and struct data from a stream"""
755 755
756 756 def __init__(self, fp):
757 757 self._fp = fp
758 758
759 759 def _unpack(self, format):
760 760 """unpack this struct format from the stream
761 761
762 762 This method is meant for internal usage by the bundle2 protocol only.
763 763 It directly manipulates the low level stream, including bundle2 level
764 764 instructions.
765 765
766 766 Do not use it to implement higher-level logic or methods."""
767 767 data = self._readexact(struct.calcsize(format))
768 768 return _unpack(format, data)
769 769
770 770 def _readexact(self, size):
771 771 """read exactly <size> bytes from the stream
772 772
773 773 This method is meant for internal usage by the bundle2 protocol only.
774 774 It directly manipulates the low level stream, including bundle2 level
775 775 instructions.
776 776
777 777 Do not use it to implement higher-level logic or methods."""
778 778 return changegroup.readexactly(self._fp, size)
779 779
780 780
781 781 def getunbundler(ui, fp, magicstring=None):
782 782 """return a valid unbundler object for a given magicstring"""
783 783 if magicstring is None:
784 784 magicstring = changegroup.readexactly(fp, 4)
785 785 magic, version = magicstring[0:2], magicstring[2:4]
786 786 if magic != b'HG':
787 787 ui.debug(
788 788 b"error: invalid magic: %r (version %r), should be 'HG'\n"
789 789 % (magic, version)
790 790 )
791 791 raise error.Abort(_(b'not a Mercurial bundle'))
792 792 unbundlerclass = formatmap.get(version)
793 793 if unbundlerclass is None:
794 794 raise error.Abort(_(b'unknown bundle version %s') % version)
795 795 unbundler = unbundlerclass(ui, fp)
796 796 indebug(ui, b'start processing of %s stream' % magicstring)
797 797 return unbundler
798 798
799 799
800 800 class unbundle20(unpackermixin):
801 801 """interpret a bundle2 stream
802 802
803 803 This class is fed with a binary stream and yields parts through its
804 804 `iterparts` method."""
805 805
806 806 _magicstring = b'HG20'
807 807
808 808 def __init__(self, ui, fp):
809 809 """If header is specified, we do not read it out of the stream."""
810 810 self.ui = ui
811 811 self._compengine = util.compengines.forbundletype(b'UN')
812 812 self._compressed = None
813 813 super(unbundle20, self).__init__(fp)
814 814
815 815 @util.propertycache
816 816 def params(self):
817 817 """dictionary of stream level parameters"""
818 818 indebug(self.ui, b'reading bundle2 stream parameters')
819 819 params = {}
820 820 paramssize = self._unpack(_fstreamparamsize)[0]
821 821 if paramssize < 0:
822 822 raise error.BundleValueError(
823 823 b'negative bundle param size: %i' % paramssize
824 824 )
825 825 if paramssize:
826 826 params = self._readexact(paramssize)
827 827 params = self._processallparams(params)
828 828 return params
829 829
830 830 def _processallparams(self, paramsblock):
831 831 """"""
832 832 params = util.sortdict()
833 833 for p in paramsblock.split(b' '):
834 834 p = p.split(b'=', 1)
835 835 p = [urlreq.unquote(i) for i in p]
836 836 if len(p) < 2:
837 837 p.append(None)
838 838 self._processparam(*p)
839 839 params[p[0]] = p[1]
840 840 return params
841 841
842 842 def _processparam(self, name, value):
843 843 """process a parameter, applying its effect if needed
844 844
845 845 Parameters starting with a lower case letter are advisory and will be
846 846 ignored when unknown. Those starting with an upper case letter are
847 847 mandatory; this function raises BundleUnknownFeatureError when unknown.
848 848
849 849 Note: no options are currently supported. Any input will be either
850 850 ignored or cause a failure.
851 851 """
852 852 if not name:
853 853 raise ValueError('empty parameter name')
854 854 if name[0:1] not in pycompat.bytestr(
855 855 string.ascii_letters # pytype: disable=wrong-arg-types
856 856 ):
857 857 raise ValueError('non letter first character: %s' % name)
858 858 try:
859 859 handler = b2streamparamsmap[name.lower()]
860 860 except KeyError:
861 861 if name[0:1].islower():
862 862 indebug(self.ui, b"ignoring unknown parameter %s" % name)
863 863 else:
864 864 raise error.BundleUnknownFeatureError(params=(name,))
865 865 else:
866 866 handler(self, name, value)
867 867
868 868 def _forwardchunks(self):
869 869 """utility to transfer a bundle2 as binary
870 870
871 871 This is made necessary by the fact that the 'getbundle' command over 'ssh'
872 872 has no way to know when the reply ends, relying on the bundle being
873 873 interpreted to know its end. This is terrible and we are sorry, but we
874 874 needed to move forward to get general delta enabled.
875 875 """
876 876 yield self._magicstring
877 877 assert 'params' not in vars(self)
878 878 paramssize = self._unpack(_fstreamparamsize)[0]
879 879 if paramssize < 0:
880 880 raise error.BundleValueError(
881 881 b'negative bundle param size: %i' % paramssize
882 882 )
883 883 if paramssize:
884 884 params = self._readexact(paramssize)
885 885 self._processallparams(params)
886 886 # The payload itself is decompressed below, so drop
887 887 # the compression parameter passed down to compensate.
888 888 outparams = []
889 889 for p in params.split(b' '):
890 890 k, v = p.split(b'=', 1)
891 891 if k.lower() != b'compression':
892 892 outparams.append(p)
893 893 outparams = b' '.join(outparams)
894 894 yield _pack(_fstreamparamsize, len(outparams))
895 895 yield outparams
896 896 else:
897 897 yield _pack(_fstreamparamsize, paramssize)
898 898 # From there, the payload might need to be decompressed
899 899 self._fp = self._compengine.decompressorreader(self._fp)
900 900 emptycount = 0
901 901 while emptycount < 2:
902 902 # so we can brainlessly loop
903 903 assert _fpartheadersize == _fpayloadsize
904 904 size = self._unpack(_fpartheadersize)[0]
905 905 yield _pack(_fpartheadersize, size)
906 906 if size:
907 907 emptycount = 0
908 908 else:
909 909 emptycount += 1
910 910 continue
911 911 if size == flaginterrupt:
912 912 continue
913 913 elif size < 0:
914 914 raise error.BundleValueError(b'negative chunk size: %i' % size)
915 915 yield self._readexact(size)
916 916
917 917 def iterparts(self, seekable=False):
918 918 """yield all parts contained in the stream"""
919 919 cls = seekableunbundlepart if seekable else unbundlepart
920 920 # make sure params have been loaded
921 921 self.params
922 922 # From there, payload needs to be decompressed
923 923 self._fp = self._compengine.decompressorreader(self._fp)
924 924 indebug(self.ui, b'start extraction of bundle2 parts')
925 925 headerblock = self._readpartheader()
926 926 while headerblock is not None:
927 927 part = cls(self.ui, headerblock, self._fp)
928 928 yield part
929 929 # Ensure part is fully consumed so we can start reading the next
930 930 # part.
931 931 part.consume()
932 932
933 933 headerblock = self._readpartheader()
934 934 indebug(self.ui, b'end of bundle2 stream')
935 935
936 936 def _readpartheader(self):
937 937 """reads a part header size and returns the bytes blob
938 938
939 939 returns None if empty"""
940 940 headersize = self._unpack(_fpartheadersize)[0]
941 941 if headersize < 0:
942 942 raise error.BundleValueError(
943 943 b'negative part header size: %i' % headersize
944 944 )
945 945 indebug(self.ui, b'part header size: %i' % headersize)
946 946 if headersize:
947 947 return self._readexact(headersize)
948 948 return None
949 949
950 950 def compressed(self):
951 951 self.params # load params
952 952 return self._compressed
953 953
954 954 def close(self):
955 955 """close underlying file"""
956 956 if util.safehasattr(self._fp, 'close'):
957 957 return self._fp.close()
958 958
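The stream-level parameter handling above follows the wire format described in the module docstring: a blob of space separated, urlquoted ``<name>=<value>`` pairs, where a capitalized first letter marks a parameter as mandatory. A minimal standalone sketch of a parser for that blob (the function name and the two-dict return shape are illustrative, not part of the module):

```python
import urllib.parse

def parse_stream_params(blob):
    """Parse a bundle2 stream-level parameter blob into
    (mandatory, advisory) dicts, per the module docstring's format."""
    mandatory, advisory = {}, {}
    if not blob:
        return mandatory, advisory
    for entry in blob.split(b' '):
        if b'=' in entry:
            name, value = entry.split(b'=', 1)
        else:
            name, value = entry, b''
        # both name and value travel urlquoted on the wire
        name = urllib.parse.unquote_to_bytes(name)
        value = urllib.parse.unquote_to_bytes(value)
        # a capital first letter means the parameter is mandatory
        target = mandatory if name[0:1].isupper() else advisory
        target[name] = value
    return mandatory, advisory
```
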
959 959
960 960 formatmap = {b'20': unbundle20}
961 961
962 962 b2streamparamsmap = {}
963 963
964 964
965 965 def b2streamparamhandler(name):
966 966 """register a handler for a stream level parameter"""
967 967
968 968 def decorator(func):
969 969 assert name not in b2streamparamsmap
970 970 b2streamparamsmap[name] = func
971 971 return func
972 972
973 973 return decorator
974 974
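``b2streamparamhandler`` above is a small decorator-based registry: the decorator stores the handler under the parameter name and returns the function unchanged, so plain ``def`` syntax registers it. A self-contained sketch of the same pattern (names here are illustrative):

```python
# registry mapping parameter name -> handler function
handlers = {}

def paramhandler(name):
    """Register the decorated function as the handler for `name`."""
    def decorator(func):
        assert name not in handlers  # guard against double registration
        handlers[name] = func
        return func
    return decorator

@paramhandler(b'compression')
def processcompression(unbundler, param, value):
    # a stand-in for the real handler: just record the value
    unbundler.compression = value
```
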
975 975
976 976 @b2streamparamhandler(b'compression')
977 977 def processcompression(unbundler, param, value):
978 978 """read compression parameter and install payload decompression"""
979 979 if value not in util.compengines.supportedbundletypes:
980 980 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
981 981 unbundler._compengine = util.compengines.forbundletype(value)
982 982 if value is not None:
983 983 unbundler._compressed = True
984 984
985 985
986 986 class bundlepart(object):
987 987 """A bundle2 part contains application level payload
988 988
989 989 The part `type` is used to route the part to the application level
990 990 handler.
991 991
992 992 The part payload is contained in ``part.data``. It could be raw bytes or a
993 993 generator of byte chunks.
994 994
995 995 You can add parameters to the part using the ``addparam`` method.
996 996 Parameters can be either mandatory (default) or advisory. Remote side
997 997 should be able to safely ignore the advisory ones.
998 998
999 999 Neither data nor parameters can be modified after generation has begun.
1000 1000 """
1001 1001
1002 1002 def __init__(
1003 1003 self,
1004 1004 parttype,
1005 1005 mandatoryparams=(),
1006 1006 advisoryparams=(),
1007 1007 data=b'',
1008 1008 mandatory=True,
1009 1009 ):
1010 1010 validateparttype(parttype)
1011 1011 self.id = None
1012 1012 self.type = parttype
1013 1013 self._data = data
1014 1014 self._mandatoryparams = list(mandatoryparams)
1015 1015 self._advisoryparams = list(advisoryparams)
1016 1016 # checking for duplicated entries
1017 1017 self._seenparams = set()
1018 1018 for pname, __ in self._mandatoryparams + self._advisoryparams:
1019 1019 if pname in self._seenparams:
1020 1020 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1021 1021 self._seenparams.add(pname)
1022 1022 # status of the part's generation:
1023 1023 # - None: not started,
1024 1024 # - False: currently generated,
1025 1025 # - True: generation done.
1026 1026 self._generated = None
1027 1027 self.mandatory = mandatory
1028 1028
1029 1029 def __repr__(self):
1030 1030 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1031 1031 return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1032 1032 cls,
1033 1033 id(self),
1034 1034 self.id,
1035 1035 self.type,
1036 1036 self.mandatory,
1037 1037 )
1038 1038
1039 1039 def copy(self):
1040 1040 """return a copy of the part
1041 1041
1042 1042 The new part has the very same content but no partid assigned yet.
1043 1043 Parts with generated data cannot be copied."""
1044 1044 assert not util.safehasattr(self.data, 'next')
1045 1045 return self.__class__(
1046 1046 self.type,
1047 1047 self._mandatoryparams,
1048 1048 self._advisoryparams,
1049 1049 self._data,
1050 1050 self.mandatory,
1051 1051 )
1052 1052
1053 1053 # methods used to define the part content
1054 1054 @property
1055 1055 def data(self):
1056 1056 return self._data
1057 1057
1058 1058 @data.setter
1059 1059 def data(self, data):
1060 1060 if self._generated is not None:
1061 1061 raise error.ReadOnlyPartError(b'part is being generated')
1062 1062 self._data = data
1063 1063
1064 1064 @property
1065 1065 def mandatoryparams(self):
1066 1066 # make it an immutable tuple to force people through ``addparam``
1067 1067 return tuple(self._mandatoryparams)
1068 1068
1069 1069 @property
1070 1070 def advisoryparams(self):
1071 1071 # make it an immutable tuple to force people through ``addparam``
1072 1072 return tuple(self._advisoryparams)
1073 1073
1074 1074 def addparam(self, name, value=b'', mandatory=True):
1075 1075 """add a parameter to the part
1076 1076
1077 1077 If 'mandatory' is set to True, the remote handler must claim support
1078 1078 for this parameter or the unbundling will be aborted.
1079 1079
1080 1080 The 'name' and 'value' cannot exceed 255 bytes each.
1081 1081 """
1082 1082 if self._generated is not None:
1083 1083 raise error.ReadOnlyPartError(b'part is being generated')
1084 1084 if name in self._seenparams:
1085 1085 raise ValueError(b'duplicated params: %s' % name)
1086 1086 self._seenparams.add(name)
1087 1087 params = self._advisoryparams
1088 1088 if mandatory:
1089 1089 params = self._mandatoryparams
1090 1090 params.append((name, value))
1091 1091
1092 1092 # methods used to generate the bundle2 stream
1093 1093 def getchunks(self, ui):
1094 1094 if self._generated is not None:
1095 1095 raise error.ProgrammingError(b'part can only be consumed once')
1096 1096 self._generated = False
1097 1097
1098 1098 if ui.debugflag:
1099 1099 msg = [b'bundle2-output-part: "%s"' % self.type]
1100 1100 if not self.mandatory:
1101 1101 msg.append(b' (advisory)')
1102 1102 nbmp = len(self.mandatoryparams)
1103 1103 nbap = len(self.advisoryparams)
1104 1104 if nbmp or nbap:
1105 1105 msg.append(b' (params:')
1106 1106 if nbmp:
1107 1107 msg.append(b' %i mandatory' % nbmp)
1108 1108 if nbap:
1109 1109 msg.append(b' %i advisory' % nbap)
1110 1110 msg.append(b')')
1111 1111 if not self.data:
1112 1112 msg.append(b' empty payload')
1113 1113 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1114 1114 self.data, b'__next__'
1115 1115 ):
1116 1116 msg.append(b' streamed payload')
1117 1117 else:
1118 1118 msg.append(b' %i bytes payload' % len(self.data))
1119 1119 msg.append(b'\n')
1120 1120 ui.debug(b''.join(msg))
1121 1121
1122 1122 #### header
1123 1123 if self.mandatory:
1124 1124 parttype = self.type.upper()
1125 1125 else:
1126 1126 parttype = self.type.lower()
1127 1127 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1128 1128 ## parttype
1129 1129 header = [
1130 1130 _pack(_fparttypesize, len(parttype)),
1131 1131 parttype,
1132 1132 _pack(_fpartid, self.id),
1133 1133 ]
1134 1134 ## parameters
1135 1135 # count
1136 1136 manpar = self.mandatoryparams
1137 1137 advpar = self.advisoryparams
1138 1138 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1139 1139 # size
1140 1140 parsizes = []
1141 1141 for key, value in manpar:
1142 1142 parsizes.append(len(key))
1143 1143 parsizes.append(len(value))
1144 1144 for key, value in advpar:
1145 1145 parsizes.append(len(key))
1146 1146 parsizes.append(len(value))
1147 1147 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1148 1148 header.append(paramsizes)
1149 1149 # key, value
1150 1150 for key, value in manpar:
1151 1151 header.append(key)
1152 1152 header.append(value)
1153 1153 for key, value in advpar:
1154 1154 header.append(key)
1155 1155 header.append(value)
1156 1156 ## finalize header
1157 1157 try:
1158 1158 headerchunk = b''.join(header)
1159 1159 except TypeError:
1160 1160 raise TypeError(
1161 1161 'Found a non-bytes trying to '
1162 1162 'build bundle part header: %r' % header
1163 1163 )
1164 1164 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1165 1165 yield _pack(_fpartheadersize, len(headerchunk))
1166 1166 yield headerchunk
1167 1167 ## payload
1168 1168 try:
1169 1169 for chunk in self._payloadchunks():
1170 1170 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1171 1171 yield _pack(_fpayloadsize, len(chunk))
1172 1172 yield chunk
1173 1173 except GeneratorExit:
1174 1174 # GeneratorExit means that nobody is listening for our
1175 1175 # results anyway, so just bail quickly rather than trying
1176 1176 # to produce an error part.
1177 1177 ui.debug(b'bundle2-generatorexit\n')
1178 1178 raise
1179 1179 except BaseException as exc:
1180 1180 bexc = stringutil.forcebytestr(exc)
1181 1181 # backup exception data for later
1182 1182 ui.debug(
1183 1183 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1184 1184 )
1185 1185 tb = sys.exc_info()[2]
1186 1186 msg = b'unexpected error: %s' % bexc
1187 1187 interpart = bundlepart(
1188 1188 b'error:abort', [(b'message', msg)], mandatory=False
1189 1189 )
1190 1190 interpart.id = 0
1191 1191 yield _pack(_fpayloadsize, -1)
1192 1192 for chunk in interpart.getchunks(ui=ui):
1193 1193 yield chunk
1194 1194 outdebug(ui, b'closing payload chunk')
1195 1195 # abort current part payload
1196 1196 yield _pack(_fpayloadsize, 0)
1197 1197 pycompat.raisewithtb(exc, tb)
1198 1198 # end of payload
1199 1199 outdebug(ui, b'closing payload chunk')
1200 1200 yield _pack(_fpayloadsize, 0)
1201 1201 self._generated = True
1202 1202
1203 1203 def _payloadchunks(self):
1204 1204 """yield chunks of the part payload
1205 1205
1206 1206 Exists to handle the different methods to provide data to a part."""
1207 1207 # we only support fixed size data now.
1208 1208 # This will be improved in the future.
1209 1209 if util.safehasattr(self.data, 'next') or util.safehasattr(
1210 1210 self.data, b'__next__'
1211 1211 ):
1212 1212 buff = util.chunkbuffer(self.data)
1213 1213 chunk = buff.read(preferedchunksize)
1214 1214 while chunk:
1215 1215 yield chunk
1216 1216 chunk = buff.read(preferedchunksize)
1217 1217 elif len(self.data):
1218 1218 yield self.data
1219 1219
1220 1220
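The header that ``getchunks`` assembles is: the part type length and bytes, the part id, the mandatory/advisory parameter counts, one size pair per parameter, then the parameter keys and values. A standalone sketch of that layout, assuming the module's wire formats (``_fparttypesize`` = ``'>B'``, ``_fpartid`` = ``'>I'``, ``_fpartparamcount`` = ``'>BB'``, and one ``'>BB'`` size pair per parameter, which matches the 255-byte limit documented in ``addparam``):

```python
import struct

def build_part_header(parttype, partid, mandatory_params, advisory_params):
    """Pack a bundle2 part header (sketch; format constants assumed)."""
    header = [
        struct.pack('>B', len(parttype)),  # part type size
        parttype,                          # part type bytes
        struct.pack('>I', partid),         # part id
        # mandatory and advisory parameter counts
        struct.pack('>BB', len(mandatory_params), len(advisory_params)),
    ]
    allparams = list(mandatory_params) + list(advisory_params)
    # one (key size, value size) byte pair per parameter
    for key, value in allparams:
        header.append(struct.pack('>BB', len(key), len(value)))
    # then the raw keys and values, in the same order
    for key, value in allparams:
        header.append(key)
        header.append(value)
    return b''.join(header)
```
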
1221 1221 flaginterrupt = -1
1222 1222
1223 1223
1224 1224 class interrupthandler(unpackermixin):
1225 1225 """read one part and process it with restricted capability
1226 1226
1227 1227 This allows transmitting an exception raised on the producer side while
1228 1228 the consumer is reading a part.
1229 1229 
1230 1230 Parts processed in this manner only have access to a ui object."""
1231 1231
1232 1232 def __init__(self, ui, fp):
1233 1233 super(interrupthandler, self).__init__(fp)
1234 1234 self.ui = ui
1235 1235
1236 1236 def _readpartheader(self):
1237 1237 """reads a part header size and returns the bytes blob
1238 1238
1239 1239 returns None if empty"""
1240 1240 headersize = self._unpack(_fpartheadersize)[0]
1241 1241 if headersize < 0:
1242 1242 raise error.BundleValueError(
1243 1243 b'negative part header size: %i' % headersize
1244 1244 )
1245 1245 indebug(self.ui, b'part header size: %i\n' % headersize)
1246 1246 if headersize:
1247 1247 return self._readexact(headersize)
1248 1248 return None
1249 1249
1250 1250 def __call__(self):
1251 1251
1252 1252 self.ui.debug(
1253 1253 b'bundle2-input-stream-interrupt: opening out of band context\n'
1254 1254 )
1255 1255 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1256 1256 headerblock = self._readpartheader()
1257 1257 if headerblock is None:
1258 1258 indebug(self.ui, b'no part found during interruption.')
1259 1259 return
1260 1260 part = unbundlepart(self.ui, headerblock, self._fp)
1261 1261 op = interruptoperation(self.ui)
1262 1262 hardabort = False
1263 1263 try:
1264 1264 _processpart(op, part)
1265 1265 except (SystemExit, KeyboardInterrupt):
1266 1266 hardabort = True
1267 1267 raise
1268 1268 finally:
1269 1269 if not hardabort:
1270 1270 part.consume()
1271 1271 self.ui.debug(
1272 1272 b'bundle2-input-stream-interrupt: closing out of band context\n'
1273 1273 )
1274 1274
1275 1275
1276 1276 class interruptoperation(object):
1277 1277 """A limited operation to be used by part handlers during interruption
1278 1278 
1279 1279 It only has access to a ui object.
1280 1280 """
1281 1281
1282 1282 def __init__(self, ui):
1283 1283 self.ui = ui
1284 1284 self.reply = None
1285 1285 self.captureoutput = False
1286 1286
1287 1287 @property
1288 1288 def repo(self):
1289 1289 raise error.ProgrammingError(b'no repo access from stream interruption')
1290 1290
1291 1291 def gettransaction(self):
1292 1292 raise TransactionUnavailable(b'no repo access from stream interruption')
1293 1293
1294 1294
1295 1295 def decodepayloadchunks(ui, fh):
1296 1296 """Reads bundle2 part payload data into chunks.
1297 1297
1298 1298 Part payload data consists of framed chunks. This function takes
1299 1299 a file handle and emits those chunks.
1300 1300 """
1301 1301 dolog = ui.configbool(b'devel', b'bundle2.debug')
1302 1302 debug = ui.debug
1303 1303
1304 1304 headerstruct = struct.Struct(_fpayloadsize)
1305 1305 headersize = headerstruct.size
1306 1306 unpack = headerstruct.unpack
1307 1307
1308 1308 readexactly = changegroup.readexactly
1309 1309 read = fh.read
1310 1310
1311 1311 chunksize = unpack(readexactly(fh, headersize))[0]
1312 1312 indebug(ui, b'payload chunk size: %i' % chunksize)
1313 1313
1314 1314 # changegroup.readexactly() is inlined below for performance.
1315 1315 while chunksize:
1316 1316 if chunksize >= 0:
1317 1317 s = read(chunksize)
1318 1318 if len(s) < chunksize:
1319 1319 raise error.Abort(
1320 1320 _(
1321 1321 b'stream ended unexpectedly '
1322 1322 b'(got %d bytes, expected %d)'
1323 1323 )
1324 1324 % (len(s), chunksize)
1325 1325 )
1326 1326
1327 1327 yield s
1328 1328 elif chunksize == flaginterrupt:
1329 1329 # Interrupt "signal" detected. The regular stream is interrupted
1330 1330 # and a bundle2 part follows. Consume it.
1331 1331 interrupthandler(ui, fh)()
1332 1332 else:
1333 1333 raise error.BundleValueError(
1334 1334 b'negative payload chunk size: %s' % chunksize
1335 1335 )
1336 1336
1337 1337 s = read(headersize)
1338 1338 if len(s) < headersize:
1339 1339 raise error.Abort(
1340 1340 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1341 1341 % (len(s), headersize)
1342 1342 )
1343 1343
1344 1344 chunksize = unpack(s)[0]
1345 1345
1346 1346 # indebug() inlined for performance.
1347 1347 if dolog:
1348 1348 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1349 1349
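``decodepayloadchunks`` consumes framed chunks: each chunk is preceded by a big-endian int32 size (the module's ``_fpayloadsize``), and a zero-size frame terminates the payload (negative sizes are reserved, e.g. ``flaginterrupt``). A self-contained round-trip sketch of that framing (function names are illustrative):

```python
import io
import struct

def encode_payload(chunks):
    """Frame each chunk with a '>i' size prefix; end with a zero frame."""
    out = []
    for chunk in chunks:
        out.append(struct.pack('>i', len(chunk)))
        out.append(chunk)
    out.append(struct.pack('>i', 0))  # end-of-payload marker
    return b''.join(out)

def decode_payload(fh):
    """Yield decoded chunks until the zero-size end marker."""
    while True:
        size = struct.unpack('>i', fh.read(4))[0]
        if size == 0:
            return
        if size < 0:
            # the real decoder handles flaginterrupt here; this sketch does not
            raise ValueError('negative payload chunk size: %d' % size)
        yield fh.read(size)
```
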
1350 1350
1351 1351 class unbundlepart(unpackermixin):
1352 1352 """a bundle part read from a bundle"""
1353 1353
1354 1354 def __init__(self, ui, header, fp):
1355 1355 super(unbundlepart, self).__init__(fp)
1356 1356 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1357 1357 fp, b'tell'
1358 1358 )
1359 1359 self.ui = ui
1360 1360 # unbundle state attr
1361 1361 self._headerdata = header
1362 1362 self._headeroffset = 0
1363 1363 self._initialized = False
1364 1364 self.consumed = False
1365 1365 # part data
1366 1366 self.id = None
1367 1367 self.type = None
1368 1368 self.mandatoryparams = None
1369 1369 self.advisoryparams = None
1370 1370 self.params = None
1371 1371 self.mandatorykeys = ()
1372 1372 self._readheader()
1373 1373 self._mandatory = None
1374 1374 self._pos = 0
1375 1375
1376 1376 def _fromheader(self, size):
1377 1377 """return the next <size> byte from the header"""
1378 1378 offset = self._headeroffset
1379 1379 data = self._headerdata[offset : (offset + size)]
1380 1380 self._headeroffset = offset + size
1381 1381 return data
1382 1382
1383 1383 def _unpackheader(self, format):
1384 1384 """read given format from header
1385 1385
1386 1386 This automatically computes the size of the format to read."""
1387 1387 data = self._fromheader(struct.calcsize(format))
1388 1388 return _unpack(format, data)
1389 1389
1390 1390 def _initparams(self, mandatoryparams, advisoryparams):
1391 1391 """internal function to set up all logic-related parameters"""
1392 1392 # make it read only to prevent people touching it by mistake.
1393 1393 self.mandatoryparams = tuple(mandatoryparams)
1394 1394 self.advisoryparams = tuple(advisoryparams)
1395 1395 # user friendly UI
1396 1396 self.params = util.sortdict(self.mandatoryparams)
1397 1397 self.params.update(self.advisoryparams)
1398 1398 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1399 1399
1400 1400 def _readheader(self):
1401 1401 """read the header and setup the object"""
1402 1402 typesize = self._unpackheader(_fparttypesize)[0]
1403 1403 self.type = self._fromheader(typesize)
1404 1404 indebug(self.ui, b'part type: "%s"' % self.type)
1405 1405 self.id = self._unpackheader(_fpartid)[0]
1406 1406 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1407 1407 # extract mandatory bit from type
1408 1408 self.mandatory = self.type != self.type.lower()
1409 1409 self.type = self.type.lower()
1410 1410 ## reading parameters
1411 1411 # param count
1412 1412 mancount, advcount = self._unpackheader(_fpartparamcount)
1413 1413 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1414 1414 # param size
1415 1415 fparamsizes = _makefpartparamsizes(mancount + advcount)
1416 1416 paramsizes = self._unpackheader(fparamsizes)
1417 1417 # make it a list of pairs again
1418 1418 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1419 1419 # split mandatory from advisory
1420 1420 mansizes = paramsizes[:mancount]
1421 1421 advsizes = paramsizes[mancount:]
1422 1422 # retrieve param value
1423 1423 manparams = []
1424 1424 for key, value in mansizes:
1425 1425 manparams.append((self._fromheader(key), self._fromheader(value)))
1426 1426 advparams = []
1427 1427 for key, value in advsizes:
1428 1428 advparams.append((self._fromheader(key), self._fromheader(value)))
1429 1429 self._initparams(manparams, advparams)
1430 1430 ## part payload
1431 1431 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1432 1432 # we read the data, tell it
1433 1433 self._initialized = True
1434 1434
1435 1435 def _payloadchunks(self):
1436 1436 """Generator of decoded chunks in the payload."""
1437 1437 return decodepayloadchunks(self.ui, self._fp)
1438 1438
1439 1439 def consume(self):
1440 1440 """Read the part payload until completion.
1441 1441
1442 1442 By consuming the part data, the underlying stream read offset will
1443 1443 be advanced to the next part (or end of stream).
1444 1444 """
1445 1445 if self.consumed:
1446 1446 return
1447 1447
1448 1448 chunk = self.read(32768)
1449 1449 while chunk:
1450 1450 self._pos += len(chunk)
1451 1451 chunk = self.read(32768)
1452 1452
1453 1453 def read(self, size=None):
1454 1454 """read payload data"""
1455 1455 if not self._initialized:
1456 1456 self._readheader()
1457 1457 if size is None:
1458 1458 data = self._payloadstream.read()
1459 1459 else:
1460 1460 data = self._payloadstream.read(size)
1461 1461 self._pos += len(data)
1462 1462 if size is None or len(data) < size:
1463 1463 if not self.consumed and self._pos:
1464 1464 self.ui.debug(
1465 1465 b'bundle2-input-part: total payload size %i\n' % self._pos
1466 1466 )
1467 1467 self.consumed = True
1468 1468 return data
1469 1469
1470 1470
1471 1471 class seekableunbundlepart(unbundlepart):
1472 1472 """A bundle2 part in a bundle that is seekable.
1473 1473
1474 1474 Regular ``unbundlepart`` instances can only be read once. This class
1475 1475 extends ``unbundlepart`` to enable bi-directional seeking within the
1476 1476 part.
1477 1477
1478 1478 Bundle2 part data consists of framed chunks. Offsets when seeking
1479 1479 refer to the decoded data, not the offsets in the underlying bundle2
1480 1480 stream.
1481 1481
1482 1482 To facilitate quickly seeking within the decoded data, instances of this
1483 1483 class maintain a mapping between offsets in the underlying stream and
1484 1484 the decoded payload. This mapping will consume memory in proportion
1485 1485 to the number of chunks within the payload (which almost certainly
1486 1486 increases in proportion with the size of the part).
1487 1487 """
1488 1488
1489 1489 def __init__(self, ui, header, fp):
1490 1490 # (payload, file) offsets for chunk starts.
1491 1491 self._chunkindex = []
1492 1492
1493 1493 super(seekableunbundlepart, self).__init__(ui, header, fp)
1494 1494
1495 1495 def _payloadchunks(self, chunknum=0):
1496 1496 '''seek to specified chunk and start yielding data'''
1497 1497 if len(self._chunkindex) == 0:
1498 1498 assert chunknum == 0, b'Must start with chunk 0'
1499 1499 self._chunkindex.append((0, self._tellfp()))
1500 1500 else:
1501 1501 assert chunknum < len(self._chunkindex), (
1502 1502 b'Unknown chunk %d' % chunknum
1503 1503 )
1504 1504 self._seekfp(self._chunkindex[chunknum][1])
1505 1505
1506 1506 pos = self._chunkindex[chunknum][0]
1507 1507
1508 1508 for chunk in decodepayloadchunks(self.ui, self._fp):
1509 1509 chunknum += 1
1510 1510 pos += len(chunk)
1511 1511 if chunknum == len(self._chunkindex):
1512 1512 self._chunkindex.append((pos, self._tellfp()))
1513 1513
1514 1514 yield chunk
1515 1515
1516 1516 def _findchunk(self, pos):
1517 1517 '''for a given payload position, return a chunk number and offset'''
1518 1518 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1519 1519 if ppos == pos:
1520 1520 return chunk, 0
1521 1521 elif ppos > pos:
1522 1522 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1523 1523 raise ValueError(b'Unknown chunk')
1524 1524
1525 1525 def tell(self):
1526 1526 return self._pos
1527 1527
1528 1528 def seek(self, offset, whence=os.SEEK_SET):
1529 1529 if whence == os.SEEK_SET:
1530 1530 newpos = offset
1531 1531 elif whence == os.SEEK_CUR:
1532 1532 newpos = self._pos + offset
1533 1533 elif whence == os.SEEK_END:
1534 1534 if not self.consumed:
1535 1535 # Can't use self.consume() here because it advances self._pos.
1536 1536 chunk = self.read(32768)
1537 1537 while chunk:
1538 1538 chunk = self.read(32768)
1539 1539 newpos = self._chunkindex[-1][0] - offset
1540 1540 else:
1541 1541 raise ValueError(b'Unknown whence value: %r' % (whence,))
1542 1542
1543 1543 if newpos > self._chunkindex[-1][0] and not self.consumed:
1544 1544 # Can't use self.consume() here because it advances self._pos.
1545 1545 chunk = self.read(32768)
1546 1546 while chunk:
1547 1547 chunk = self.read(32768)
1548 1548
1549 1549 if not 0 <= newpos <= self._chunkindex[-1][0]:
1550 1550 raise ValueError(b'Offset out of range')
1551 1551
1552 1552 if self._pos != newpos:
1553 1553 chunk, internaloffset = self._findchunk(newpos)
1554 1554 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1555 1555 adjust = self.read(internaloffset)
1556 1556 if len(adjust) != internaloffset:
1557 1557 raise error.Abort(_(b'Seek failed\n'))
1558 1558 self._pos = newpos
1559 1559
1560 1560 def _seekfp(self, offset, whence=0):
1561 1561 """move the underlying file pointer
1562 1562
1563 1563 This method is meant for internal usage by the bundle2 protocol only.
1564 1564 It directly manipulates the low level stream, including bundle2 level
1565 1565 instructions.
1566 1566
1567 1567 Do not use it to implement higher-level logic or methods."""
1568 1568 if self._seekable:
1569 1569 return self._fp.seek(offset, whence)
1570 1570 else:
1571 1571 raise NotImplementedError(_(b'File pointer is not seekable'))
1572 1572
1573 1573 def _tellfp(self):
1574 1574 """return the file offset, or None if file is not seekable
1575 1575
1576 1576 This method is meant for internal usage by the bundle2 protocol only.
1577 1577 It directly manipulates the low level stream, including bundle2 level
1578 1578 instructions.
1579 1579
1580 1580 Do not use it to implement higher-level logic or methods."""
1581 1581 if self._seekable:
1582 1582 try:
1583 1583 return self._fp.tell()
1584 1584 except IOError as e:
1585 1585 if e.errno == errno.ESPIPE:
1586 1586 self._seekable = False
1587 1587 else:
1588 1588 raise
1589 1589 return None
1590 1590
1591 1591
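``seekableunbundlepart`` maintains ``_chunkindex``, a list of chunk start offsets, so that ``_findchunk`` can translate an absolute payload position into a chunk number plus an offset inside that chunk. A standalone sketch of that lookup over a plain list of chunk start offsets (the index here carries only payload offsets, not the file offsets the real class also records):

```python
def findchunk(chunkstarts, pos):
    """Map payload position `pos` to (chunk number, offset in chunk).

    `chunkstarts` lists the starting payload offset of each decoded chunk.
    """
    for chunk, ppos in enumerate(chunkstarts):
        if ppos == pos:
            # exact chunk boundary
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkstarts[chunk - 1]
    raise ValueError('position past the indexed payload: %d' % pos)
```
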
1592 1592 # These are only the static capabilities.
1593 1593 # Check the 'getrepocaps' function for the rest.
1594 1594 capabilities = {
1595 1595 b'HG20': (),
1596 1596 b'bookmarks': (),
1597 1597 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1598 1598 b'listkeys': (),
1599 1599 b'pushkey': (),
1600 1600 b'digests': tuple(sorted(util.DIGESTS.keys())),
1601 1601 b'remote-changegroup': (b'http', b'https'),
1602 1602 b'hgtagsfnodes': (),
1603 1603 b'phases': (b'heads',),
1604 1604 b'stream': (b'v2',),
1605 1605 }
1606 1606
1607 1607
1608 1608 def getrepocaps(repo, allowpushback=False, role=None):
1609 1609 """return the bundle2 capabilities for a given repo
1610 1610
1611 1611 Exists to allow extensions (like evolution) to mutate the capabilities.
1612 1612
1613 1613 The returned value is used for servers advertising their capabilities as
1614 1614 well as clients advertising their capabilities to servers as part of
1615 1615 bundle2 requests. The ``role`` argument specifies which is which.
1616 1616 """
1617 1617 if role not in (b'client', b'server'):
1618 1618 raise error.ProgrammingError(b'role argument must be client or server')
1619 1619
1620 1620 caps = capabilities.copy()
1621 1621 caps[b'changegroup'] = tuple(
1622 1622 sorted(changegroup.supportedincomingversions(repo))
1623 1623 )
1624 1624 if obsolete.isenabled(repo, obsolete.exchangeopt):
1625 1625 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1626 1626 caps[b'obsmarkers'] = supportedformat
1627 1627 if allowpushback:
1628 1628 caps[b'pushback'] = ()
1629 1629 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1630 1630 if cpmode == b'check-related':
1631 1631 caps[b'checkheads'] = (b'related',)
1632 1632 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1633 1633 caps.pop(b'phases')
1634 1634
1635 1635 # Don't advertise stream clone support in server mode if not configured.
1636 1636 if role == b'server':
1637 1637 streamsupported = repo.ui.configbool(
1638 1638 b'server', b'uncompressed', untrusted=True
1639 1639 )
1640 1640 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1641 1641
1642 1642 if not streamsupported or not featuresupported:
1643 1643 caps.pop(b'stream')
1644 1644 # Else always advertise support on client, because payload support
1645 1645 # should always be advertised.
1646 1646
1647 1647 # b'rev-branch-cache' is no longer advertised, but still supported
1648 1648 # for legacy clients.
1649 1649
1650 1650 return caps
1651 1651
1652 1652
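``bundle2caps`` below unquotes the peer's ``bundle2`` capability and hands it to ``decodecaps`` (defined elsewhere in this module). A sketch of a decoder for that blob, assuming the format is one urlquoted capability per line with comma-separated, individually urlquoted values (e.g. ``changegroup=01,02``):

```python
import urllib.parse

def decode_caps_blob(blob):
    """Decode a bundle2 capabilities blob into a dict (format assumed)."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if b'=' in line:
            key, vals = line.split(b'=', 1)
            vals = vals.split(b',')
        else:
            # a capability with no values, e.g. b'HG20'
            key, vals = line, ()
        key = urllib.parse.unquote_to_bytes(key)
        caps[key] = tuple(urllib.parse.unquote_to_bytes(v) for v in vals)
    return caps
```
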
1653 1653 def bundle2caps(remote):
1654 1654 """return the bundle capabilities of a peer as dict"""
1655 1655 raw = remote.capable(b'bundle2')
1656 1656 if not raw and raw != b'':
1657 1657 return {}
1658 1658 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1659 1659 return decodecaps(capsblob)
1660 1660
1661 1661
1662 1662 def obsmarkersversion(caps):
1663 1663 """extract the list of supported obsmarkers versions from a bundle2caps dict"""
1664 1664 obscaps = caps.get(b'obsmarkers', ())
1665 1665 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1666 1666
1667 1667
1668 1668 def writenewbundle(
1669 1669 ui,
1670 1670 repo,
1671 1671 source,
1672 1672 filename,
1673 1673 bundletype,
1674 1674 outgoing,
1675 1675 opts,
1676 1676 vfs=None,
1677 1677 compression=None,
1678 1678 compopts=None,
1679 1679 ):
1680 1680 if bundletype.startswith(b'HG10'):
1681 1681 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1682 1682 return writebundle(
1683 1683 ui,
1684 1684 cg,
1685 1685 filename,
1686 1686 bundletype,
1687 1687 vfs=vfs,
1688 1688 compression=compression,
1689 1689 compopts=compopts,
1690 1690 )
1691 1691 elif not bundletype.startswith(b'HG20'):
1692 1692 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1693 1693
1694 1694 caps = {}
1695 1695 if b'obsolescence' in opts:
1696 1696 caps[b'obsmarkers'] = (b'V1',)
1697 1697 bundle = bundle20(ui, caps)
1698 1698 bundle.setcompression(compression, compopts)
1699 1699 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1700 1700 chunkiter = bundle.getchunks()
1701 1701
1702 1702 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1703 1703
1704 1704
1705 1705 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1706 1706 # We should eventually reconcile this logic with the one behind
1707 1707 # 'exchange.getbundle2partsgenerator'.
1708 1708 #
1709 1709 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1710 1710 # different right now. So we keep them separated for now for the sake of
1711 1711 # simplicity.
1712 1712
1713 1713 # we might not always want a changegroup in such bundle, for example in
1714 1714 # stream bundles
1715 1715 if opts.get(b'changegroup', True):
1716 1716 cgversion = opts.get(b'cg.version')
1717 1717 if cgversion is None:
1718 1718 cgversion = changegroup.safeversion(repo)
1719 1719 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1720 1720 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1721 1721 part.addparam(b'version', cg.version)
1722 1722 if b'clcount' in cg.extras:
1723 1723 part.addparam(
1724 1724 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1725 1725 )
1726 1726 if opts.get(b'phases') and repo.revs(
1727 1727 b'%ln and secret()', outgoing.ancestorsof
1728 1728 ):
1729 1729 part.addparam(
1730 1730 b'targetphase', b'%d' % phases.secret, mandatory=False
1731 1731 )
1732 1732 if b'exp-sidedata-flag' in repo.requirements:
1733 1733 part.addparam(b'exp-sidedata', b'1')
1734 1734
1735 1735 if opts.get(b'streamv2', False):
1736 1736 addpartbundlestream2(bundler, repo, stream=True)
1737 1737
1738 1738 if opts.get(b'tagsfnodescache', True):
1739 1739 addparttagsfnodescache(repo, bundler, outgoing)
1740 1740
1741 1741 if opts.get(b'revbranchcache', True):
1742 1742 addpartrevbranchcache(repo, bundler, outgoing)
1743 1743
1744 1744 if opts.get(b'obsolescence', False):
1745 1745 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1746 1746 buildobsmarkerspart(
1747 1747 bundler,
1748 1748 obsmarkers,
1749 1749 mandatory=opts.get(b'obsolescence-mandatory', True),
1750 1750 )
1751 1751
1752 1752 if opts.get(b'phases', False):
1753 1753 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1754 1754 phasedata = phases.binaryencode(headsbyphase)
1755 1755 bundler.newpart(b'phase-heads', data=phasedata)
1756 1756
1757 1757
1758 1758 def addparttagsfnodescache(repo, bundler, outgoing):
1759 1759 # we include the tags fnode cache for the bundle changeset
1760 1760 # (as an optional parts)
1761 1761 cache = tags.hgtagsfnodescache(repo.unfiltered())
1762 1762 chunks = []
1763 1763
1764 1764 # .hgtags fnodes are only relevant for head changesets. While we could
1765 1765 # transfer values for all known nodes, there will likely be little to
1766 1766 # no benefit.
1767 1767 #
1768 1768 # We don't bother using a generator to produce output data because
1769 1769 # a) we only have 40 bytes per head and even esoteric numbers of heads
1770 1770 # consume little memory (1M heads is 40MB) b) we don't want to send the
1771 1771 # part if we don't have entries and knowing if we have entries requires
1772 1772 # cache lookups.
1773 1773 for node in outgoing.ancestorsof:
1774 1774 # Don't compute missing, as this may slow down serving.
1775 1775 fnode = cache.getfnode(node, computemissing=False)
1776 1776 if fnode:
1777 1777 chunks.extend([node, fnode])
1778 1778
1779 1779 if chunks:
1780 1780 bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))
1781 1781
1782 1782
1783 1783 def addpartrevbranchcache(repo, bundler, outgoing):
1784 1784 # we include the rev branch cache for the bundle changeset
1785 1785 # (as an optional part)
1786 1786 cache = repo.revbranchcache()
1787 1787 cl = repo.unfiltered().changelog
1788 1788 branchesdata = collections.defaultdict(lambda: (set(), set()))
1789 1789 for node in outgoing.missing:
1790 1790 branch, close = cache.branchinfo(cl.rev(node))
1791 1791 branchesdata[branch][close].add(node)
1792 1792
1793 1793 def generate():
1794 1794 for branch, (nodes, closed) in sorted(branchesdata.items()):
1795 1795 utf8branch = encoding.fromlocal(branch)
1796 1796 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1797 1797 yield utf8branch
1798 1798 for n in sorted(nodes):
1799 1799 yield n
1800 1800 for n in sorted(closed):
1801 1801 yield n
1802 1802
1803 1803 bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)
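The `generate()` helper above streams one binary record per branch: a big-endian `>III` header (branch-name length, open-head count, closed-head count), the UTF-8 branch name, then the 20-byte node hashes. A standalone sketch of that record layout, using dummy node values — this mirrors the wire format but is not Mercurial's own code:

```python
import struct

# Same record header as rbcstruct below: branch-name length,
# open-head count, closed-head count, all big-endian uint32.
RBC_HEADER = struct.Struct(">III")
NODE_LEN = 20  # SHA-1 node hash width

def encode_branch_record(branch, nodes, closed):
    # Serialize one record the way generate() does: header, UTF-8
    # branch name, then the sorted open and closed head hashes.
    utf8branch = branch.encode("utf-8")
    chunks = [RBC_HEADER.pack(len(utf8branch), len(nodes), len(closed))]
    chunks.append(utf8branch)
    chunks.extend(sorted(nodes))
    chunks.extend(sorted(closed))
    return b"".join(chunks)

def decode_branch_record(data):
    # Inverse of the above, for illustration only.
    namelen, nopen, nclosed = RBC_HEADER.unpack_from(data, 0)
    pos = RBC_HEADER.size
    branch = data[pos:pos + namelen].decode("utf-8")
    pos += namelen
    nodes = [data[pos + i * NODE_LEN:pos + (i + 1) * NODE_LEN]
             for i in range(nopen)]
    pos += nopen * NODE_LEN
    closed = [data[pos + i * NODE_LEN:pos + (i + 1) * NODE_LEN]
              for i in range(nclosed)]
    return branch, nodes, closed

record = encode_branch_record("default", [b"\x01" * NODE_LEN], [])
assert decode_branch_record(record) == ("default", [b"\x01" * NODE_LEN], [])
```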
1804 1804
1805 1805
1806 1806 def _formatrequirementsspec(requirements):
1807 1807 requirements = [req for req in requirements if req != b"shared"]
1808 1808 return urlreq.quote(b','.join(sorted(requirements)))
1809 1809
1810 1810
1811 1811 def _formatrequirementsparams(requirements):
1812 1812 requirements = _formatrequirementsspec(requirements)
1813 1813 params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
1814 1814 return params
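The two helpers above percent-encode a repository's requirements for transport. A minimal standalone round-trip sketch using `urllib.parse` directly (Mercurial's `urlreq` wraps the same functions); the requirement names here are illustrative:

```python
from urllib.parse import quote, unquote

def format_requirements_spec(requirements):
    # Drop the local-only "shared" requirement, then emit a sorted,
    # comma-joined, percent-encoded string (commas become %2C).
    reqs = sorted(req for req in requirements if req != "shared")
    return quote(",".join(reqs))

def parse_requirements_spec(spec):
    # Inverse operation, mirroring the unquote/split done by the
    # 'stream2' part handler.
    return unquote(spec).split(",")

spec = format_requirements_spec({"revlogv1", "store", "shared"})
assert parse_requirements_spec(spec) == ["revlogv1", "store"]
```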
1815 1815
1816 1816
1817 1817 def format_remote_wanted_sidedata(repo):
1818 1818 """Formats a repo's wanted sidedata categories into a bytestring for
1819 1819 capabilities exchange."""
1820 1820 wanted = b""
1821 1821 if repo._wanted_sidedata:
1822 1822 wanted = b','.join(
1823 1823 pycompat.bytestr(c) for c in sorted(repo._wanted_sidedata)
1824 1824 )
1825 1825 return wanted
1826 1826
1827 1827
1828 1828 def read_remote_wanted_sidedata(remote):
1829 1829 sidedata_categories = remote.capable(b'exp-wanted-sidedata')
1830 1830 return read_wanted_sidedata(sidedata_categories)
1831 1831
1832 1832
1833 1833 def read_wanted_sidedata(formatted):
1834 1834 if formatted:
1835 1835 return set(formatted.split(b','))
1836 1836 return set()
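The sidedata-capability functions above define a trivial wire format: a sorted, comma-separated byte string of category names, with the empty string meaning "nothing wanted". A self-contained mirror of the same round trip (a sketch, not the module's own code):

```python
def format_wanted_sidedata(categories):
    # Empty byte string when nothing is wanted, otherwise a sorted,
    # comma-separated list of category names.
    if not categories:
        return b""
    return b",".join(sorted(categories))

def parse_wanted_sidedata(formatted):
    # An empty value decodes to the empty set, as in read_wanted_sidedata.
    if formatted:
        return set(formatted.split(b","))
    return set()

assert parse_wanted_sidedata(format_wanted_sidedata({b"cp", b"mv"})) == {b"cp", b"mv"}
```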
1837 1837
1838 1838
1839 1839 def addpartbundlestream2(bundler, repo, **kwargs):
1840 1840 if not kwargs.get('stream', False):
1841 1841 return
1842 1842
1843 1843 if not streamclone.allowservergeneration(repo):
1844 1844 raise error.Abort(
1845 1845 _(
1846 1846 b'stream data requested but server does not allow '
1847 1847 b'this feature'
1848 1848 ),
1849 1849 hint=_(
1850 1850 b'well-behaved clients should not be '
1851 1851 b'requesting stream data from servers not '
1852 1852 b'advertising it; the client may be buggy'
1853 1853 ),
1854 1854 )
1855 1855
1856 1856 # Stream clones don't compress well. And compression undermines a
1857 1857 # goal of stream clones, which is to be fast. Communicate the desire
1858 1858 # to avoid compression to consumers of the bundle.
1859 1859 bundler.prefercompressed = False
1860 1860
1861 1861 # get the includes and excludes
1862 1862 includepats = kwargs.get('includepats')
1863 1863 excludepats = kwargs.get('excludepats')
1864 1864
1865 1865 narrowstream = repo.ui.configbool(
1866 1866 b'experimental', b'server.stream-narrow-clones'
1867 1867 )
1868 1868
1869 1869 if (includepats or excludepats) and not narrowstream:
1870 1870 raise error.Abort(_(b'server does not support narrow stream clones'))
1871 1871
1872 1872 includeobsmarkers = False
1873 1873 if repo.obsstore:
1874 1874 remoteversions = obsmarkersversion(bundler.capabilities)
1875 1875 if not remoteversions:
1876 1876 raise error.Abort(
1877 1877 _(
1878 1878 b'server has obsolescence markers, but client '
1879 1879 b'cannot receive them via stream clone'
1880 1880 )
1881 1881 )
1882 1882 elif repo.obsstore._version in remoteversions:
1883 1883 includeobsmarkers = True
1884 1884
1885 1885 filecount, bytecount, it = streamclone.generatev2(
1886 1886 repo, includepats, excludepats, includeobsmarkers
1887 1887 )
1888 1888 requirements = _formatrequirementsspec(repo.requirements)
1889 1889 part = bundler.newpart(b'stream2', data=it)
1890 1890 part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
1891 1891 part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
1892 1892 part.addparam(b'requirements', requirements, mandatory=True)
1893 1893
1894 1894
1895 1895 def buildobsmarkerspart(bundler, markers, mandatory=True):
1896 1896 """add an obsmarker part to the bundler with <markers>
1897 1897
1898 1898 No part is created if markers is empty.
1899 1899 Raises ValueError if the bundler doesn't support any known obsmarker format.
1900 1900 """
1901 1901 if not markers:
1902 1902 return None
1903 1903
1904 1904 remoteversions = obsmarkersversion(bundler.capabilities)
1905 1905 version = obsolete.commonversion(remoteversions)
1906 1906 if version is None:
1907 1907 raise ValueError(b'bundler does not support common obsmarker format')
1908 1908 stream = obsolete.encodemarkers(markers, True, version=version)
1909 1909 return bundler.newpart(b'obsmarkers', data=stream, mandatory=mandatory)
1910 1910
1911 1911
1912 1912 def writebundle(
1913 1913 ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
1914 1914 ):
1915 1915 """Write a bundle file and return its filename.
1916 1916
1917 1917 Existing files will not be overwritten.
1918 1918 If no filename is specified, a temporary file is created.
1919 1919 bz2 compression can be turned off.
1920 1920 The bundle file will be deleted in case of errors.
1921 1921 """
1922 1922
1923 1923 if bundletype == b"HG20":
1924 1924 bundle = bundle20(ui)
1925 1925 bundle.setcompression(compression, compopts)
1926 1926 part = bundle.newpart(b'changegroup', data=cg.getchunks())
1927 1927 part.addparam(b'version', cg.version)
1928 1928 if b'clcount' in cg.extras:
1929 1929 part.addparam(
1930 1930 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1931 1931 )
1932 1932 chunkiter = bundle.getchunks()
1933 1933 else:
1934 1934 # compression argument is only for the bundle2 case
1935 1935 assert compression is None
1936 1936 if cg.version != b'01':
1937 1937 raise error.Abort(
1938 1938 _(b'old bundle types only support v1 changegroups')
1939 1939 )
1940 1940 header, comp = bundletypes[bundletype]
1941 1941 if comp not in util.compengines.supportedbundletypes:
1942 1942 raise error.Abort(_(b'unknown stream compression type: %s') % comp)
1943 1943 compengine = util.compengines.forbundletype(comp)
1944 1944
1945 1945 def chunkiter():
1946 1946 yield header
1947 1947 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1948 1948 yield chunk
1949 1949
1950 1950 chunkiter = chunkiter()
1951 1951
1952 1952 # parse the changegroup data, otherwise we will block
1953 1953 # in case of sshrepo because we don't know the end of the stream
1954 1954 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1955 1955
1956 1956
1957 1957 def combinechangegroupresults(op):
1958 1958 """logic to combine 0 or more addchangegroup results into one"""
1959 1959 results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
1960 1960 changedheads = 0
1961 1961 result = 1
1962 1962 for ret in results:
1963 1963 # If any changegroup result is 0, return 0
1964 1964 if ret == 0:
1965 1965 result = 0
1966 1966 break
1967 1967 if ret < -1:
1968 1968 changedheads += ret + 1
1969 1969 elif ret > 1:
1970 1970 changedheads += ret - 1
1971 1971 if changedheads > 0:
1972 1972 result = 1 + changedheads
1973 1973 elif changedheads < 0:
1974 1974 result = -1 + changedheads
1975 1975 return result
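`combinechangegroupresults` folds several addchangegroup return codes (0 = error, 1 = no head change, 1+n = n heads added, -1-n = n heads removed) into one code with the same convention. A standalone sketch with the same semantics, kept separate from the bundle-operation plumbing:

```python
def combine_changegroup_results(results):
    # Sum the head deltas across all results, unless any result
    # signals an error (0), which makes the combined result an error.
    changedheads = 0
    for ret in results:
        if ret == 0:          # any error wins
            return 0
        if ret < -1:          # -1 - n means n heads removed
            changedheads += ret + 1
        elif ret > 1:         # 1 + n means n heads added
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1                  # no net head change

assert combine_changegroup_results([3, 2]) == 4   # 2 + 1 heads added
assert combine_changegroup_results([3, 0, 2]) == 0
```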
1976 1976
1977 1977
1978 1978 @parthandler(
1979 1979 b'changegroup',
1980 1980 (
1981 1981 b'version',
1982 1982 b'nbchanges',
1983 1983 b'exp-sidedata',
1984 1984 b'exp-wanted-sidedata',
1985 1985 b'treemanifest',
1986 1986 b'targetphase',
1987 1987 ),
1988 1988 )
1989 1989 def handlechangegroup(op, inpart):
1990 1990 """apply a changegroup part on the repo"""
1991 1991 from . import localrepo
1992 1992
1993 1993 tr = op.gettransaction()
1994 1994 unpackerversion = inpart.params.get(b'version', b'01')
1995 1995 # We should raise an appropriate exception here
1996 1996 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1997 1997 # the source and url passed here are overwritten by the ones contained in
1998 1998 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1999 1999 nbchangesets = None
2000 2000 if b'nbchanges' in inpart.params:
2001 2001 nbchangesets = int(inpart.params.get(b'nbchanges'))
2002 2002 if b'treemanifest' in inpart.params and not scmutil.istreemanifest(op.repo):
2003 2003 if len(op.repo.changelog) != 0:
2004 2004 raise error.Abort(
2005 2005 _(
2006 2006 b"bundle contains tree manifests, but local repo is "
2007 2007 b"non-empty and does not use tree manifests"
2008 2008 )
2009 2009 )
2010 2010 op.repo.requirements.add(requirements.TREEMANIFEST_REQUIREMENT)
2011 2011 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
2012 2012 op.repo.ui, op.repo.requirements, op.repo.features
2013 2013 )
2014 2014 scmutil.writereporequirements(op.repo)
2015 2015
2016 bundlesidedata = bool(b'exp-sidedata' in inpart.params)
2017 reposidedata = bool(b'exp-sidedata-flag' in op.repo.requirements)
2018 if reposidedata and not bundlesidedata:
2019 msg = b"repository is using sidedata but the bundle source do not"
2020 hint = b'this is currently unsupported'
2021 raise error.Abort(msg, hint=hint)
2022
2023 2016 extrakwargs = {}
2024 2017 targetphase = inpart.params.get(b'targetphase')
2025 2018 if targetphase is not None:
2026 2019 extrakwargs['targetphase'] = int(targetphase)
2027 2020
2028 2021 remote_sidedata = inpart.params.get(b'exp-wanted-sidedata')
2029 2022 extrakwargs['sidedata_categories'] = read_wanted_sidedata(remote_sidedata)
2030 2023
2031 2024 ret = _processchangegroup(
2032 2025 op,
2033 2026 cg,
2034 2027 tr,
2035 2028 op.source,
2036 2029 b'bundle2',
2037 2030 expectedtotal=nbchangesets,
2038 2031 **extrakwargs
2039 2032 )
2040 2033 if op.reply is not None:
2041 2034 # This is definitely not the final form of this
2042 2035 # return. But one needs to start somewhere.
2043 2036 part = op.reply.newpart(b'reply:changegroup', mandatory=False)
2044 2037 part.addparam(
2045 2038 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2046 2039 )
2047 2040 part.addparam(b'return', b'%i' % ret, mandatory=False)
2048 2041 assert not inpart.read()
2049 2042
2050 2043
2051 2044 _remotechangegroupparams = tuple(
2052 2045 [b'url', b'size', b'digests']
2053 2046 + [b'digest:%s' % k for k in util.DIGESTS.keys()]
2054 2047 )
2055 2048
2056 2049
2057 2050 @parthandler(b'remote-changegroup', _remotechangegroupparams)
2058 2051 def handleremotechangegroup(op, inpart):
2059 2052 """apply a bundle10 on the repo, given an url and validation information
2060 2053
2061 2054 All the information about the remote bundle to import is given as
2062 2055 parameters. The parameters include:
2063 2056 - url: the url to the bundle10.
2064 2057 - size: the bundle10 file size. It is used to validate what was
2065 2058 retrieved by the client matches what the server knows about the bundle.
2066 2059 - digests: a space separated list of the digest types provided as
2067 2060 parameters.
2068 2061 - digest:<digest-type>: the hexadecimal representation of the digest with
2069 2062 that name. Like the size, it is used to validate what was retrieved by
2070 2063 the client matches what the server knows about the bundle.
2071 2064
2072 2065 When multiple digest types are given, all of them are checked.
2073 2066 """
2074 2067 try:
2075 2068 raw_url = inpart.params[b'url']
2076 2069 except KeyError:
2077 2070 raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
2078 2071 parsed_url = urlutil.url(raw_url)
2079 2072 if parsed_url.scheme not in capabilities[b'remote-changegroup']:
2080 2073 raise error.Abort(
2081 2074 _(b'remote-changegroup does not support %s urls')
2082 2075 % parsed_url.scheme
2083 2076 )
2084 2077
2085 2078 try:
2086 2079 size = int(inpart.params[b'size'])
2087 2080 except ValueError:
2088 2081 raise error.Abort(
2089 2082 _(b'remote-changegroup: invalid value for param "%s"') % b'size'
2090 2083 )
2091 2084 except KeyError:
2092 2085 raise error.Abort(
2093 2086 _(b'remote-changegroup: missing "%s" param') % b'size'
2094 2087 )
2095 2088
2096 2089 digests = {}
2097 2090 for typ in inpart.params.get(b'digests', b'').split():
2098 2091 param = b'digest:%s' % typ
2099 2092 try:
2100 2093 value = inpart.params[param]
2101 2094 except KeyError:
2102 2095 raise error.Abort(
2103 2096 _(b'remote-changegroup: missing "%s" param') % param
2104 2097 )
2105 2098 digests[typ] = value
2106 2099
2107 2100 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
2108 2101
2109 2102 tr = op.gettransaction()
2110 2103 from . import exchange
2111 2104
2112 2105 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
2113 2106 if not isinstance(cg, changegroup.cg1unpacker):
2114 2107 raise error.Abort(
2115 2108 _(b'%s: not a bundle version 1.0') % urlutil.hidepassword(raw_url)
2116 2109 )
2117 2110 ret = _processchangegroup(op, cg, tr, op.source, b'bundle2')
2118 2111 if op.reply is not None:
2119 2112 # This is definitely not the final form of this
2120 2113 # return. But one needs to start somewhere.
2121 2114 part = op.reply.newpart(b'reply:changegroup')
2122 2115 part.addparam(
2123 2116 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2124 2117 )
2125 2118 part.addparam(b'return', b'%i' % ret, mandatory=False)
2126 2119 try:
2127 2120 real_part.validate()
2128 2121 except error.Abort as e:
2129 2122 raise error.Abort(
2130 2123 _(b'bundle at %s is corrupted:\n%s')
2131 2124 % (urlutil.hidepassword(raw_url), e.message)
2132 2125 )
2133 2126 assert not inpart.read()
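`util.digestchecker` above wraps the remote stream so that every byte read is hashed on the fly, and `validate()` later compares the observed size and digests against the values advertised in the part parameters. A minimal sketch of that pattern with a hypothetical class name (Mercurial's real implementation lives in `mercurial.util` and differs in detail):

```python
import hashlib
import io

class DigestChecker:
    # Hypothetical stand-in: hash every byte as it is read, then
    # check size and digests on validate().
    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size
        self._read = 0
        self._hashers = {typ: hashlib.new(typ) for typ in digests}
        self._expected = dict(digests)

    def read(self, n=-1):
        data = self._fh.read(n)
        self._read += len(data)
        for hasher in self._hashers.values():
            hasher.update(data)
        return data

    def validate(self):
        if self._read != self._size:
            raise ValueError("size mismatch in bundle")
        for typ, expected in self._expected.items():
            if self._hashers[typ].hexdigest() != expected:
                raise ValueError("%s mismatch in bundle" % typ)

payload = b"bundle10 bytes"
checker = DigestChecker(
    io.BytesIO(payload),
    len(payload),
    {"sha1": hashlib.sha1(payload).hexdigest()},
)
checker.read()
checker.validate()  # passes: size and sha1 both match
```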
2134 2127
2135 2128
2136 2129 @parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
2137 2130 def handlereplychangegroup(op, inpart):
2138 2131 ret = int(inpart.params[b'return'])
2139 2132 replyto = int(inpart.params[b'in-reply-to'])
2140 2133 op.records.add(b'changegroup', {b'return': ret}, replyto)
2141 2134
2142 2135
2143 2136 @parthandler(b'check:bookmarks')
2144 2137 def handlecheckbookmarks(op, inpart):
2145 2138 """check location of bookmarks
2146 2139
2147 2140 This part is used to detect push races regarding bookmarks. It
2148 2141 contains binary encoded (bookmark, node) tuples. If the local state does
2149 2142 not match the one in the part, a PushRaced exception is raised
2150 2143 """
2151 2144 bookdata = bookmarks.binarydecode(op.repo, inpart)
2152 2145
2153 2146 msgstandard = (
2154 2147 b'remote repository changed while pushing - please try again '
2155 2148 b'(bookmark "%s" move from %s to %s)'
2156 2149 )
2157 2150 msgmissing = (
2158 2151 b'remote repository changed while pushing - please try again '
2159 2152 b'(bookmark "%s" is missing, expected %s)'
2160 2153 )
2161 2154 msgexist = (
2162 2155 b'remote repository changed while pushing - please try again '
2163 2156 b'(bookmark "%s" set on %s, expected missing)'
2164 2157 )
2165 2158 for book, node in bookdata:
2166 2159 currentnode = op.repo._bookmarks.get(book)
2167 2160 if currentnode != node:
2168 2161 if node is None:
2169 2162 finalmsg = msgexist % (book, short(currentnode))
2170 2163 elif currentnode is None:
2171 2164 finalmsg = msgmissing % (book, short(node))
2172 2165 else:
2173 2166 finalmsg = msgstandard % (
2174 2167 book,
2175 2168 short(node),
2176 2169 short(currentnode),
2177 2170 )
2178 2171 raise error.PushRaced(finalmsg)
2179 2172
2180 2173
2181 2174 @parthandler(b'check:heads')
2182 2175 def handlecheckheads(op, inpart):
2183 2176 """check that head of the repo did not change
2184 2177
2185 2178 This is used to detect a push race when using unbundle.
2186 2179 This replaces the "heads" argument of unbundle."""
2187 2180 h = inpart.read(20)
2188 2181 heads = []
2189 2182 while len(h) == 20:
2190 2183 heads.append(h)
2191 2184 h = inpart.read(20)
2192 2185 assert not h
2193 2186 # Trigger a transaction so that we are guaranteed to have the lock now.
2194 2187 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2195 2188 op.gettransaction()
2196 2189 if sorted(heads) != sorted(op.repo.heads()):
2197 2190 raise error.PushRaced(
2198 2191 b'remote repository changed while pushing - please try again'
2199 2192 )
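The head-list payload parsed above is just a flat concatenation of fixed-width node hashes (20 bytes for SHA-1), read until the stream is exhausted. A standalone sketch of the same parsing loop:

```python
import io

def read_heads(stream, width=20):
    # Read fixed-width node hashes until the stream runs out; a
    # trailing partial record indicates a truncated or corrupt part.
    heads = []
    h = stream.read(width)
    while len(h) == width:
        heads.append(h)
        h = stream.read(width)
    if h:
        raise ValueError("truncated head record")
    return heads

payload = b"\x11" * 20 + b"\x22" * 20
assert read_heads(io.BytesIO(payload)) == [b"\x11" * 20, b"\x22" * 20]
```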
2200 2193
2201 2194
2202 2195 @parthandler(b'check:updated-heads')
2203 2196 def handlecheckupdatedheads(op, inpart):
2204 2197 """check for race on the heads touched by a push
2205 2198
2206 2199 This is similar to 'check:heads' but focuses on the heads actually updated
2207 2200 during the push. If other activities happen on unrelated heads, they are
2208 2201 ignored.
2209 2202
2210 2203 This allows servers with high traffic to avoid push contention as long as
2211 2204 only unrelated parts of the graph are involved.
2212 2205 h = inpart.read(20)
2213 2206 heads = []
2214 2207 while len(h) == 20:
2215 2208 heads.append(h)
2216 2209 h = inpart.read(20)
2217 2210 assert not h
2218 2211 # trigger a transaction so that we are guaranteed to have the lock now.
2219 2212 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2220 2213 op.gettransaction()
2221 2214
2222 2215 currentheads = set()
2223 2216 for ls in op.repo.branchmap().iterheads():
2224 2217 currentheads.update(ls)
2225 2218
2226 2219 for h in heads:
2227 2220 if h not in currentheads:
2228 2221 raise error.PushRaced(
2229 2222 b'remote repository changed while pushing - '
2230 2223 b'please try again'
2231 2224 )
2232 2225
2233 2226
2234 2227 @parthandler(b'check:phases')
2235 2228 def handlecheckphases(op, inpart):
2236 2229 """check that phase boundaries of the repository did not change
2237 2230
2238 2231 This is used to detect a push race.
2239 2232 """
2240 2233 phasetonodes = phases.binarydecode(inpart)
2241 2234 unfi = op.repo.unfiltered()
2242 2235 cl = unfi.changelog
2243 2236 phasecache = unfi._phasecache
2244 2237 msg = (
2245 2238 b'remote repository changed while pushing - please try again '
2246 2239 b'(%s is %s expected %s)'
2247 2240 )
2248 2241 for expectedphase, nodes in pycompat.iteritems(phasetonodes):
2249 2242 for n in nodes:
2250 2243 actualphase = phasecache.phase(unfi, cl.rev(n))
2251 2244 if actualphase != expectedphase:
2252 2245 finalmsg = msg % (
2253 2246 short(n),
2254 2247 phases.phasenames[actualphase],
2255 2248 phases.phasenames[expectedphase],
2256 2249 )
2257 2250 raise error.PushRaced(finalmsg)
2258 2251
2259 2252
2260 2253 @parthandler(b'output')
2261 2254 def handleoutput(op, inpart):
2262 2255 """forward output captured on the server to the client"""
2263 2256 for line in inpart.read().splitlines():
2264 2257 op.ui.status(_(b'remote: %s\n') % line)
2265 2258
2266 2259
2267 2260 @parthandler(b'replycaps')
2268 2261 def handlereplycaps(op, inpart):
2269 2262 """Notify that a reply bundle should be created
2270 2263
2271 2264 The payload contains the capabilities information for the reply"""
2272 2265 caps = decodecaps(inpart.read())
2273 2266 if op.reply is None:
2274 2267 op.reply = bundle20(op.ui, caps)
2275 2268
2276 2269
2277 2270 class AbortFromPart(error.Abort):
2278 2271 """Sub-class of Abort that denotes an error from a bundle2 part."""
2279 2272
2280 2273
2281 2274 @parthandler(b'error:abort', (b'message', b'hint'))
2282 2275 def handleerrorabort(op, inpart):
2283 2276 """Used to transmit abort error over the wire"""
2284 2277 raise AbortFromPart(
2285 2278 inpart.params[b'message'], hint=inpart.params.get(b'hint')
2286 2279 )
2287 2280
2288 2281
2289 2282 @parthandler(
2290 2283 b'error:pushkey',
2291 2284 (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
2292 2285 )
2293 2286 def handleerrorpushkey(op, inpart):
2294 2287 """Used to transmit failure of a mandatory pushkey over the wire"""
2295 2288 kwargs = {}
2296 2289 for name in (b'namespace', b'key', b'new', b'old', b'ret'):
2297 2290 value = inpart.params.get(name)
2298 2291 if value is not None:
2299 2292 kwargs[name] = value
2300 2293 raise error.PushkeyFailed(
2301 2294 inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
2302 2295 )
2303 2296
2304 2297
2305 2298 @parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
2306 2299 def handleerrorunsupportedcontent(op, inpart):
2307 2300 """Used to transmit unknown content error over the wire"""
2308 2301 kwargs = {}
2309 2302 parttype = inpart.params.get(b'parttype')
2310 2303 if parttype is not None:
2311 2304 kwargs[b'parttype'] = parttype
2312 2305 params = inpart.params.get(b'params')
2313 2306 if params is not None:
2314 2307 kwargs[b'params'] = params.split(b'\0')
2315 2308
2316 2309 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2317 2310
2318 2311
2319 2312 @parthandler(b'error:pushraced', (b'message',))
2320 2313 def handleerrorpushraced(op, inpart):
2321 2314 """Used to transmit push race error over the wire"""
2322 2315 raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])
2323 2316
2324 2317
2325 2318 @parthandler(b'listkeys', (b'namespace',))
2326 2319 def handlelistkeys(op, inpart):
2327 2320 """retrieve pushkey namespace content stored in a bundle2"""
2328 2321 namespace = inpart.params[b'namespace']
2329 2322 r = pushkey.decodekeys(inpart.read())
2330 2323 op.records.add(b'listkeys', (namespace, r))
2331 2324
2332 2325
2333 2326 @parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
2334 2327 def handlepushkey(op, inpart):
2335 2328 """process a pushkey request"""
2336 2329 dec = pushkey.decode
2337 2330 namespace = dec(inpart.params[b'namespace'])
2338 2331 key = dec(inpart.params[b'key'])
2339 2332 old = dec(inpart.params[b'old'])
2340 2333 new = dec(inpart.params[b'new'])
2341 2334 # Grab the transaction to ensure that we have the lock before performing the
2342 2335 # pushkey.
2343 2336 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2344 2337 op.gettransaction()
2345 2338 ret = op.repo.pushkey(namespace, key, old, new)
2346 2339 record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
2347 2340 op.records.add(b'pushkey', record)
2348 2341 if op.reply is not None:
2349 2342 rpart = op.reply.newpart(b'reply:pushkey')
2350 2343 rpart.addparam(
2351 2344 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2352 2345 )
2353 2346 rpart.addparam(b'return', b'%i' % ret, mandatory=False)
2354 2347 if inpart.mandatory and not ret:
2355 2348 kwargs = {}
2356 2349 for key in (b'namespace', b'key', b'new', b'old', b'ret'):
2357 2350 if key in inpart.params:
2358 2351 kwargs[key] = inpart.params[key]
2359 2352 raise error.PushkeyFailed(
2360 2353 partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
2361 2354 )
2362 2355
2363 2356
2364 2357 @parthandler(b'bookmarks')
2365 2358 def handlebookmark(op, inpart):
2366 2359 """transmit bookmark information
2367 2360
2368 2361 The part contains binary encoded bookmark information.
2369 2362
2370 2363 The exact behavior of this part can be controlled by the 'bookmarks' mode
2371 2364 on the bundle operation.
2372 2365
2373 2366 When mode is 'apply' (the default) the bookmark information is applied as
2374 2367 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2375 2368 issued earlier to check for push races in such an update. This behavior is
2376 2369 suitable for pushing.
2377 2370
2378 2371 When mode is 'records', the information is recorded into the 'bookmarks'
2379 2372 records of the bundle operation. This behavior is suitable for pulling.
2380 2373 """
2381 2374 changes = bookmarks.binarydecode(op.repo, inpart)
2382 2375
2383 2376 pushkeycompat = op.repo.ui.configbool(
2384 2377 b'server', b'bookmarks-pushkey-compat'
2385 2378 )
2386 2379 bookmarksmode = op.modes.get(b'bookmarks', b'apply')
2387 2380
2388 2381 if bookmarksmode == b'apply':
2389 2382 tr = op.gettransaction()
2390 2383 bookstore = op.repo._bookmarks
2391 2384 if pushkeycompat:
2392 2385 allhooks = []
2393 2386 for book, node in changes:
2394 2387 hookargs = tr.hookargs.copy()
2395 2388 hookargs[b'pushkeycompat'] = b'1'
2396 2389 hookargs[b'namespace'] = b'bookmarks'
2397 2390 hookargs[b'key'] = book
2398 2391 hookargs[b'old'] = hex(bookstore.get(book, b''))
2399 2392 hookargs[b'new'] = hex(node if node is not None else b'')
2400 2393 allhooks.append(hookargs)
2401 2394
2402 2395 for hookargs in allhooks:
2403 2396 op.repo.hook(
2404 2397 b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
2405 2398 )
2406 2399
2407 2400 for book, node in changes:
2408 2401 if bookmarks.isdivergent(book):
2409 2402 msg = _(b'cannot accept divergent bookmark %s!') % book
2410 2403 raise error.Abort(msg)
2411 2404
2412 2405 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2413 2406
2414 2407 if pushkeycompat:
2415 2408
2416 2409 def runhook(unused_success):
2417 2410 for hookargs in allhooks:
2418 2411 op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))
2419 2412
2420 2413 op.repo._afterlock(runhook)
2421 2414
2422 2415 elif bookmarksmode == b'records':
2423 2416 for book, node in changes:
2424 2417 record = {b'bookmark': book, b'node': node}
2425 2418 op.records.add(b'bookmarks', record)
2426 2419 else:
2427 2420 raise error.ProgrammingError(
2428 2421 b'unknown bookmark mode: %s' % bookmarksmode
2429 2422 )
2430 2423
2431 2424
2432 2425 @parthandler(b'phase-heads')
2433 2426 def handlephases(op, inpart):
2434 2427 """apply phases from bundle part to repo"""
2435 2428 headsbyphase = phases.binarydecode(inpart)
2436 2429 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2437 2430
2438 2431
2439 2432 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2440 2433 def handlepushkeyreply(op, inpart):
2441 2434 """retrieve the result of a pushkey request"""
2442 2435 ret = int(inpart.params[b'return'])
2443 2436 partid = int(inpart.params[b'in-reply-to'])
2444 2437 op.records.add(b'pushkey', {b'return': ret}, partid)
2445 2438
2446 2439
2447 2440 @parthandler(b'obsmarkers')
2448 2441 def handleobsmarker(op, inpart):
2449 2442 """add a stream of obsmarkers to the repo"""
2450 2443 tr = op.gettransaction()
2451 2444 markerdata = inpart.read()
2452 2445 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2453 2446 op.ui.writenoi18n(
2454 2447 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2455 2448 )
2456 2449 # The mergemarkers call will crash if marker creation is not enabled.
2457 2450 # we want to avoid this if the part is advisory.
2458 2451 if not inpart.mandatory and op.repo.obsstore.readonly:
2459 2452 op.repo.ui.debug(
2460 2453 b'ignoring obsolescence markers, feature not enabled\n'
2461 2454 )
2462 2455 return
2463 2456 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2464 2457 op.repo.invalidatevolatilesets()
2465 2458 op.records.add(b'obsmarkers', {b'new': new})
2466 2459 if op.reply is not None:
2467 2460 rpart = op.reply.newpart(b'reply:obsmarkers')
2468 2461 rpart.addparam(
2469 2462 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2470 2463 )
2471 2464 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2472 2465
2473 2466
2474 2467 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2475 2468 def handleobsmarkerreply(op, inpart):
2476 2469 """retrieve the result of an obsmarkers request"""
2477 2470 ret = int(inpart.params[b'new'])
2478 2471 partid = int(inpart.params[b'in-reply-to'])
2479 2472 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2480 2473
2481 2474
2482 2475 @parthandler(b'hgtagsfnodes')
2483 2476 def handlehgtagsfnodes(op, inpart):
2484 2477 """Applies .hgtags fnodes cache entries to the local repo.
2485 2478
2486 2479 Payload is pairs of 20 byte changeset nodes and filenodes.
2487 2480 """
2488 2481 # Grab the transaction so we ensure that we have the lock at this point.
2489 2482 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2490 2483 op.gettransaction()
2491 2484 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2492 2485
2493 2486 count = 0
2494 2487 while True:
2495 2488 node = inpart.read(20)
2496 2489 fnode = inpart.read(20)
2497 2490 if len(node) < 20 or len(fnode) < 20:
2498 2491 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2499 2492 break
2500 2493 cache.setfnode(node, fnode)
2501 2494 count += 1
2502 2495
2503 2496 cache.write()
2504 2497 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2505 2498
2506 2499
2507 2500 rbcstruct = struct.Struct(b'>III')
2508 2501
2509 2502
2510 2503 @parthandler(b'cache:rev-branch-cache')
2511 2504 def handlerbc(op, inpart):
2512 2505 """Legacy part, ignored for compatibility with bundles from or
2513 2506 for Mercurial before 5.7. Newer Mercurial computes the cache
2514 2507 efficiently enough during unbundling that the additional transfer
2515 2508 is unnecessary."""
2516 2509
2517 2510
2518 2511 @parthandler(b'pushvars')
2519 2512 def bundle2getvars(op, part):
2520 2513 '''unbundle a bundle2 containing shellvars on the server'''
2521 2514 # An option to disable unbundling on server-side for security reasons
2522 2515 if op.ui.configbool(b'push', b'pushvars.server'):
2523 2516 hookargs = {}
2524 2517 for key, value in part.advisoryparams:
2525 2518 key = key.upper()
2526 2519 # We want pushed variables to have USERVAR_ prepended so we know
2527 2520 # they came from the --pushvar flag.
2528 2521 key = b"USERVAR_" + key
2529 2522 hookargs[key] = value
2530 2523 op.addhookargs(hookargs)
2531 2524
2532 2525
2533 2526 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2534 2527 def handlestreamv2bundle(op, part):
2535 2528
2536 2529 requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
2537 2530 filecount = int(part.params[b'filecount'])
2538 2531 bytecount = int(part.params[b'bytecount'])
2539 2532
2540 2533 repo = op.repo
2541 2534 if len(repo):
2542 2535 msg = _(b'cannot apply stream clone to non-empty repository')
2543 2536 raise error.Abort(msg)
2544 2537
2545 2538 repo.ui.debug(b'applying stream bundle\n')
2546 2539 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2547 2540
2548 2541
2549 2542 def widen_bundle(
2550 2543 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2551 2544 ):
2552 2545 """generates bundle2 for widening a narrow clone
2553 2546
2554 2547 bundler is the bundle to which data should be added
2555 2548 repo is the localrepository instance
2556 2549 oldmatcher matches what the client already has
2557 2550 newmatcher matches what the client needs (including what it already has)
2558 2551 common is set of common heads between server and client
2559 2552 known is a set of revs known on the client side (used in ellipses)
2560 2553 cgversion is the changegroup version to send
2561 2554 ellipses is a boolean value telling whether to send ellipses data or not
2562 2555
2563 2556 returns bundle2 of the data required for extending
2564 2557 """
2565 2558 commonnodes = set()
2566 2559 cl = repo.changelog
2567 2560 for r in repo.revs(b"::%ln", common):
2568 2561 commonnodes.add(cl.node(r))
2569 2562 if commonnodes:
2570 2563 packer = changegroup.getbundler(
2571 2564 cgversion,
2572 2565 repo,
2573 2566 oldmatcher=oldmatcher,
2574 2567 matcher=newmatcher,
2575 2568 fullnodes=commonnodes,
2576 2569 )
2577 2570 cgdata = packer.generate(
2578 2571 {repo.nullid},
2579 2572 list(commonnodes),
2580 2573 False,
2581 2574 b'narrow_widen',
2582 2575 changelog=False,
2583 2576 )
2584 2577
2585 2578 part = bundler.newpart(b'changegroup', data=cgdata)
2586 2579 part.addparam(b'version', cgversion)
2587 2580 if scmutil.istreemanifest(repo):
2588 2581 part.addparam(b'treemanifest', b'1')
2589 2582 if b'exp-sidedata-flag' in repo.requirements:
2590 2583 part.addparam(b'exp-sidedata', b'1')
2591 2584 wanted = format_remote_wanted_sidedata(repo)
2592 2585 part.addparam(b'exp-wanted-sidedata', wanted)
2593 2586
2594 2587 return bundler