typing: minor tweaks to allow updating to pytype 2022.11.18
Matt Harbison
r50543:9be765b8 default
@@ -1,2589 +1,2594 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architected as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
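For illustration (a sketch, not a normative example): the blob
`e=answer%3D42 Mandatory` carries one advisory parameter `e` whose urlquoted
value decodes to `answer=42`, and one value-less mandatory parameter
`Mandatory` that a reader MUST understand or abort on.
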
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
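For illustration (a sketch, not a normative example), one mandatory parameter
with key `k` and value `v` and no advisory parameter would serialize as::

    \x01 \x00 \x01\x01 kv    (spaces added for readability)
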
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
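For illustration (a sketch, not a normative example), a 5-byte payload sent
as a single chunk would appear on the wire as::

    \x00\x00\x00\x05 hello \x00\x00\x00\x00    (spaces added for readability)
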
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part
139 139 type contains any uppercase char it is considered mandatory. When no handler
140 140 is known for a mandatory part, the process is aborted and an exception is
141 141 raised. If the part is advisory and no handler is known, the part is ignored.
142 142 When the process is aborted, the full bundle is still read from the stream to
143 143 keep the channel usable. But none of the parts read after an abort are
144 144 processed. In the future, dropping the stream may become an option for
145 145 channels we do not care to preserve.
146 146 """
147 147
148 148
149 149 import collections
150 150 import errno
151 151 import os
152 152 import re
153 153 import string
154 154 import struct
155 155 import sys
156 156
157 157 from .i18n import _
158 158 from .node import (
159 159 hex,
160 160 short,
161 161 )
162 162 from . import (
163 163 bookmarks,
164 164 changegroup,
165 165 encoding,
166 166 error,
167 167 obsolete,
168 168 phases,
169 169 pushkey,
170 170 pycompat,
171 171 requirements,
172 172 scmutil,
173 173 streamclone,
174 174 tags,
175 175 url,
176 176 util,
177 177 )
178 178 from .utils import (
179 179 stringutil,
180 180 urlutil,
181 181 )
182 182 from .interfaces import repository
183 183
184 184 urlerr = util.urlerr
185 185 urlreq = util.urlreq
186 186
187 187 _pack = struct.pack
188 188 _unpack = struct.unpack
189 189
190 190 _fstreamparamsize = b'>i'
191 191 _fpartheadersize = b'>i'
192 192 _fparttypesize = b'>B'
193 193 _fpartid = b'>I'
194 194 _fpayloadsize = b'>i'
195 195 _fpartparamcount = b'>BB'
196 196
197 197 preferedchunksize = 32768
198 198
199 199 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
200 200
201 201
202 202 def outdebug(ui, message):
203 203 """debug regarding output stream (bundling)"""
204 204 if ui.configbool(b'devel', b'bundle2.debug'):
205 205 ui.debug(b'bundle2-output: %s\n' % message)
206 206
207 207
208 208 def indebug(ui, message):
209 209 """debug on input stream (unbundling)"""
210 210 if ui.configbool(b'devel', b'bundle2.debug'):
211 211 ui.debug(b'bundle2-input: %s\n' % message)
212 212
213 213
214 214 def validateparttype(parttype):
215 215 """raise ValueError if a parttype contains invalid character"""
216 216 if _parttypeforbidden.search(parttype):
217 217 raise ValueError(parttype)
218 218
219 219
220 220 def _makefpartparamsizes(nbparams):
221 221 """return a struct format to read part parameter sizes
222 222
223 223 The number of parameters is variable, so we need to build that format
224 224 dynamically.
225 225 """
226 226 return b'>' + (b'BB' * nbparams)
227 227
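# Illustrative sketch (editorial addition, not part of the original module):
# how the dynamic format pairs with _unpack(). For two parameters the format
# is b'>BBBB', and the four decoded bytes are the (key, value) sizes of each
# parameter in turn.
def _example_paramsizes():
    fmt = _makefpartparamsizes(2)  # b'>BBBB'
    assert _unpack(fmt, b'\x03\x05\x02\x04') == (3, 5, 2, 4)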
228 228
229 229 parthandlermapping = {}
230 230
231 231
232 232 def parthandler(parttype, params=()):
233 233 """decorator that register a function as a bundle2 part handler
234 234
235 235 eg::
236 236
237 237 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
238 238 def myparttypehandler(...):
239 239 '''process a part of type "my part".'''
240 240 ...
241 241 """
242 242 validateparttype(parttype)
243 243
244 244 def _decorator(func):
245 245 lparttype = parttype.lower() # enforce lower case matching.
246 246 assert lparttype not in parthandlermapping
247 247 parthandlermapping[lparttype] = func
248 248 func.params = frozenset(params)
249 249 return func
250 250
251 251 return _decorator
252 252
253 253
254 254 class unbundlerecords:
255 255 """keep record of what happens during and unbundle
256 256
257 257 New records are added using `records.add('cat', obj)`. Where 'cat' is a
258 258 category of record and obj is an arbitrary object.
259 259
260 260 `records['cat']` will return all entries of this category 'cat'.
261 261
262 262 Iterating on the object itself will yield `('category', obj)` tuples
263 263 for all entries.
264 264
265 265 All iterations happen in chronological order.
266 266 """
267 267
268 268 def __init__(self):
269 269 self._categories = {}
270 270 self._sequences = []
271 271 self._replies = {}
272 272
273 273 def add(self, category, entry, inreplyto=None):
274 274 """add a new record of a given category.
275 275
276 276 The entry can then be retrieved in the list returned by
277 277 self['category']."""
278 278 self._categories.setdefault(category, []).append(entry)
279 279 self._sequences.append((category, entry))
280 280 if inreplyto is not None:
281 281 self.getreplies(inreplyto).add(category, entry)
282 282
283 283 def getreplies(self, partid):
284 284 """get the records that are replies to a specific part"""
285 285 return self._replies.setdefault(partid, unbundlerecords())
286 286
287 287 def __getitem__(self, cat):
288 288 return tuple(self._categories.get(cat, ()))
289 289
290 290 def __iter__(self):
291 291 return iter(self._sequences)
292 292
293 293 def __len__(self):
294 294 return len(self._sequences)
295 295
296 296 def __nonzero__(self):
297 297 return bool(self._sequences)
298 298
299 299 __bool__ = __nonzero__
300 300
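# Illustrative sketch (editorial addition, not part of the original module):
# typical use of unbundlerecords, mirroring the class docstring above.
def _example_records():
    records = unbundlerecords()
    records.add(b'changegroup', {b'return': 1})
    assert records[b'changegroup'] == ({b'return': 1},)
    assert list(records) == [(b'changegroup', {b'return': 1})]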
301 301
302 302 class bundleoperation:
303 303 """an object that represents a single bundling process
304 304
305 305 Its purpose is to carry unbundle-related objects and states.
306 306
307 307 A new object should be created at the beginning of each bundle processing.
308 308 The object is to be returned by the processing function.
309 309
310 310 The object has very little content now; it will ultimately contain:
311 311 * an access to the repo the bundle is applied to,
312 312 * a ui object,
313 313 * a way to retrieve a transaction to add changes to the repo,
314 314 * a way to record the result of processing each part,
315 315 * a way to construct a bundle response when applicable.
316 316 """
317 317
318 318 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
319 319 self.repo = repo
320 320 self.ui = repo.ui
321 321 self.records = unbundlerecords()
322 322 self.reply = None
323 323 self.captureoutput = captureoutput
324 324 self.hookargs = {}
325 325 self._gettransaction = transactiongetter
326 326 # carries value that can modify part behavior
327 327 self.modes = {}
328 328 self.source = source
329 329
330 330 def gettransaction(self):
331 331 transaction = self._gettransaction()
332 332
333 333 if self.hookargs:
334 334 # the ones added to the transaction supersede those added
335 335 # to the operation.
336 336 self.hookargs.update(transaction.hookargs)
337 337 transaction.hookargs = self.hookargs
338 338
339 339 # mark the hookargs as flushed. further attempts to add to
340 340 # hookargs will result in an abort.
341 341 self.hookargs = None
342 342
343 343 return transaction
344 344
345 345 def addhookargs(self, hookargs):
346 346 if self.hookargs is None:
347 347 raise error.ProgrammingError(
348 348 b'attempted to add hookargs to '
349 349 b'operation after transaction started'
350 350 )
351 351 self.hookargs.update(hookargs)
352 352
353 353
354 354 class TransactionUnavailable(RuntimeError):
355 355 pass
356 356
357 357
358 358 def _notransaction():
359 359 """default method to get a transaction while processing a bundle
360 360
361 361 Raise an exception to highlight the fact that no transaction was expected
362 362 to be created"""
363 363 raise TransactionUnavailable()
364 364
365 365
366 366 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
367 367 # transform me into unbundler.apply() as soon as the freeze is lifted
368 368 if isinstance(unbundler, unbundle20):
369 369 tr.hookargs[b'bundle2'] = b'1'
370 370 if source is not None and b'source' not in tr.hookargs:
371 371 tr.hookargs[b'source'] = source
372 372 if url is not None and b'url' not in tr.hookargs:
373 373 tr.hookargs[b'url'] = url
374 374 return processbundle(repo, unbundler, lambda: tr, source=source)
375 375 else:
376 376 # the transactiongetter won't be used, but we might as well set it
377 377 op = bundleoperation(repo, lambda: tr, source=source)
378 378 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
379 379 return op
380 380
381 381
382 382 class partiterator:
383 383 def __init__(self, repo, op, unbundler):
384 384 self.repo = repo
385 385 self.op = op
386 386 self.unbundler = unbundler
387 387 self.iterator = None
388 388 self.count = 0
389 389 self.current = None
390 390
391 391 def __enter__(self):
392 392 def func():
393 393 itr = enumerate(self.unbundler.iterparts(), 1)
394 394 for count, p in itr:
395 395 self.count = count
396 396 self.current = p
397 397 yield p
398 398 p.consume()
399 399 self.current = None
400 400
401 401 self.iterator = func()
402 402 return self.iterator
403 403
404 404 def __exit__(self, type, exc, tb):
405 405 if not self.iterator:
406 406 return
407 407
408 408 # Only gracefully abort in a normal exception situation. User aborts
409 409 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
410 410 # and should not gracefully cleanup.
411 411 if isinstance(exc, Exception):
412 412 # Any exceptions seeking to the end of the bundle at this point are
413 413 # almost certainly related to the underlying stream being bad.
414 414 # And, chances are that the exception we're handling is related to
415 415 # getting in that bad state. So, we swallow the seeking error and
416 416 # re-raise the original error.
417 417 seekerror = False
418 418 try:
419 419 if self.current:
420 420 # consume the part content to not corrupt the stream.
421 421 self.current.consume()
422 422
423 423 for part in self.iterator:
424 424 # consume the bundle content
425 425 part.consume()
426 426 except Exception:
427 427 seekerror = True
428 428
429 429 # Small hack to let caller code distinguish exceptions from bundle2
430 430 # processing from processing the old format. This is mostly needed
431 431 # to handle different return codes to unbundle according to the type
432 432 # of bundle. We should probably clean up or drop this return code
433 433 # craziness in a future version.
434 434 exc.duringunbundle2 = True
435 435 salvaged = []
436 436 replycaps = None
437 437 if self.op.reply is not None:
438 438 salvaged = self.op.reply.salvageoutput()
439 439 replycaps = self.op.reply.capabilities
440 440 exc._replycaps = replycaps
441 441 exc._bundle2salvagedoutput = salvaged
442 442
443 443 # Re-raising from a variable loses the original stack. So only use
444 444 # that form if we need to.
445 445 if seekerror:
446 446 raise exc
447 447
448 448 self.repo.ui.debug(
449 449 b'bundle2-input-bundle: %i parts total\n' % self.count
450 450 )
451 451
452 452
453 453 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
454 454 """This function process a bundle, apply effect to/from a repo
455 455
456 456 It iterates over each part then searches for and uses the proper handling
457 457 code to process the part. Parts are processed in order.
458 458
459 459 An unknown mandatory part will abort the process.
460 460
461 461 It is temporarily possible to provide a prebuilt bundleoperation to the
462 462 function. This is used to ensure output is properly propagated in case of
463 463 an error during the unbundling. This output capturing part will likely be
464 464 reworked and this ability will probably go away in the process.
465 465 """
466 466 if op is None:
467 467 if transactiongetter is None:
468 468 transactiongetter = _notransaction
469 469 op = bundleoperation(repo, transactiongetter, source=source)
470 470 # todo:
471 471 # - replace this with an init function soon.
472 472 # - exception catching
473 473 unbundler.params
474 474 if repo.ui.debugflag:
475 475 msg = [b'bundle2-input-bundle:']
476 476 if unbundler.params:
477 477 msg.append(b' %i params' % len(unbundler.params))
478 478 if op._gettransaction is None or op._gettransaction is _notransaction:
479 479 msg.append(b' no-transaction')
480 480 else:
481 481 msg.append(b' with-transaction')
482 482 msg.append(b'\n')
483 483 repo.ui.debug(b''.join(msg))
484 484
485 485 processparts(repo, op, unbundler)
486 486
487 487 return op
488 488
489 489
490 490 def processparts(repo, op, unbundler):
491 491 with partiterator(repo, op, unbundler) as parts:
492 492 for part in parts:
493 493 _processpart(op, part)
494 494
495 495
496 496 def _processchangegroup(op, cg, tr, source, url, **kwargs):
497 497 ret = cg.apply(op.repo, tr, source, url, **kwargs)
498 498 op.records.add(
499 499 b'changegroup',
500 500 {
501 501 b'return': ret,
502 502 },
503 503 )
504 504 return ret
505 505
506 506
507 507 def _gethandler(op, part):
508 508 status = b'unknown' # used by debug output
509 509 try:
510 510 handler = parthandlermapping.get(part.type)
511 511 if handler is None:
512 512 status = b'unsupported-type'
513 513 raise error.BundleUnknownFeatureError(parttype=part.type)
514 514 indebug(op.ui, b'found a handler for part %s' % part.type)
515 515 unknownparams = part.mandatorykeys - handler.params
516 516 if unknownparams:
517 517 unknownparams = list(unknownparams)
518 518 unknownparams.sort()
519 519 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
520 520 raise error.BundleUnknownFeatureError(
521 521 parttype=part.type, params=unknownparams
522 522 )
523 523 status = b'supported'
524 524 except error.BundleUnknownFeatureError as exc:
525 525 if part.mandatory: # mandatory parts
526 526 raise
527 527 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
528 528 return # skip to part processing
529 529 finally:
530 530 if op.ui.debugflag:
531 531 msg = [b'bundle2-input-part: "%s"' % part.type]
532 532 if not part.mandatory:
533 533 msg.append(b' (advisory)')
534 534 nbmp = len(part.mandatorykeys)
535 535 nbap = len(part.params) - nbmp
536 536 if nbmp or nbap:
537 537 msg.append(b' (params:')
538 538 if nbmp:
539 539 msg.append(b' %i mandatory' % nbmp)
540 540 if nbap:
541 541 msg.append(b' %i advisory' % nbap)
542 542 msg.append(b')')
543 543 msg.append(b' %s\n' % status)
544 544 op.ui.debug(b''.join(msg))
545 545
546 546 return handler
547 547
548 548
549 549 def _processpart(op, part):
550 550 """process a single part from a bundle
551 551
552 552 The part is guaranteed to have been fully consumed when the function exits
553 553 (even if an exception is raised)."""
554 554 handler = _gethandler(op, part)
555 555 if handler is None:
556 556 return
557 557
558 558 # handler is called outside the above try block so that we don't
559 559 # risk catching KeyErrors from anything other than the
560 560 # parthandlermapping lookup (any KeyError raised by handler()
561 561 # itself represents a defect of a different variety).
562 562 output = None
563 563 if op.captureoutput and op.reply is not None:
564 564 op.ui.pushbuffer(error=True, subproc=True)
565 565 output = b''
566 566 try:
567 567 handler(op, part)
568 568 finally:
569 569 if output is not None:
570 570 output = op.ui.popbuffer()
571 571 if output:
572 572 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
573 573 outpart.addparam(
574 574 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
575 575 )
576 576
577 577
578 578 def decodecaps(blob):
579 579 """decode a bundle2 caps bytes blob into a dictionary
580 580
581 581 The blob is a list of capabilities (one per line)
582 582 Capabilities may have values using a line of the form::
583 583
584 584 capability=value1,value2,value3
585 585
586 586 The values are always a list."""
587 587 caps = {}
588 588 for line in blob.splitlines():
589 589 if not line:
590 590 continue
591 591 if b'=' not in line:
592 592 key, vals = line, ()
593 593 else:
594 594 key, vals = line.split(b'=', 1)
595 595 vals = vals.split(b',')
596 596 key = urlreq.unquote(key)
597 597 vals = [urlreq.unquote(v) for v in vals]
598 598 caps[key] = vals
599 599 return caps
600 600
601 601
602 602 def encodecaps(caps):
603 603 """encode a bundle2 caps dictionary into a bytes blob"""
604 604 chunks = []
605 605 for ca in sorted(caps):
606 606 vals = caps[ca]
607 607 ca = urlreq.quote(ca)
608 608 vals = [urlreq.quote(v) for v in vals]
609 609 if vals:
610 610 ca = b"%s=%s" % (ca, b','.join(vals))
611 611 chunks.append(ca)
612 612 return b'\n'.join(chunks)
613 613
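# Illustrative sketch (editorial addition, not part of the original module):
# encodecaps() and decodecaps() round-trip a small capabilities dictionary.
def _example_caps_roundtrip():
    caps = {b'error': [b'abort', b'pushkey'], b'listkeys': []}
    blob = encodecaps(caps)  # b'error=abort,pushkey\nlistkeys'
    assert decodecaps(blob) == caps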
614 614
615 615 bundletypes = {
616 616 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
617 617 # since the unification ssh accepts a header but there
618 618 # is no capability signaling it.
619 619 b"HG20": (), # special-cased below
620 620 b"HG10UN": (b"HG10UN", b'UN'),
621 621 b"HG10BZ": (b"HG10", b'BZ'),
622 622 b"HG10GZ": (b"HG10GZ", b'GZ'),
623 623 }
624 624
625 625 # hgweb uses this list to communicate its preferred type
626 626 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
627 627
628 628
629 629 class bundle20:
630 630 """represent an outgoing bundle2 container
631 631
632 632 Use the `addparam` method to add a stream level parameter, and `newpart` to
633 633 populate it. Then call `getchunks` to retrieve all the binary chunks of
634 634 data that compose the bundle2 container."""
635 635
636 636 _magicstring = b'HG20'
637 637
638 638 def __init__(self, ui, capabilities=()):
639 639 self.ui = ui
640 640 self._params = []
641 641 self._parts = []
642 642 self.capabilities = dict(capabilities)
643 643 self._compengine = util.compengines.forbundletype(b'UN')
644 644 self._compopts = None
645 645 # If compression is being handled by a consumer of the raw
646 646 # data (e.g. the wire protocol), unsetting this flag tells
647 647 # consumers that the bundle is best left uncompressed.
648 648 self.prefercompressed = True
649 649
650 650 def setcompression(self, alg, compopts=None):
651 651 """setup core part compression to <alg>"""
652 652 if alg in (None, b'UN'):
653 653 return
654 654 assert not any(n.lower() == b'compression' for n, v in self._params)
655 655 self.addparam(b'Compression', alg)
656 656 self._compengine = util.compengines.forbundletype(alg)
657 657 self._compopts = compopts
658 658
659 659 @property
660 660 def nbparts(self):
661 661 """total number of parts added to the bundler"""
662 662 return len(self._parts)
663 663
664 664 # methods used to define the bundle2 content
665 665 def addparam(self, name, value=None):
666 666 """add a stream level parameter"""
667 667 if not name:
668 668 raise error.ProgrammingError(b'empty parameter name')
669 669 if name[0:1] not in pycompat.bytestr(
670 670 string.ascii_letters # pytype: disable=wrong-arg-types
671 671 ):
672 672 raise error.ProgrammingError(
673 673 b'non letter first character: %s' % name
674 674 )
675 675 self._params.append((name, value))
676 676
677 677 def addpart(self, part):
678 678 """add a new part to the bundle2 container
679 679
680 680 Parts contains the actual applicative payload."""
681 681 assert part.id is None
682 682 part.id = len(self._parts) # very cheap counter
683 683 self._parts.append(part)
684 684
685 685 def newpart(self, typeid, *args, **kwargs):
686 686 """create a new part and add it to the containers
687 687
688 688 As the part is directly added to the containers. For now, this means
689 689 that any failure to properly initialize the part after calling
690 690 ``newpart`` should result in a failure of the whole bundling process.
691 691
692 692 You can still fall back to manually creating and adding the part if you
693 693 need better control."""
694 694 part = bundlepart(typeid, *args, **kwargs)
695 695 self.addpart(part)
696 696 return part
697 697
698 698 # methods used to generate the bundle2 stream
699 699 def getchunks(self):
700 700 if self.ui.debugflag:
701 701 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
702 702 if self._params:
703 703 msg.append(b' (%i params)' % len(self._params))
704 704 msg.append(b' %i parts total\n' % len(self._parts))
705 705 self.ui.debug(b''.join(msg))
706 706 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
707 707 yield self._magicstring
708 708 param = self._paramchunk()
709 709 outdebug(self.ui, b'bundle parameter: %s' % param)
710 710 yield _pack(_fstreamparamsize, len(param))
711 711 if param:
712 712 yield param
713 713 for chunk in self._compengine.compressstream(
714 714 self._getcorechunk(), self._compopts
715 715 ):
716 716 yield chunk
717 717
718 718 def _paramchunk(self):
719 719 """return a encoded version of all stream parameters"""
720 720 blocks = []
721 721 for par, value in self._params:
722 722 par = urlreq.quote(par)
723 723 if value is not None:
724 724 value = urlreq.quote(value)
725 725 par = b'%s=%s' % (par, value)
726 726 blocks.append(par)
727 727 return b' '.join(blocks)
728 728
729 729 def _getcorechunk(self):
730 730 """yield chunk for the core part of the bundle
731 731
732 732 (all but headers and parameters)"""
733 733 outdebug(self.ui, b'start of parts')
734 734 for part in self._parts:
735 735 outdebug(self.ui, b'bundle part: "%s"' % part.type)
736 736 for chunk in part.getchunks(ui=self.ui):
737 737 yield chunk
738 738 outdebug(self.ui, b'end of bundle')
739 739 yield _pack(_fpartheadersize, 0)
740 740
741 741 def salvageoutput(self):
742 742 """return a list with a copy of all output parts in the bundle
743 743
744 744 This is meant to be used during error handling to make sure we preserve
745 745 server output"""
746 746 salvaged = []
747 747 for part in self._parts:
748 748 if part.type.startswith(b'output'):
749 749 salvaged.append(part.copy())
750 750 return salvaged
751 751
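# Illustrative sketch (editorial addition, not part of the original module):
# assembling a minimal bundle2 container with one advisory part. ``ui`` is
# assumed to be a mercurial ui instance obtained by the caller.
def _example_build_bundle(ui):
    bundler = bundle20(ui)
    bundler.addparam(b'e', b'1')  # lowercase name: advisory stream parameter
    bundler.newpart(b'output', data=b'hi', mandatory=False)
    return b''.join(bundler.getchunks())  # b'HG20' + params + parts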
752 752
753 753 class unpackermixin:
754 754 """A mixin to extract bytes and struct data from a stream"""
755 755
756 756 def __init__(self, fp):
757 757 self._fp = fp
758 758
759 759 def _unpack(self, format):
760 760 """unpack this struct format from the stream
761 761
762 762 This method is meant for internal usage by the bundle2 protocol only.
763 763 It directly manipulates the low level stream, including bundle2 level
764 764 instructions.
765 765
766 766 Do not use it to implement higher-level logic or methods."""
767 767 data = self._readexact(struct.calcsize(format))
768 768 return _unpack(format, data)
769 769
770 770 def _readexact(self, size):
771 771 """read exactly <size> bytes from the stream
772 772
773 773 This method is meant for internal usage by the bundle2 protocol only.
774 774 It directly manipulates the low level stream, including bundle2 level
775 775 instructions.
776 776
777 777 Do not use it to implement higher-level logic or methods."""
778 778 return changegroup.readexactly(self._fp, size)
779 779
780 780
781 781 def getunbundler(ui, fp, magicstring=None):
782 782 """return a valid unbundler object for a given magicstring"""
783 783 if magicstring is None:
784 784 magicstring = changegroup.readexactly(fp, 4)
785 785 magic, version = magicstring[0:2], magicstring[2:4]
786 786 if magic != b'HG':
787 787 ui.debug(
788 788 b"error: invalid magic: %r (version %r), should be 'HG'\n"
789 789 % (magic, version)
790 790 )
791 791 raise error.Abort(_(b'not a Mercurial bundle'))
792 792 unbundlerclass = formatmap.get(version)
793 793 if unbundlerclass is None:
794 794 raise error.Abort(_(b'unknown bundle version %s') % version)
795 795 unbundler = unbundlerclass(ui, fp)
796 796 indebug(ui, b'start processing of %s stream' % magicstring)
797 797 return unbundler
798 798
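# Illustrative sketch (editorial addition, not part of the original module):
# reading a raw container (e.g. the bytes produced by the sketch above) back
# through getunbundler().
def _example_read_bundle(ui, raw):
    import io

    unbundler = getunbundler(ui, io.BytesIO(raw))
    for part in unbundler.iterparts():
        ui.debug(b'saw part: %s\n' % part.type)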
799 799
800 800 class unbundle20(unpackermixin):
801 801 """interpret a bundle2 stream
802 802
803 803 This class is fed with a binary stream and yields parts through its
804 804 `iterparts` method."""
805 805
806 806 _magicstring = b'HG20'
807 807
808 808 def __init__(self, ui, fp):
809 809 """If header is specified, we do not read it out of the stream."""
810 810 self.ui = ui
811 811 self._compengine = util.compengines.forbundletype(b'UN')
812 812 self._compressed = None
813 813 super(unbundle20, self).__init__(fp)
814 814
815 815 @util.propertycache
816 816 def params(self):
817 817 """dictionary of stream level parameters"""
818 818 indebug(self.ui, b'reading bundle2 stream parameters')
819 819 params = {}
820 820 paramssize = self._unpack(_fstreamparamsize)[0]
821 821 if paramssize < 0:
822 822 raise error.BundleValueError(
823 823 b'negative bundle param size: %i' % paramssize
824 824 )
825 825 if paramssize:
826 826 params = self._readexact(paramssize)
827 827 params = self._processallparams(params)
828 828 return params
829 829
830 830 def _processallparams(self, paramsblock):
831 831 """ """
832 832 params = util.sortdict()
833 833 for p in paramsblock.split(b' '):
834 834 p = p.split(b'=', 1)
835 835 p = [urlreq.unquote(i) for i in p]
836 836 if len(p) < 2:
837 837 p.append(None)
838 838 self._processparam(*p)
839 839 params[p[0]] = p[1]
840 840 return params
841 841
842 842 def _processparam(self, name, value):
843 843 """process a parameter, applying its effect if needed
844 844
845 845 Parameters starting with a lower case letter are advisory and will be
846 846 ignored when unknown. Those starting with an upper case letter are
847 847 mandatory, and this function will raise a KeyError when they are unknown.
848 848
849 849 Note: no options are currently supported. Any input will either be
850 850 ignored or fail.
851 851 """
852 852 if not name:
853 853 raise ValueError('empty parameter name')
854 854 if name[0:1] not in pycompat.bytestr(
855 855 string.ascii_letters # pytype: disable=wrong-arg-types
856 856 ):
857 857 raise ValueError('non letter first character: %s' % name)
858 858 try:
859 859 handler = b2streamparamsmap[name.lower()]
860 860 except KeyError:
861 861 if name[0:1].islower():
862 862 indebug(self.ui, b"ignoring unknown parameter %s" % name)
863 863 else:
864 864 raise error.BundleUnknownFeatureError(params=(name,))
865 865 else:
866 866 handler(self, name, value)
867 867
868 868 def _forwardchunks(self):
869 869 """utility to transfer a bundle2 as binary
870 870
871 871 This is made necessary by the fact that the 'getbundle' command over 'ssh'
872 872 has no way to know when the reply ends, relying on the bundle being
873 873 interpreted to know its end. This is terrible and we are sorry, but we
874 874 needed to move forward to get general delta enabled.
875 875 """
876 876 yield self._magicstring
877 877 assert 'params' not in vars(self)
878 878 paramssize = self._unpack(_fstreamparamsize)[0]
879 879 if paramssize < 0:
880 880 raise error.BundleValueError(
881 881 b'negative bundle param size: %i' % paramssize
882 882 )
883 883 if paramssize:
884 884 params = self._readexact(paramssize)
885 885 self._processallparams(params)
886 886 # The payload itself is decompressed below, so drop
887 887 # the compression parameter passed down to compensate.
888 888 outparams = []
889 889 for p in params.split(b' '):
890 890 k, v = p.split(b'=', 1)
891 891 if k.lower() != b'compression':
892 892 outparams.append(p)
893 893 outparams = b' '.join(outparams)
894 894 yield _pack(_fstreamparamsize, len(outparams))
895 895 yield outparams
896 896 else:
897 897 yield _pack(_fstreamparamsize, paramssize)
898 898 # From there, the payload might need to be decompressed
899 899 self._fp = self._compengine.decompressorreader(self._fp)
900 900 emptycount = 0
901 901 while emptycount < 2:
902 902 # so we can brainlessly loop
903 903 assert _fpartheadersize == _fpayloadsize
904 904 size = self._unpack(_fpartheadersize)[0]
905 905 yield _pack(_fpartheadersize, size)
906 906 if size:
907 907 emptycount = 0
908 908 else:
909 909 emptycount += 1
910 910 continue
911 911 if size == flaginterrupt:
912 912 continue
913 913 elif size < 0:
914 914 raise error.BundleValueError(b'negative chunk size: %i' % size)
915 915 yield self._readexact(size)
916 916
917 917 def iterparts(self, seekable=False):
918 918 """yield all parts contained in the stream"""
919 919 cls = seekableunbundlepart if seekable else unbundlepart
920 920 # make sure params have been loaded
921 921 self.params
922 922 # From there, the payload needs to be decompressed
923 923 self._fp = self._compengine.decompressorreader(self._fp)
924 924 indebug(self.ui, b'start extraction of bundle2 parts')
925 925 headerblock = self._readpartheader()
926 926 while headerblock is not None:
927 927 part = cls(self.ui, headerblock, self._fp)
928 928 yield part
929 929 # Ensure part is fully consumed so we can start reading the next
930 930 # part.
931 931 part.consume()
932 932
933 933 headerblock = self._readpartheader()
934 934 indebug(self.ui, b'end of bundle2 stream')
935 935
936 936 def _readpartheader(self):
937 937 """reads a part header size and return the bytes blob
938 938
939 939 returns None if empty"""
940 940 headersize = self._unpack(_fpartheadersize)[0]
941 941 if headersize < 0:
942 942 raise error.BundleValueError(
943 943 b'negative part header size: %i' % headersize
944 944 )
945 945 indebug(self.ui, b'part header size: %i' % headersize)
946 946 if headersize:
947 947 return self._readexact(headersize)
948 948 return None
949 949
950 950 def compressed(self):
951 951 self.params # load params
952 952 return self._compressed
953 953
954 954 def close(self):
955 955 """close underlying file"""
956 956 if util.safehasattr(self._fp, 'close'):
957 957 return self._fp.close()
958 958
959 959
960 960 formatmap = {b'20': unbundle20}
961 961
962 962 b2streamparamsmap = {}
963 963
964 964
965 965 def b2streamparamhandler(name):
966 966 """register a handler for a stream level parameter"""
967 967
968 968 def decorator(func):
969 969 assert name not in b2streamparamsmap
970 970 b2streamparamsmap[name] = func
971 971 return func
972 972
973 973 return decorator
974 974
975 975
976 976 @b2streamparamhandler(b'compression')
977 977 def processcompression(unbundler, param, value):
978 978 """read compression parameter and install payload decompression"""
979 979 if value not in util.compengines.supportedbundletypes:
980 980 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
981 981 unbundler._compengine = util.compengines.forbundletype(value)
982 982 if value is not None:
983 983 unbundler._compressed = True
984 984
985 985
986 986 class bundlepart:
987 987 """A bundle2 part contains application level payload
988 988
989 989 The part `type` is used to route the part to the application level
990 990 handler.
991 991
992 992 The part payload is contained in ``part.data``. It could be raw bytes or a
993 993 generator of byte chunks.
994 994
995 995 You can add parameters to the part using the ``addparam`` method.
996 996 Parameters can be either mandatory (default) or advisory. Remote side
997 997 should be able to safely ignore the advisory ones.
998 998
999 999 Neither data nor parameters can be modified after the generation has begun.
1000 1000 """
1001 1001
1002 1002 def __init__(
1003 1003 self,
1004 1004 parttype,
1005 1005 mandatoryparams=(),
1006 1006 advisoryparams=(),
1007 1007 data=b'',
1008 1008 mandatory=True,
1009 1009 ):
1010 1010 validateparttype(parttype)
1011 1011 self.id = None
1012 1012 self.type = parttype
1013 1013 self._data = data
1014 1014 self._mandatoryparams = list(mandatoryparams)
1015 1015 self._advisoryparams = list(advisoryparams)
1016 1016 # checking for duplicated entries
1017 1017 self._seenparams = set()
1018 1018 for pname, __ in self._mandatoryparams + self._advisoryparams:
1019 1019 if pname in self._seenparams:
1020 1020 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1021 1021 self._seenparams.add(pname)
1022 1022 # status of the part's generation:
1023 1023 # - None: not started,
1024 1024 # - False: currently generated,
1025 1025 # - True: generation done.
1026 1026 self._generated = None
1027 1027 self.mandatory = mandatory
1028 1028
1029 1029 def __repr__(self):
1030 1030 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1031 1031 return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1032 1032 cls,
1033 1033 id(self),
1034 1034 self.id,
1035 1035 self.type,
1036 1036 self.mandatory,
1037 1037 )
1038 1038
1039 1039 def copy(self):
1040 1040 """return a copy of the part
1041 1041
1042 1042 The new part has the very same content but no partid assigned yet.
1043 1043 Parts with generated data cannot be copied."""
1044 1044 assert not util.safehasattr(self.data, 'next')
1045 1045 return self.__class__(
1046 1046 self.type,
1047 1047 self._mandatoryparams,
1048 1048 self._advisoryparams,
1049 1049 self._data,
1050 1050 self.mandatory,
1051 1051 )
1052 1052
1053 1053 # methods used to define the part content
1054 1054 @property
1055 1055 def data(self):
1056 1056 return self._data
1057 1057
1058 1058 @data.setter
1059 1059 def data(self, data):
1060 1060 if self._generated is not None:
1061 1061 raise error.ReadOnlyPartError(b'part is being generated')
1062 1062 self._data = data
1063 1063
1064 1064 @property
1065 1065 def mandatoryparams(self):
1066 1066 # make it an immutable tuple to force people through ``addparam``
1067 1067 return tuple(self._mandatoryparams)
1068 1068
1069 1069 @property
1070 1070 def advisoryparams(self):
1071 1071 # make it an immutable tuple to force people through ``addparam``
1072 1072 return tuple(self._advisoryparams)
1073 1073
1074 1074 def addparam(self, name, value=b'', mandatory=True):
1075 1075 """add a parameter to the part
1076 1076
1077 1077 If 'mandatory' is set to True, the remote handler must claim support
1078 1078 for this parameter or the unbundling will be aborted.
1079 1079
1080 1080 The 'name' and 'value' cannot exceed 255 bytes each.
1081 1081 """
1082 1082 if self._generated is not None:
1083 1083 raise error.ReadOnlyPartError(b'part is being generated')
1084 1084 if name in self._seenparams:
1085 1085 raise ValueError(b'duplicated params: %s' % name)
1086 1086 self._seenparams.add(name)
1087 1087 params = self._advisoryparams
1088 1088 if mandatory:
1089 1089 params = self._mandatoryparams
1090 1090 params.append((name, value))
1091 1091
1092 1092 # methods used to generate the bundle2 stream
1093 1093 def getchunks(self, ui):
1094 1094 if self._generated is not None:
1095 1095 raise error.ProgrammingError(b'part can only be consumed once')
1096 1096 self._generated = False
1097 1097
1098 1098 if ui.debugflag:
1099 1099 msg = [b'bundle2-output-part: "%s"' % self.type]
1100 1100 if not self.mandatory:
1101 1101 msg.append(b' (advisory)')
1102 1102 nbmp = len(self.mandatoryparams)
1103 1103 nbap = len(self.advisoryparams)
1104 1104 if nbmp or nbap:
1105 1105 msg.append(b' (params:')
1106 1106 if nbmp:
1107 1107 msg.append(b' %i mandatory' % nbmp)
1108 1108 if nbap:
1109 1109 msg.append(b' %i advisory' % nbap)
1110 1110 msg.append(b')')
1111 1111 if not self.data:
1112 1112 msg.append(b' empty payload')
1113 1113 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1114 1114 self.data, b'__next__'
1115 1115 ):
1116 1116 msg.append(b' streamed payload')
1117 1117 else:
1118 1118 msg.append(b' %i bytes payload' % len(self.data))
1119 1119 msg.append(b'\n')
1120 1120 ui.debug(b''.join(msg))
1121 1121
1122 1122 #### header
1123 1123 if self.mandatory:
1124 1124 parttype = self.type.upper()
1125 1125 else:
1126 1126 parttype = self.type.lower()
1127 1127 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1128 1128 ## parttype
1129 1129 header = [
1130 1130 _pack(_fparttypesize, len(parttype)),
1131 1131 parttype,
1132 1132 _pack(_fpartid, self.id),
1133 1133 ]
1134 1134 ## parameters
1135 1135 # count
1136 1136 manpar = self.mandatoryparams
1137 1137 advpar = self.advisoryparams
1138 1138 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1139 1139 # size
1140 1140 parsizes = []
1141 1141 for key, value in manpar:
1142 1142 parsizes.append(len(key))
1143 1143 parsizes.append(len(value))
1144 1144 for key, value in advpar:
1145 1145 parsizes.append(len(key))
1146 1146 parsizes.append(len(value))
1147 1147 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1148 1148 header.append(paramsizes)
1149 1149 # key, value
1150 1150 for key, value in manpar:
1151 1151 header.append(key)
1152 1152 header.append(value)
1153 1153 for key, value in advpar:
1154 1154 header.append(key)
1155 1155 header.append(value)
1156 1156 ## finalize header
1157 1157 try:
1158 1158 headerchunk = b''.join(header)
1159 1159 except TypeError:
1160 1160 raise TypeError(
1161 1161 'Found a non-bytes trying to '
1162 1162 'build bundle part header: %r' % header
1163 1163 )
1164 1164 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1165 1165 yield _pack(_fpartheadersize, len(headerchunk))
1166 1166 yield headerchunk
1167 1167 ## payload
1168 1168 try:
1169 1169 for chunk in self._payloadchunks():
1170 1170 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1171 1171 yield _pack(_fpayloadsize, len(chunk))
1172 1172 yield chunk
1173 1173 except GeneratorExit:
1174 1174 # GeneratorExit means that nobody is listening for our
1175 1175 # results anyway, so just bail quickly rather than trying
1176 1176 # to produce an error part.
1177 1177 ui.debug(b'bundle2-generatorexit\n')
1178 1178 raise
1179 1179 except BaseException as exc:
1180 1180 bexc = stringutil.forcebytestr(exc)
1181 1181 # backup exception data for later
1182 1182 ui.debug(
1183 1183 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1184 1184 )
1185 1185 tb = sys.exc_info()[2]
1186 1186 msg = b'unexpected error: %s' % bexc
1187 1187 interpart = bundlepart(
1188 1188 b'error:abort', [(b'message', msg)], mandatory=False
1189 1189 )
1190 1190 interpart.id = 0
1191 1191 yield _pack(_fpayloadsize, -1)
1192 1192 for chunk in interpart.getchunks(ui=ui):
1193 1193 yield chunk
1194 1194 outdebug(ui, b'closing payload chunk')
1195 1195 # abort current part payload
1196 1196 yield _pack(_fpayloadsize, 0)
1197 1197 pycompat.raisewithtb(exc, tb)
1198 1198 # end of payload
1199 1199 outdebug(ui, b'closing payload chunk')
1200 1200 yield _pack(_fpayloadsize, 0)
1201 1201 self._generated = True
1202 1202
1203 1203 def _payloadchunks(self):
1204 1204 """yield chunks of a the part payload
1205 1205
1206 1206 Exists to handle the different methods to provide data to a part."""
1207 1207 # we only support fixed size data now.
1208 1208 # This will be improved in the future.
1209 1209 if util.safehasattr(self.data, 'next') or util.safehasattr(
1210 1210 self.data, b'__next__'
1211 1211 ):
1212 1212 buff = util.chunkbuffer(self.data)
1213 1213 chunk = buff.read(preferedchunksize)
1214 1214 while chunk:
1215 1215 yield chunk
1216 1216 chunk = buff.read(preferedchunksize)
1217 1217 elif len(self.data):
1218 1218 yield self.data
1219 1219
1220 1220
1221 1221 flaginterrupt = -1
1222 1222
1223 1223
1224 1224 class interrupthandler(unpackermixin):
1225 1225 """read one part and process it with restricted capability
1226 1226
1227 1227 This allows transmitting exceptions raised on the producer side during part
1228 1228 iteration while the consumer is reading a part.
1229 1229
1230 1230 Parts processed in this manner only have access to a ui object."""
1231 1231
1232 1232 def __init__(self, ui, fp):
1233 1233 super(interrupthandler, self).__init__(fp)
1234 1234 self.ui = ui
1235 1235
1236 1236 def _readpartheader(self):
1237 1237 """reads a part header size and return the bytes blob
1238 1238
1239 1239 returns None if empty"""
1240 1240 headersize = self._unpack(_fpartheadersize)[0]
1241 1241 if headersize < 0:
1242 1242 raise error.BundleValueError(
1243 1243 b'negative part header size: %i' % headersize
1244 1244 )
1245 1245 indebug(self.ui, b'part header size: %i\n' % headersize)
1246 1246 if headersize:
1247 1247 return self._readexact(headersize)
1248 1248 return None
1249 1249
1250 1250 def __call__(self):
1251 1251
1252 1252 self.ui.debug(
1253 1253 b'bundle2-input-stream-interrupt: opening out of band context\n'
1254 1254 )
1255 1255 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1256 1256 headerblock = self._readpartheader()
1257 1257 if headerblock is None:
1258 1258 indebug(self.ui, b'no part found during interruption.')
1259 1259 return
1260 1260 part = unbundlepart(self.ui, headerblock, self._fp)
1261 1261 op = interruptoperation(self.ui)
1262 1262 hardabort = False
1263 1263 try:
1264 1264 _processpart(op, part)
1265 1265 except (SystemExit, KeyboardInterrupt):
1266 1266 hardabort = True
1267 1267 raise
1268 1268 finally:
1269 1269 if not hardabort:
1270 1270 part.consume()
1271 1271 self.ui.debug(
1272 1272 b'bundle2-input-stream-interrupt: closing out of band context\n'
1273 1273 )
1274 1274
1275 1275
1276 1276 class interruptoperation:
1277 1277 """A limited operation to be use by part handler during interruption
1278 1278
1279 1279 It only have access to an ui object.
1280 1280 """
1281 1281
1282 1282 def __init__(self, ui):
1283 1283 self.ui = ui
1284 1284 self.reply = None
1285 1285 self.captureoutput = False
1286 1286
1287 1287 @property
1288 1288 def repo(self):
1289 1289 raise error.ProgrammingError(b'no repo access from stream interruption')
1290 1290
1291 1291 def gettransaction(self):
1292 1292 raise TransactionUnavailable(b'no repo access from stream interruption')
1293 1293
1294 1294
1295 1295 def decodepayloadchunks(ui, fh):
1296 1296 """Reads bundle2 part payload data into chunks.
1297 1297
1298 1298 Part payload data consists of framed chunks. This function takes
1299 1299 a file handle and emits those chunks.
1300 1300 """
1301 1301 dolog = ui.configbool(b'devel', b'bundle2.debug')
1302 1302 debug = ui.debug
1303 1303
1304 1304 headerstruct = struct.Struct(_fpayloadsize)
1305 1305 headersize = headerstruct.size
1306 1306 unpack = headerstruct.unpack
1307 1307
1308 1308 readexactly = changegroup.readexactly
1309 1309 read = fh.read
1310 1310
1311 1311 chunksize = unpack(readexactly(fh, headersize))[0]
1312 1312 indebug(ui, b'payload chunk size: %i' % chunksize)
1313 1313
1314 1314 # changegroup.readexactly() is inlined below for performance.
1315 1315 while chunksize:
1316 1316 if chunksize >= 0:
1317 1317 s = read(chunksize)
1318 1318 if len(s) < chunksize:
1319 1319 raise error.Abort(
1320 1320 _(
1321 1321 b'stream ended unexpectedly '
1322 1322 b'(got %d bytes, expected %d)'
1323 1323 )
1324 1324 % (len(s), chunksize)
1325 1325 )
1326 1326
1327 1327 yield s
1328 1328 elif chunksize == flaginterrupt:
1329 1329 # Interrupt "signal" detected. The regular stream is interrupted
1330 1330 # and a bundle2 part follows. Consume it.
1331 1331 interrupthandler(ui, fh)()
1332 1332 else:
1333 1333 raise error.BundleValueError(
1334 1334 b'negative payload chunk size: %s' % chunksize
1335 1335 )
1336 1336
1337 1337 s = read(headersize)
1338 1338 if len(s) < headersize:
1339 1339 raise error.Abort(
1340 1340 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1341 1341 % (len(s), headersize)
1342 1342 )
1343 1343
1344 1344 chunksize = unpack(s)[0]
1345 1345
1346 1346 # indebug() inlined for performance.
1347 1347 if dolog:
1348 1348 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1349 1349
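# Illustrative sketch (editorial addition, not part of the original module):
# the framed payload b'\x00\x00\x00\x02hi\x00\x00\x00\x00' decodes to a
# single b'hi' chunk, the trailing zero-size frame ending the payload.
def _example_decode_payload(ui):
    import io

    frames = io.BytesIO(b'\x00\x00\x00\x02hi\x00\x00\x00\x00')
    assert list(decodepayloadchunks(ui, frames)) == [b'hi']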
1350 1350
1351 1351 class unbundlepart(unpackermixin):
1352 1352 """a bundle part read from a bundle"""
1353 1353
1354 1354 def __init__(self, ui, header, fp):
1355 1355 super(unbundlepart, self).__init__(fp)
1356 1356 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1357 1357 fp, b'tell'
1358 1358 )
1359 1359 self.ui = ui
1360 1360 # unbundle state attr
1361 1361 self._headerdata = header
1362 1362 self._headeroffset = 0
1363 1363 self._initialized = False
1364 1364 self.consumed = False
1365 1365 # part data
1366 1366 self.id = None
1367 1367 self.type = None
1368 1368 self.mandatoryparams = None
1369 1369 self.advisoryparams = None
1370 1370 self.params = None
1371 1371 self.mandatorykeys = ()
1372 1372 self._readheader()
1373 1373 self._mandatory = None
1374 1374 self._pos = 0
1375 1375
1376 1376 def _fromheader(self, size):
1377 1377 """return the next <size> byte from the header"""
1378 1378 offset = self._headeroffset
1379 1379 data = self._headerdata[offset : (offset + size)]
1380 1380 self._headeroffset = offset + size
1381 1381 return data
1382 1382
1383 1383 def _unpackheader(self, format):
1384 1384 """read given format from header
1385 1385
1386 1386 This automatically computes the size of the format to read.
1387 1387 data = self._fromheader(struct.calcsize(format))
1388 1388 return _unpack(format, data)
1389 1389
1390 1390 def _initparams(self, mandatoryparams, advisoryparams):
1391 1391 """internal function to setup all logic related parameters"""
1392 1392 # make it read only to prevent people touching it by mistake.
1393 1393 self.mandatoryparams = tuple(mandatoryparams)
1394 1394 self.advisoryparams = tuple(advisoryparams)
1395 1395 # user friendly UI
1396 1396 self.params = util.sortdict(self.mandatoryparams)
1397 1397 self.params.update(self.advisoryparams)
1398 1398 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1399 1399
1400 1400 def _readheader(self):
1401 1401 """read the header and setup the object"""
1402 1402 typesize = self._unpackheader(_fparttypesize)[0]
1403 1403 self.type = self._fromheader(typesize)
1404 1404 indebug(self.ui, b'part type: "%s"' % self.type)
1405 1405 self.id = self._unpackheader(_fpartid)[0]
1406 1406 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1407 1407 # extract mandatory bit from type
1408 1408 self.mandatory = self.type != self.type.lower()
1409 1409 self.type = self.type.lower()
1410 1410 ## reading parameters
1411 1411 # param count
1412 1412 mancount, advcount = self._unpackheader(_fpartparamcount)
1413 1413 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1414 1414 # param size
1415 1415 fparamsizes = _makefpartparamsizes(mancount + advcount)
1416 1416 paramsizes = self._unpackheader(fparamsizes)
1417 1417 # make it a list of couples again
1418 1418 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1419 1419 # split mandatory from advisory
1420 1420 mansizes = paramsizes[:mancount]
1421 1421 advsizes = paramsizes[mancount:]
1422 1422 # retrieve param value
1423 1423 manparams = []
1424 1424 for key, value in mansizes:
1425 1425 manparams.append((self._fromheader(key), self._fromheader(value)))
1426 1426 advparams = []
1427 1427 for key, value in advsizes:
1428 1428 advparams.append((self._fromheader(key), self._fromheader(value)))
1429 1429 self._initparams(manparams, advparams)
1430 1430 ## part payload
1431 1431 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1432 1432 # we read the data, tell it
1433 1433 self._initialized = True
1434 1434
1435 1435 def _payloadchunks(self):
1436 1436 """Generator of decoded chunks in the payload."""
1437 1437 return decodepayloadchunks(self.ui, self._fp)
1438 1438
1439 1439 def consume(self):
1440 1440 """Read the part payload until completion.
1441 1441
1442 1442 By consuming the part data, the underlying stream read offset will
1443 1443 be advanced to the next part (or end of stream).
1444 1444 """
1445 1445 if self.consumed:
1446 1446 return
1447 1447
1448 1448 chunk = self.read(32768)
1449 1449 while chunk:
1450 1450 self._pos += len(chunk)
1451 1451 chunk = self.read(32768)
1452 1452
1453 1453 def read(self, size=None):
1454 1454 """read payload data"""
1455 1455 if not self._initialized:
1456 1456 self._readheader()
1457 1457 if size is None:
1458 1458 data = self._payloadstream.read()
1459 1459 else:
1460 1460 data = self._payloadstream.read(size)
1461 1461 self._pos += len(data)
1462 1462 if size is None or len(data) < size:
1463 1463 if not self.consumed and self._pos:
1464 1464 self.ui.debug(
1465 1465 b'bundle2-input-part: total payload size %i\n' % self._pos
1466 1466 )
1467 1467 self.consumed = True
1468 1468 return data
1469 1469
1470 1470
1471 1471 class seekableunbundlepart(unbundlepart):
1472 1472 """A bundle2 part in a bundle that is seekable.
1473 1473
1474 1474 Regular ``unbundlepart`` instances can only be read once. This class
1475 1475 extends ``unbundlepart`` to enable bi-directional seeking within the
1476 1476 part.
1477 1477
1478 1478 Bundle2 part data consists of framed chunks. Offsets when seeking
1479 1479 refer to the decoded data, not the offsets in the underlying bundle2
1480 1480 stream.
1481 1481
1482 1482 To facilitate quickly seeking within the decoded data, instances of this
1483 1483 class maintain a mapping between offsets in the underlying stream and
1484 1484 the decoded payload. This mapping will consume memory in proportion
1485 1485 to the number of chunks within the payload (which almost certainly
1486 1486 increases in proportion with the size of the part).
1487 1487 """
1488 1488
1489 1489 def __init__(self, ui, header, fp):
1490 1490 # (payload, file) offsets for chunk starts.
1491 1491 self._chunkindex = []
1492 1492
1493 1493 super(seekableunbundlepart, self).__init__(ui, header, fp)
1494 1494
1495 1495 def _payloadchunks(self, chunknum=0):
1496 1496 '''seek to specified chunk and start yielding data'''
1497 1497 if len(self._chunkindex) == 0:
1498 1498 assert chunknum == 0, b'Must start with chunk 0'
1499 1499 self._chunkindex.append((0, self._tellfp()))
1500 1500 else:
1501 1501 assert chunknum < len(self._chunkindex), (
1502 1502 b'Unknown chunk %d' % chunknum
1503 1503 )
1504 1504 self._seekfp(self._chunkindex[chunknum][1])
1505 1505
1506 1506 pos = self._chunkindex[chunknum][0]
1507 1507
1508 1508 for chunk in decodepayloadchunks(self.ui, self._fp):
1509 1509 chunknum += 1
1510 1510 pos += len(chunk)
1511 1511 if chunknum == len(self._chunkindex):
1512 1512 self._chunkindex.append((pos, self._tellfp()))
1513 1513
1514 1514 yield chunk
1515 1515
1516 1516 def _findchunk(self, pos):
1517 1517 '''for a given payload position, return a chunk number and offset'''
1518 1518 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1519 1519 if ppos == pos:
1520 1520 return chunk, 0
1521 1521 elif ppos > pos:
1522 1522 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1523 1523 raise ValueError(b'Unknown chunk')
1524 1524
1525 1525 def tell(self):
1526 1526 return self._pos
1527 1527
1528 1528 def seek(self, offset, whence=os.SEEK_SET):
1529 1529 if whence == os.SEEK_SET:
1530 1530 newpos = offset
1531 1531 elif whence == os.SEEK_CUR:
1532 1532 newpos = self._pos + offset
1533 1533 elif whence == os.SEEK_END:
1534 1534 if not self.consumed:
1535 1535 # Can't use self.consume() here because it advances self._pos.
1536 1536 chunk = self.read(32768)
1537 1537 while chunk:
1538 1538 chunk = self.read(32768)
1539 1539 newpos = self._chunkindex[-1][0] - offset
1540 1540 else:
1541 1541 raise ValueError(b'Unknown whence value: %r' % (whence,))
1542 1542
1543 1543 if newpos > self._chunkindex[-1][0] and not self.consumed:
1544 1544 # Can't use self.consume() here because it advances self._pos.
1545 1545 chunk = self.read(32768)
1546 1546 while chunk:
1547 1547 chunk = self.read(32768)
1548 1548
1549 1549 if not 0 <= newpos <= self._chunkindex[-1][0]:
1550 1550 raise ValueError(b'Offset out of range')
1551 1551
1552 1552 if self._pos != newpos:
1553 1553 chunk, internaloffset = self._findchunk(newpos)
1554 1554 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1555 1555 adjust = self.read(internaloffset)
1556 1556 if len(adjust) != internaloffset:
1557 1557 raise error.Abort(_(b'Seek failed\n'))
1558 1558 self._pos = newpos
1559 1559
1560 1560 def _seekfp(self, offset, whence=0):
1561 1561 """move the underlying file pointer
1562 1562
1563 1563 This method is meant for internal usage by the bundle2 protocol only.
1564 1564 It directly manipulates the low level stream, including bundle2 level
1565 1565 instructions.
1566 1566
1567 1567 Do not use it to implement higher-level logic or methods."""
1568 1568 if self._seekable:
1569 1569 return self._fp.seek(offset, whence)
1570 1570 else:
1571 1571 raise NotImplementedError(_(b'File pointer is not seekable'))
1572 1572
1573 1573 def _tellfp(self):
1574 1574 """return the file offset, or None if file is not seekable
1575 1575
1576 1576 This method is meant for internal usage by the bundle2 protocol only.
1577 1577 It directly manipulates the low level stream, including bundle2 level
1578 1578 instructions.
1579 1579
1580 1580 Do not use it to implement higher-level logic or methods."""
1581 1581 if self._seekable:
1582 1582 try:
1583 1583 return self._fp.tell()
1584 1584 except IOError as e:
1585 1585 if e.errno == errno.ESPIPE:
1586 1586 self._seekable = False
1587 1587 else:
1588 1588 raise
1589 1589 return None
1590 1590
1591 1591
1592 1592 # These are only the static capabilities.
1593 1593 # Check the 'getrepocaps' function for the rest.
1594 1594 capabilities = {
1595 1595 b'HG20': (),
1596 1596 b'bookmarks': (),
1597 1597 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1598 1598 b'listkeys': (),
1599 1599 b'pushkey': (),
1600 1600 b'digests': tuple(sorted(util.DIGESTS.keys())),
1601 1601 b'remote-changegroup': (b'http', b'https'),
1602 1602 b'hgtagsfnodes': (),
1603 1603 b'phases': (b'heads',),
1604 1604 b'stream': (b'v2',),
1605 1605 }
1606 1606
1607 1607
1608 1608 def getrepocaps(repo, allowpushback=False, role=None):
1609 1609 """return the bundle2 capabilities for a given repo
1610 1610
1611 1611 Exists to allow extensions (like evolution) to mutate the capabilities.
1612 1612
1613 1613 The returned value is used for servers advertising their capabilities as
1614 1614 well as clients advertising their capabilities to servers as part of
1615 1615 bundle2 requests. The ``role`` argument specifies which is which.
1616 1616 """
1617 1617 if role not in (b'client', b'server'):
1618 1618 raise error.ProgrammingError(b'role argument must be client or server')
1619 1619
1620 1620 caps = capabilities.copy()
1621 1621 caps[b'changegroup'] = tuple(
1622 1622 sorted(changegroup.supportedincomingversions(repo))
1623 1623 )
1624 1624 if obsolete.isenabled(repo, obsolete.exchangeopt):
1625 1625 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1626 1626 caps[b'obsmarkers'] = supportedformat
1627 1627 if allowpushback:
1628 1628 caps[b'pushback'] = ()
1629 1629 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1630 1630 if cpmode == b'check-related':
1631 1631 caps[b'checkheads'] = (b'related',)
1632 1632 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1633 1633 caps.pop(b'phases')
1634 1634
1635 1635 # Don't advertise stream clone support in server mode if not configured.
1636 1636 if role == b'server':
1637 1637 streamsupported = repo.ui.configbool(
1638 1638 b'server', b'uncompressed', untrusted=True
1639 1639 )
1640 1640 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1641 1641
1642 1642 if not streamsupported or not featuresupported:
1643 1643 caps.pop(b'stream')
1644 1644 # Else always advertise support on client, because payload support
1645 1645 # should always be advertised.
1646 1646
1647 1647 # The 'rev-branch-cache' part is no longer advertised, but still supported
1648 1648 # for legacy clients.
1649 1649
1650 1650 return caps
1651 1651
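# A hedged sketch of typical use (names from this module; `repo` assumed):
#
#     caps = getrepocaps(repo, role=b'server')
#     blob = encodecaps(caps)    # urlquoted blob advertised over the wire
#
# Clients decode a peer's advertisement with decodecaps()/bundle2caps() below.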
1652 1652
1653 1653 def bundle2caps(remote):
1654 1654 """return the bundle capabilities of a peer as dict"""
1655 1655 raw = remote.capable(b'bundle2')
1656 1656 if not raw and raw != b'':
1657 1657 return {}
1658 1658 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1659 1659 return decodecaps(capsblob)
1660 1660
1661 1661
1662 1662 def obsmarkersversion(caps):
1663 1663 """extract the list of supported obsmarkers versions from a bundle2caps dict"""
1664 1664 obscaps = caps.get(b'obsmarkers', ())
1665 1665 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1666 1666
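# Example (illustrative values): given a decoded capabilities dict,
#
#     obsmarkersversion({b'obsmarkers': (b'V0', b'V1')})  ->  [0, 1]
#
# entries that do not start with b'V' are ignored.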
1667 1667
1668 1668 def writenewbundle(
1669 1669 ui,
1670 1670 repo,
1671 1671 source,
1672 1672 filename,
1673 1673 bundletype,
1674 1674 outgoing,
1675 1675 opts,
1676 1676 vfs=None,
1677 1677 compression=None,
1678 1678 compopts=None,
1679 1679 ):
1680 1680 if bundletype.startswith(b'HG10'):
1681 1681 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1682 1682 return writebundle(
1683 1683 ui,
1684 1684 cg,
1685 1685 filename,
1686 1686 bundletype,
1687 1687 vfs=vfs,
1688 1688 compression=compression,
1689 1689 compopts=compopts,
1690 1690 )
1691 1691 elif not bundletype.startswith(b'HG20'):
1692 1692 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1693 1693
1694 1694 caps = {}
1695 1695 if opts.get(b'obsolescence', False):
1696 1696 caps[b'obsmarkers'] = (b'V1',)
1697 1697 bundle = bundle20(ui, caps)
1698 1698 bundle.setcompression(compression, compopts)
1699 1699 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1700 1700 chunkiter = bundle.getchunks()
1701 1701
1702 1702 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1703 1703
1704 1704
1705 1705 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1706 1706 # We should eventually reconcile this logic with the one behind
1707 1707 # 'exchange.getbundle2partsgenerator'.
1708 1708 #
1709 1709 # The types of input from 'getbundle' and 'writenewbundle' are a bit
1710 1710 # different right now. So we keep them separated for now for the sake of
1711 1711 # simplicity.
1712 1712
1713 1713 # we might not always want a changegroup in such a bundle, for example in
1714 1714 # stream bundles
1715 1715 if opts.get(b'changegroup', True):
1716 1716 cgversion = opts.get(b'cg.version')
1717 1717 if cgversion is None:
1718 1718 cgversion = changegroup.safeversion(repo)
1719 1719 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1720 1720 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1721 1721 part.addparam(b'version', cg.version)
1722 1722 if b'clcount' in cg.extras:
1723 1723 part.addparam(
1724 1724 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1725 1725 )
1726 1726 if opts.get(b'phases') and repo.revs(
1727 1727 b'%ln and secret()', outgoing.ancestorsof
1728 1728 ):
1729 1729 part.addparam(
1730 1730 b'targetphase', b'%d' % phases.secret, mandatory=False
1731 1731 )
1732 1732 if repository.REPO_FEATURE_SIDE_DATA in repo.features:
1733 1733 part.addparam(b'exp-sidedata', b'1')
1734 1734
1735 1735 if opts.get(b'streamv2', False):
1736 1736 addpartbundlestream2(bundler, repo, stream=True)
1737 1737
1738 1738 if opts.get(b'tagsfnodescache', True):
1739 1739 addparttagsfnodescache(repo, bundler, outgoing)
1740 1740
1741 1741 if opts.get(b'revbranchcache', True):
1742 1742 addpartrevbranchcache(repo, bundler, outgoing)
1743 1743
1744 1744 if opts.get(b'obsolescence', False):
1745 1745 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1746 1746 buildobsmarkerspart(
1747 1747 bundler,
1748 1748 obsmarkers,
1749 1749 mandatory=opts.get(b'obsolescence-mandatory', True),
1750 1750 )
1751 1751
1752 1752 if opts.get(b'phases', False):
1753 1753 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1754 1754 phasedata = phases.binaryencode(headsbyphase)
1755 1755 bundler.newpart(b'phase-heads', data=phasedata)
1756 1756
1757 1757
1758 1758 def addparttagsfnodescache(repo, bundler, outgoing):
1759 1759 # we include the tags fnode cache for the bundle changeset
1760 1760 # (as an optional part)
1761 1761 cache = tags.hgtagsfnodescache(repo.unfiltered())
1762 1762 chunks = []
1763 1763
1764 1764 # .hgtags fnodes are only relevant for head changesets. While we could
1765 1765 # transfer values for all known nodes, there will likely be little to
1766 1766 # no benefit.
1767 1767 #
1768 1768 # We don't bother using a generator to produce output data because
1769 1769 # a) we only have 40 bytes per head and even esoteric numbers of heads
1770 1770 # consume little memory (1M heads is 40MB) b) we don't want to send the
1771 1771 # part if we don't have entries and knowing if we have entries requires
1772 1772 # cache lookups.
1773 1773 for node in outgoing.ancestorsof:
1774 1774 # Don't compute missing, as this may slow down serving.
1775 1775 fnode = cache.getfnode(node, computemissing=False)
1776 1776 if fnode:
1777 1777 chunks.extend([node, fnode])
1778 1778
1779 1779 if chunks:
1780 1780 bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))
1781 1781
1782 1782
1783 1783 def addpartrevbranchcache(repo, bundler, outgoing):
1784 1784 # we include the rev branch cache for the bundle changeset
1785 1785 # (as an optional part)
1786 1786 cache = repo.revbranchcache()
1787 1787 cl = repo.unfiltered().changelog
1788 1788 branchesdata = collections.defaultdict(lambda: (set(), set()))
1789 1789 for node in outgoing.missing:
1790 1790 branch, close = cache.branchinfo(cl.rev(node))
1791 1791 branchesdata[branch][close].add(node)
1792 1792
1793 1793 def generate():
1794 1794 for branch, (nodes, closed) in sorted(branchesdata.items()):
1795 1795 utf8branch = encoding.fromlocal(branch)
1796 1796 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1797 1797 yield utf8branch
1798 1798 for n in sorted(nodes):
1799 1799 yield n
1800 1800 for n in sorted(closed):
1801 1801 yield n
1802 1802
1803 1803 bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)
1804 1804
1805 1805
1806 1806 def _formatrequirementsspec(requirements):
1807 1807 requirements = [req for req in requirements if req != b"shared"]
1808 1808 return urlreq.quote(b','.join(sorted(requirements)))
1809 1809
1810 1810
1811 1811 def _formatrequirementsparams(requirements):
1812 1812 requirements = _formatrequirementsspec(requirements)
1813 1813 params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
1814 1814 return params
1815 1815
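# Example (illustrative): _formatrequirementsparams({b'revlogv1', b'shared'})
# drops b'shared', quotes the rest, and returns b'requirements%3Drevlogv1',
# i.e. the urlquoted form of b'requirements=revlogv1'.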
1816 1816
1817 1817 def format_remote_wanted_sidedata(repo):
1818 1818 """Formats a repo's wanted sidedata categories into a bytestring for
1819 1819 capabilities exchange."""
1820 1820 wanted = b""
1821 1821 if repo._wanted_sidedata:
1822 1822 wanted = b','.join(
1823 1823 pycompat.bytestr(c) for c in sorted(repo._wanted_sidedata)
1824 1824 )
1825 1825 return wanted
1826 1826
1827 1827
1828 1828 def read_remote_wanted_sidedata(remote):
1829 1829 sidedata_categories = remote.capable(b'exp-wanted-sidedata')
1830 1830 return read_wanted_sidedata(sidedata_categories)
1831 1831
1832 1832
1833 1833 def read_wanted_sidedata(formatted):
1834 1834 if formatted:
1835 1835 return set(formatted.split(b','))
1836 1836 return set()
1837 1837
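# Round-trip sketch (illustrative values): format_remote_wanted_sidedata()
# joins the repo's wanted categories with b',' and read_wanted_sidedata()
# splits them back:
#
#     read_wanted_sidedata(b'changeset,files') -> {b'changeset', b'files'}
#     read_wanted_sidedata(b'')                -> set()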
1838 1838
1839 1839 def addpartbundlestream2(bundler, repo, **kwargs):
1840 1840 if not kwargs.get('stream', False):
1841 1841 return
1842 1842
1843 1843 if not streamclone.allowservergeneration(repo):
1844 1844 raise error.Abort(
1845 1845 _(
1846 1846 b'stream data requested but server does not allow '
1847 1847 b'this feature'
1848 1848 ),
1849 1849 hint=_(
1850 1850 b'well-behaved clients should not be '
1851 1851 b'requesting stream data from servers not '
1852 1852 b'advertising it; the client may be buggy'
1853 1853 ),
1854 1854 )
1855 1855
1856 1856 # Stream clones don't compress well. And compression undermines a
1857 1857 # goal of stream clones, which is to be fast. Communicate the desire
1858 1858 # to avoid compression to consumers of the bundle.
1859 1859 bundler.prefercompressed = False
1860 1860
1861 1861 # get the includes and excludes
1862 1862 includepats = kwargs.get('includepats')
1863 1863 excludepats = kwargs.get('excludepats')
1864 1864
1865 1865 narrowstream = repo.ui.configbool(
1866 1866 b'experimental', b'server.stream-narrow-clones'
1867 1867 )
1868 1868
1869 1869 if (includepats or excludepats) and not narrowstream:
1870 1870 raise error.Abort(_(b'server does not support narrow stream clones'))
1871 1871
1872 1872 includeobsmarkers = False
1873 1873 if repo.obsstore:
1874 1874 remoteversions = obsmarkersversion(bundler.capabilities)
1875 1875 if not remoteversions:
1876 1876 raise error.Abort(
1877 1877 _(
1878 1878 b'server has obsolescence markers, but client '
1879 1879 b'cannot receive them via stream clone'
1880 1880 )
1881 1881 )
1882 1882 elif repo.obsstore._version in remoteversions:
1883 1883 includeobsmarkers = True
1884 1884
1885 1885 filecount, bytecount, it = streamclone.generatev2(
1886 1886 repo, includepats, excludepats, includeobsmarkers
1887 1887 )
1888 1888 requirements = streamclone.streamed_requirements(repo)
1889 1889 requirements = _formatrequirementsspec(requirements)
1890 1890 part = bundler.newpart(b'stream2', data=it)
1891 1891 part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
1892 1892 part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
1893 1893 part.addparam(b'requirements', requirements, mandatory=True)
1894 1894
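# Informal sketch of the resulting part (not a literal wire dump): a
# mandatory 'stream2' part carrying three mandatory parameters, roughly
#
#     stream2: requirements=<quoted spec> filecount=<n> bytecount=<total>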
1895 1895
1896 1896 def buildobsmarkerspart(bundler, markers, mandatory=True):
1897 1897 """add an obsmarker part to the bundler with <markers>
1898 1898
1899 1899 No part is created if markers is empty.
1900 1900 Raises ValueError if the bundler doesn't support any known obsmarker format.
1901 1901 """
1902 1902 if not markers:
1903 1903 return None
1904 1904
1905 1905 remoteversions = obsmarkersversion(bundler.capabilities)
1906 1906 version = obsolete.commonversion(remoteversions)
1907 1907 if version is None:
1908 1908 raise ValueError(b'bundler does not support common obsmarker format')
1909 1909 stream = obsolete.encodemarkers(markers, True, version=version)
1910 1910 return bundler.newpart(b'obsmarkers', data=stream, mandatory=mandatory)
1911 1911
1912 1912
1913 1913 def writebundle(
1914 1914 ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
1915 1915 ):
1916 1916 """Write a bundle file and return its filename.
1917 1917
1918 1918 Existing files will not be overwritten.
1919 1919 If no filename is specified, a temporary file is created.
1920 1920 bz2 compression can be turned off.
1921 1921 The bundle file will be deleted in case of errors.
1922 1922 """
1923 1923
1924 1924 if bundletype == b"HG20":
1925 1925 bundle = bundle20(ui)
1926 1926 bundle.setcompression(compression, compopts)
1927 1927 part = bundle.newpart(b'changegroup', data=cg.getchunks())
1928 1928 part.addparam(b'version', cg.version)
1929 1929 if b'clcount' in cg.extras:
1930 1930 part.addparam(
1931 1931 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1932 1932 )
1933 1933 chunkiter = bundle.getchunks()
1934 1934 else:
1935 1935 # compression argument is only for the bundle2 case
1936 1936 assert compression is None
1937 1937 if cg.version != b'01':
1938 1938 raise error.Abort(
1939 1939 _(b'old bundle types only support v1 changegroups')
1940 1940 )
1941
1942 # HG20 is the case without 2 values to unpack, but is handled above.
1943 # pytype: disable=bad-unpacking
1941 1944 header, comp = bundletypes[bundletype]
1945 # pytype: enable=bad-unpacking
1946
1942 1947 if comp not in util.compengines.supportedbundletypes:
1943 1948 raise error.Abort(_(b'unknown stream compression type: %s') % comp)
1944 1949 compengine = util.compengines.forbundletype(comp)
1945 1950
1946 1951 def chunkiter():
1947 1952 yield header
1948 1953 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1949 1954 yield chunk
1950 1955
1951 1956 chunkiter = chunkiter()
1952 1957
1953 1958 # parse the changegroup data, otherwise we will block
1954 1959 # in case of sshrepo because we don't know the end of the stream
1955 1960 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1956 1961
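# A hedged usage sketch (bundle type names from this module's `bundletypes`
# table; `cg` is a changegroup from changegroup.makechangegroup()):
#
#     writebundle(ui, cg, b'out.hg', b'HG10BZ')                   # v1, bzip2
#     writebundle(ui, cg, b'out.hg', b'HG20', compression=b'BZ')  # bundle2
#
# For HG10* types the changegroup must be version '01', per the check above.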
1957 1962
1958 1963 def combinechangegroupresults(op):
1959 1964 """logic to combine 0 or more addchangegroup results into one"""
1960 1965 results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
1961 1966 changedheads = 0
1962 1967 result = 1
1963 1968 for ret in results:
1964 1969 # If any changegroup result is 0, return 0
1965 1970 if ret == 0:
1966 1971 result = 0
1967 1972 break
1968 1973 if ret < -1:
1969 1974 changedheads += ret + 1
1970 1975 elif ret > 1:
1971 1976 changedheads += ret - 1
1972 1977 if changedheads > 0:
1973 1978 result = 1 + changedheads
1974 1979 elif changedheads < 0:
1975 1980 result = -1 + changedheads
1976 1981 return result
1977 1982
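# Worked example (illustrative): return codes encode head changes as
# 1 + added (values > 1) and -1 - removed (values < -1). Combining [3, -2]:
# changedheads = (3 - 1) + (-2 + 1) = 1, so the result is 1 + 1 = 2;
# any 0 among the inputs short-circuits the combined result to 0.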
1978 1983
1979 1984 @parthandler(
1980 1985 b'changegroup',
1981 1986 (
1982 1987 b'version',
1983 1988 b'nbchanges',
1984 1989 b'exp-sidedata',
1985 1990 b'exp-wanted-sidedata',
1986 1991 b'treemanifest',
1987 1992 b'targetphase',
1988 1993 ),
1989 1994 )
1990 1995 def handlechangegroup(op, inpart):
1991 1996 """apply a changegroup part on the repo"""
1992 1997 from . import localrepo
1993 1998
1994 1999 tr = op.gettransaction()
1995 2000 unpackerversion = inpart.params.get(b'version', b'01')
1996 2001 # We should raise an appropriate exception here
1997 2002 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1998 2003 # the source and url passed here are overwritten by the ones contained in
1999 2004 # the transaction.hookargs argument. So 'bundle2' is a placeholder
2000 2005 nbchangesets = None
2001 2006 if b'nbchanges' in inpart.params:
2002 2007 nbchangesets = int(inpart.params.get(b'nbchanges'))
2003 2008 if b'treemanifest' in inpart.params and not scmutil.istreemanifest(op.repo):
2004 2009 if len(op.repo.changelog) != 0:
2005 2010 raise error.Abort(
2006 2011 _(
2007 2012 b"bundle contains tree manifests, but local repo is "
2008 2013 b"non-empty and does not use tree manifests"
2009 2014 )
2010 2015 )
2011 2016 op.repo.requirements.add(requirements.TREEMANIFEST_REQUIREMENT)
2012 2017 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
2013 2018 op.repo.ui, op.repo.requirements, op.repo.features
2014 2019 )
2015 2020 scmutil.writereporequirements(op.repo)
2016 2021
2017 2022 extrakwargs = {}
2018 2023 targetphase = inpart.params.get(b'targetphase')
2019 2024 if targetphase is not None:
2020 2025 extrakwargs['targetphase'] = int(targetphase)
2021 2026
2022 2027 remote_sidedata = inpart.params.get(b'exp-wanted-sidedata')
2023 2028 extrakwargs['sidedata_categories'] = read_wanted_sidedata(remote_sidedata)
2024 2029
2025 2030 ret = _processchangegroup(
2026 2031 op,
2027 2032 cg,
2028 2033 tr,
2029 2034 op.source,
2030 2035 b'bundle2',
2031 2036 expectedtotal=nbchangesets,
2032 2037 **extrakwargs
2033 2038 )
2034 2039 if op.reply is not None:
2035 2040 # This is definitely not the final form of this
2036 2041 # return. But one needs to start somewhere.
2037 2042 part = op.reply.newpart(b'reply:changegroup', mandatory=False)
2038 2043 part.addparam(
2039 2044 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2040 2045 )
2041 2046 part.addparam(b'return', b'%i' % ret, mandatory=False)
2042 2047 assert not inpart.read()
2043 2048
2044 2049
2045 2050 _remotechangegroupparams = tuple(
2046 2051 [b'url', b'size', b'digests']
2047 2052 + [b'digest:%s' % k for k in util.DIGESTS.keys()]
2048 2053 )
2049 2054
2050 2055
2051 2056 @parthandler(b'remote-changegroup', _remotechangegroupparams)
2052 2057 def handleremotechangegroup(op, inpart):
2053 2058 """apply a bundle10 on the repo, given an url and validation information
2054 2059
2055 2060 All the information about the remote bundle to import is given as
2056 2061 parameters. The parameters include:
2057 2062 - url: the url to the bundle10.
2058 2063 - size: the bundle10 file size. It is used to validate that what was
2059 2064 retrieved by the client matches the server's knowledge about the bundle.
2060 2065 - digests: a space separated list of the digest types provided as
2061 2066 parameters.
2062 2067 - digest:<digest-type>: the hexadecimal representation of the digest with
2063 2068 that name. Like the size, it is used to validate what was retrieved by
2064 2069 the client matches what the server knows about the bundle.
2065 2070
2066 2071 When multiple digest types are given, all of them are checked.
2067 2072 """
2068 2073 try:
2069 2074 raw_url = inpart.params[b'url']
2070 2075 except KeyError:
2071 2076 raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
2072 2077 parsed_url = urlutil.url(raw_url)
2073 2078 if parsed_url.scheme not in capabilities[b'remote-changegroup']:
2074 2079 raise error.Abort(
2075 2080 _(b'remote-changegroup does not support %s urls')
2076 2081 % parsed_url.scheme
2077 2082 )
2078 2083
2079 2084 try:
2080 2085 size = int(inpart.params[b'size'])
2081 2086 except ValueError:
2082 2087 raise error.Abort(
2083 2088 _(b'remote-changegroup: invalid value for param "%s"') % b'size'
2084 2089 )
2085 2090 except KeyError:
2086 2091 raise error.Abort(
2087 2092 _(b'remote-changegroup: missing "%s" param') % b'size'
2088 2093 )
2089 2094
2090 2095 digests = {}
2091 2096 for typ in inpart.params.get(b'digests', b'').split():
2092 2097 param = b'digest:%s' % typ
2093 2098 try:
2094 2099 value = inpart.params[param]
2095 2100 except KeyError:
2096 2101 raise error.Abort(
2097 2102 _(b'remote-changegroup: missing "%s" param') % param
2098 2103 )
2099 2104 digests[typ] = value
2100 2105
2101 2106 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
2102 2107
2103 2108 tr = op.gettransaction()
2104 2109 from . import exchange
2105 2110
2106 2111 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
2107 2112 if not isinstance(cg, changegroup.cg1unpacker):
2108 2113 raise error.Abort(
2109 2114 _(b'%s: not a bundle version 1.0') % urlutil.hidepassword(raw_url)
2110 2115 )
2111 2116 ret = _processchangegroup(op, cg, tr, op.source, b'bundle2')
2112 2117 if op.reply is not None:
2113 2118 # This is definitely not the final form of this
2114 2119 # return. But one needs to start somewhere.
2115 2120 part = op.reply.newpart(b'reply:changegroup')
2116 2121 part.addparam(
2117 2122 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2118 2123 )
2119 2124 part.addparam(b'return', b'%i' % ret, mandatory=False)
2120 2125 try:
2121 2126 real_part.validate()
2122 2127 except error.Abort as e:
2123 2128 raise error.Abort(
2124 2129 _(b'bundle at %s is corrupted:\n%s')
2125 2130 % (urlutil.hidepassword(raw_url), e.message)
2126 2131 )
2127 2132 assert not inpart.read()
2128 2133
2129 2134
2130 2135 @parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
2131 2136 def handlereplychangegroup(op, inpart):
2132 2137 ret = int(inpart.params[b'return'])
2133 2138 replyto = int(inpart.params[b'in-reply-to'])
2134 2139 op.records.add(b'changegroup', {b'return': ret}, replyto)
2135 2140
2136 2141
2137 2142 @parthandler(b'check:bookmarks')
2138 2143 def handlecheckbookmarks(op, inpart):
2139 2144 """check location of bookmarks
2140 2145
2141 2146 This part is used to detect push races regarding bookmarks. It
2142 2147 contains binary encoded (bookmark, node) tuples. If the local state does
2143 2148 not match the one in the part, a PushRaced exception is raised.
2144 2149 """
2145 2150 bookdata = bookmarks.binarydecode(op.repo, inpart)
2146 2151
2147 2152 msgstandard = (
2148 2153 b'remote repository changed while pushing - please try again '
2149 2154 b'(bookmark "%s" moved from %s to %s)'
2150 2155 )
2151 2156 msgmissing = (
2152 2157 b'remote repository changed while pushing - please try again '
2153 2158 b'(bookmark "%s" is missing, expected %s)'
2154 2159 )
2155 2160 msgexist = (
2156 2161 b'remote repository changed while pushing - please try again '
2157 2162 b'(bookmark "%s" set on %s, expected missing)'
2158 2163 )
2159 2164 for book, node in bookdata:
2160 2165 currentnode = op.repo._bookmarks.get(book)
2161 2166 if currentnode != node:
2162 2167 if node is None:
2163 2168 finalmsg = msgexist % (book, short(currentnode))
2164 2169 elif currentnode is None:
2165 2170 finalmsg = msgmissing % (book, short(node))
2166 2171 else:
2167 2172 finalmsg = msgstandard % (
2168 2173 book,
2169 2174 short(node),
2170 2175 short(currentnode),
2171 2176 )
2172 2177 raise error.PushRaced(finalmsg)
2173 2178
2174 2179
2175 2180 @parthandler(b'check:heads')
2176 2181 def handlecheckheads(op, inpart):
2177 2182 """check that head of the repo did not change
2178 2183
2179 2184 This is used to detect a push race when using unbundle.
2180 2185 This replaces the "heads" argument of unbundle."""
2181 2186 h = inpart.read(20)
2182 2187 heads = []
2183 2188 while len(h) == 20:
2184 2189 heads.append(h)
2185 2190 h = inpart.read(20)
2186 2191 assert not h
2187 2192 # Trigger a transaction so that we are guaranteed to have the lock now.
2188 2193 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2189 2194 op.gettransaction()
2190 2195 if sorted(heads) != sorted(op.repo.heads()):
2191 2196 raise error.PushRaced(
2192 2197 b'remote repository changed while pushing - please try again'
2193 2198 )
2194 2199
2195 2200
2196 2201 @parthandler(b'check:updated-heads')
2197 2202 def handlecheckupdatedheads(op, inpart):
2198 2203 """check for race on the heads touched by a push
2199 2204
2200 2205 This is similar to 'check:heads' but focuses on the heads actually updated
2201 2206 during the push. If other activity happens on unrelated heads, it is
2202 2207 ignored.
2203 2208
2204 2209 This allows servers with high traffic to avoid push contention as long as
2205 2210 only unrelated parts of the graph are involved.
2206 2211 h = inpart.read(20)
2207 2212 heads = []
2208 2213 while len(h) == 20:
2209 2214 heads.append(h)
2210 2215 h = inpart.read(20)
2211 2216 assert not h
2212 2217 # Trigger a transaction so that we are guaranteed to have the lock now.
2213 2218 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2214 2219 op.gettransaction()
2215 2220
2216 2221 currentheads = set()
2217 2222 for ls in op.repo.branchmap().iterheads():
2218 2223 currentheads.update(ls)
2219 2224
2220 2225 for h in heads:
2221 2226 if h not in currentheads:
2222 2227 raise error.PushRaced(
2223 2228 b'remote repository changed while pushing - '
2224 2229 b'please try again'
2225 2230 )
2226 2231
2227 2232
2228 2233 @parthandler(b'check:phases')
2229 2234 def handlecheckphases(op, inpart):
2230 2235 """check that phase boundaries of the repository did not change
2231 2236
2232 2237 This is used to detect a push race.
2233 2238 """
2234 2239 phasetonodes = phases.binarydecode(inpart)
2235 2240 unfi = op.repo.unfiltered()
2236 2241 cl = unfi.changelog
2237 2242 phasecache = unfi._phasecache
2238 2243 msg = (
2239 2244 b'remote repository changed while pushing - please try again '
2240 2245 b'(%s is %s expected %s)'
2241 2246 )
2242 2247 for expectedphase, nodes in phasetonodes.items():
2243 2248 for n in nodes:
2244 2249 actualphase = phasecache.phase(unfi, cl.rev(n))
2245 2250 if actualphase != expectedphase:
2246 2251 finalmsg = msg % (
2247 2252 short(n),
2248 2253 phases.phasenames[actualphase],
2249 2254 phases.phasenames[expectedphase],
2250 2255 )
2251 2256 raise error.PushRaced(finalmsg)
2252 2257
2253 2258
2254 2259 @parthandler(b'output')
2255 2260 def handleoutput(op, inpart):
2256 2261 """forward output captured on the server to the client"""
2257 2262 for line in inpart.read().splitlines():
2258 2263 op.ui.status(_(b'remote: %s\n') % line)
2259 2264
2260 2265
2261 2266 @parthandler(b'replycaps')
2262 2267 def handlereplycaps(op, inpart):
2263 2268 """Notify that a reply bundle should be created
2264 2269
2265 2270 The payload contains the capabilities information for the reply"""
2266 2271 caps = decodecaps(inpart.read())
2267 2272 if op.reply is None:
2268 2273 op.reply = bundle20(op.ui, caps)
2269 2274
2270 2275
2271 2276 class AbortFromPart(error.Abort):
2272 2277 """Sub-class of Abort that denotes an error from a bundle2 part."""
2273 2278
2274 2279
2275 2280 @parthandler(b'error:abort', (b'message', b'hint'))
2276 2281 def handleerrorabort(op, inpart):
2277 2282 """Used to transmit abort error over the wire"""
2278 2283 raise AbortFromPart(
2279 2284 inpart.params[b'message'], hint=inpart.params.get(b'hint')
2280 2285 )
2281 2286
2282 2287
2283 2288 @parthandler(
2284 2289 b'error:pushkey',
2285 2290 (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
2286 2291 )
2287 2292 def handleerrorpushkey(op, inpart):
2288 2293 """Used to transmit failure of a mandatory pushkey over the wire"""
2289 2294 kwargs = {}
2290 2295 for name in (b'namespace', b'key', b'new', b'old', b'ret'):
2291 2296 value = inpart.params.get(name)
2292 2297 if value is not None:
2293 2298 kwargs[name] = value
2294 2299 raise error.PushkeyFailed(
2295 2300 inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
2296 2301 )
2297 2302
2298 2303
2299 2304 @parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
2300 2305 def handleerrorunsupportedcontent(op, inpart):
2301 2306 """Used to transmit unknown content error over the wire"""
2302 2307 kwargs = {}
2303 2308 parttype = inpart.params.get(b'parttype')
2304 2309 if parttype is not None:
2305 2310 kwargs[b'parttype'] = parttype
2306 2311 params = inpart.params.get(b'params')
2307 2312 if params is not None:
2308 2313 kwargs[b'params'] = params.split(b'\0')
2309 2314
2310 2315 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2311 2316
2312 2317
2313 2318 @parthandler(b'error:pushraced', (b'message',))
2314 2319 def handleerrorpushraced(op, inpart):
2315 2320 """Used to transmit push race error over the wire"""
2316 2321 raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])
2317 2322
2318 2323
2319 2324 @parthandler(b'listkeys', (b'namespace',))
2320 2325 def handlelistkeys(op, inpart):
2321 2326 """retrieve pushkey namespace content stored in a bundle2"""
2322 2327 namespace = inpart.params[b'namespace']
2323 2328 r = pushkey.decodekeys(inpart.read())
2324 2329 op.records.add(b'listkeys', (namespace, r))
2325 2330
2326 2331
2327 2332 @parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
2328 2333 def handlepushkey(op, inpart):
2329 2334 """process a pushkey request"""
2330 2335 dec = pushkey.decode
2331 2336 namespace = dec(inpart.params[b'namespace'])
2332 2337 key = dec(inpart.params[b'key'])
2333 2338 old = dec(inpart.params[b'old'])
2334 2339 new = dec(inpart.params[b'new'])
2335 2340 # Grab the transaction to ensure that we have the lock before performing the
2336 2341 # pushkey.
2337 2342 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2338 2343 op.gettransaction()
2339 2344 ret = op.repo.pushkey(namespace, key, old, new)
2340 2345 record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
2341 2346 op.records.add(b'pushkey', record)
2342 2347 if op.reply is not None:
2343 2348 rpart = op.reply.newpart(b'reply:pushkey')
2344 2349 rpart.addparam(
2345 2350 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2346 2351 )
2347 2352 rpart.addparam(b'return', b'%i' % ret, mandatory=False)
2348 2353 if inpart.mandatory and not ret:
2349 2354 kwargs = {}
2350 2355 for key in (b'namespace', b'key', b'new', b'old', b'ret'):
2351 2356 if key in inpart.params:
2352 2357 kwargs[key] = inpart.params[key]
2353 2358 raise error.PushkeyFailed(
2354 2359 partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
2355 2360 )
2356 2361
2357 2362
2358 2363 @parthandler(b'bookmarks')
2359 2364 def handlebookmark(op, inpart):
2360 2365 """transmit bookmark information
2361 2366
2362 2367 The part contains binary encoded bookmark information.
2363 2368
2364 2369 The exact behavior of this part can be controlled by the 'bookmarks' mode
2365 2370 on the bundle operation.
2366 2371
2367 2372 When mode is 'apply' (the default) the bookmark information is applied as
2368 2373 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2369 2374 issued earlier to check for push races in such update. This behavior is
2370 2375 suitable for pushing.
2371 2376
2372 2377 When mode is 'records', the information is recorded into the 'bookmarks'
2373 2378 records of the bundle operation. This behavior is suitable for pulling.
2374 2379 """
2375 2380 changes = bookmarks.binarydecode(op.repo, inpart)
2376 2381
2377 2382 pushkeycompat = op.repo.ui.configbool(
2378 2383 b'server', b'bookmarks-pushkey-compat'
2379 2384 )
2380 2385 bookmarksmode = op.modes.get(b'bookmarks', b'apply')
2381 2386
2382 2387 if bookmarksmode == b'apply':
2383 2388 tr = op.gettransaction()
2384 2389 bookstore = op.repo._bookmarks
2385 2390 if pushkeycompat:
2386 2391 allhooks = []
2387 2392 for book, node in changes:
2388 2393 hookargs = tr.hookargs.copy()
2389 2394 hookargs[b'pushkeycompat'] = b'1'
2390 2395 hookargs[b'namespace'] = b'bookmarks'
2391 2396 hookargs[b'key'] = book
2392 2397 hookargs[b'old'] = hex(bookstore.get(book, b''))
2393 2398 hookargs[b'new'] = hex(node if node is not None else b'')
2394 2399 allhooks.append(hookargs)
2395 2400
2396 2401 for hookargs in allhooks:
2397 2402 op.repo.hook(
2398 2403 b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
2399 2404 )
2400 2405
2401 2406 for book, node in changes:
2402 2407 if bookmarks.isdivergent(book):
2403 2408 msg = _(b'cannot accept divergent bookmark %s!') % book
2404 2409 raise error.Abort(msg)
2405 2410
2406 2411 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2407 2412
2408 2413 if pushkeycompat:
2409 2414
2410 2415 def runhook(unused_success):
2411 2416 for hookargs in allhooks:
2412 2417 op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))
2413 2418
2414 2419 op.repo._afterlock(runhook)
2415 2420
2416 2421 elif bookmarksmode == b'records':
2417 2422 for book, node in changes:
2418 2423 record = {b'bookmark': book, b'node': node}
2419 2424 op.records.add(b'bookmarks', record)
2420 2425 else:
2421 2426 raise error.ProgrammingError(
2422 2427 b'unknown bookmark mode: %s' % bookmarksmode
2423 2428 )
2424 2429
2425 2430
2426 2431 @parthandler(b'phase-heads')
2427 2432 def handlephases(op, inpart):
2428 2433 """apply phases from bundle part to repo"""
2429 2434 headsbyphase = phases.binarydecode(inpart)
2430 2435 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2431 2436
2432 2437
2433 2438 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2434 2439 def handlepushkeyreply(op, inpart):
2435 2440 """retrieve the result of a pushkey request"""
2436 2441 ret = int(inpart.params[b'return'])
2437 2442 partid = int(inpart.params[b'in-reply-to'])
2438 2443 op.records.add(b'pushkey', {b'return': ret}, partid)
2439 2444
2440 2445
2441 2446 @parthandler(b'obsmarkers')
2442 2447 def handleobsmarker(op, inpart):
2443 2448 """add a stream of obsmarkers to the repo"""
2444 2449 tr = op.gettransaction()
2445 2450 markerdata = inpart.read()
2446 2451 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2447 2452 op.ui.writenoi18n(
2448 2453 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2449 2454 )
2450 2455 # The mergemarkers call will crash if marker creation is not enabled.
2451 2456 # we want to avoid this if the part is advisory.
2452 2457 if not inpart.mandatory and op.repo.obsstore.readonly:
2453 2458 op.repo.ui.debug(
2454 2459 b'ignoring obsolescence markers, feature not enabled\n'
2455 2460 )
2456 2461 return
2457 2462 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2458 2463 op.repo.invalidatevolatilesets()
2459 2464 op.records.add(b'obsmarkers', {b'new': new})
2460 2465 if op.reply is not None:
2461 2466 rpart = op.reply.newpart(b'reply:obsmarkers')
2462 2467 rpart.addparam(
2463 2468 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2464 2469 )
2465 2470 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2466 2471
2467 2472
2468 2473 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2469 2474 def handleobsmarkerreply(op, inpart):
2470 2475 """retrieve the result of a pushkey request"""
2471 2476 ret = int(inpart.params[b'new'])
2472 2477 partid = int(inpart.params[b'in-reply-to'])
2473 2478 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2474 2479
2475 2480
2476 2481 @parthandler(b'hgtagsfnodes')
2477 2482 def handlehgtagsfnodes(op, inpart):
2478 2483 """Applies .hgtags fnodes cache entries to the local repo.
2479 2484
2480 2485 Payload is pairs of 20 byte changeset nodes and filenodes.
2481 2486 """
2482 2487 # Grab the transaction so we ensure that we have the lock at this point.
2483 2488 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2484 2489 op.gettransaction()
2485 2490 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2486 2491
2487 2492 count = 0
2488 2493 while True:
2489 2494 node = inpart.read(20)
2490 2495 fnode = inpart.read(20)
2491 2496 if len(node) < 20 or len(fnode) < 20:
2492 2497 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2493 2498 break
2494 2499 cache.setfnode(node, fnode)
2495 2500 count += 1
2496 2501
2497 2502 cache.write()
2498 2503 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2499 2504
2500 2505
2501 2506 rbcstruct = struct.Struct(b'>III')
2502 2507
2503 2508
2504 2509 @parthandler(b'cache:rev-branch-cache')
2505 2510 def handlerbc(op, inpart):
2506 2511 """Legacy part, ignored for compatibility with bundles from or
2507 2512 for Mercurial before 5.7. Newer Mercurial computes the cache
2508 2513 efficiently enough during unbundling that the additional transfer
2509 2514 is unnecessary."""
2510 2515
2511 2516
2512 2517 @parthandler(b'pushvars')
2513 2518 def bundle2getvars(op, part):
2514 2519 '''unbundle a bundle2 containing shellvars on the server'''
2515 2520 # An option to disable unbundling on server-side for security reasons
2516 2521 if op.ui.configbool(b'push', b'pushvars.server'):
2517 2522 hookargs = {}
2518 2523 for key, value in part.advisoryparams:
2519 2524 key = key.upper()
2520 2525 # We want pushed variables to have USERVAR_ prepended so we know
2521 2526 # they came from the --pushvar flag.
2522 2527 key = b"USERVAR_" + key
2523 2528 hookargs[key] = value
2524 2529 op.addhookargs(hookargs)
2525 2530
2526 2531
2527 2532 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2528 2533 def handlestreamv2bundle(op, part):
2529 2534
2530 2535 requirements = urlreq.unquote(part.params[b'requirements'])
2531 2536 requirements = requirements.split(b',') if requirements else []
2532 2537 filecount = int(part.params[b'filecount'])
2533 2538 bytecount = int(part.params[b'bytecount'])
2534 2539
2535 2540 repo = op.repo
2536 2541 if len(repo):
2537 2542 msg = _(b'cannot apply stream clone to non-empty repository')
2538 2543 raise error.Abort(msg)
2539 2544
2540 2545 repo.ui.debug(b'applying stream bundle\n')
2541 2546 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2542 2547
2543 2548
2544 2549 def widen_bundle(
2545 2550 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2546 2551 ):
2547 2552 """generates bundle2 for widening a narrow clone
2548 2553
2549 2554 bundler is the bundle to which data should be added
2550 2555 repo is the localrepository instance
2551 2556 oldmatcher matches what the client already has
2552 2557 newmatcher matches what the client needs (including what it already has)
2553 2558 common is set of common heads between server and client
2554 2559 known is a set of revs known on the client side (used in ellipses)
2555 2560 cgversion is the changegroup version to send
2556 2561 ellipses is a boolean telling whether to send ellipses data or not
2557 2562
2558 2563 returns the bundler, populated with the data required for widening
2559 2564 """
2560 2565 commonnodes = set()
2561 2566 cl = repo.changelog
2562 2567 for r in repo.revs(b"::%ln", common):
2563 2568 commonnodes.add(cl.node(r))
2564 2569 if commonnodes:
2565 2570 packer = changegroup.getbundler(
2566 2571 cgversion,
2567 2572 repo,
2568 2573 oldmatcher=oldmatcher,
2569 2574 matcher=newmatcher,
2570 2575 fullnodes=commonnodes,
2571 2576 )
2572 2577 cgdata = packer.generate(
2573 2578 {repo.nullid},
2574 2579 list(commonnodes),
2575 2580 False,
2576 2581 b'narrow_widen',
2577 2582 changelog=False,
2578 2583 )
2579 2584
2580 2585 part = bundler.newpart(b'changegroup', data=cgdata)
2581 2586 part.addparam(b'version', cgversion)
2582 2587 if scmutil.istreemanifest(repo):
2583 2588 part.addparam(b'treemanifest', b'1')
2584 2589 if repository.REPO_FEATURE_SIDE_DATA in repo.features:
2585 2590 part.addparam(b'exp-sidedata', b'1')
2586 2591 wanted = format_remote_wanted_sidedata(repo)
2587 2592 part.addparam(b'exp-wanted-sidedata', wanted)
2588 2593
2589 2594 return bundler
@@ -1,489 +1,493 b''
1 1 # bundlecaches.py - utility to deal with pre-computed bundle for servers
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5
6 6 import collections
7 7
8 from typing import (
9 cast,
10 )
11
8 12 from .i18n import _
9 13
10 14 from .thirdparty import attr
11 15
12 16 from . import (
13 17 error,
14 18 requirements as requirementsmod,
15 19 sslutil,
16 20 util,
17 21 )
18 22 from .utils import stringutil
19 23
20 24 urlreq = util.urlreq
21 25
22 26 CB_MANIFEST_FILE = b'clonebundles.manifest'
23 27
24 28
25 29 @attr.s
26 30 class bundlespec:
27 31 compression = attr.ib()
28 32 wirecompression = attr.ib()
29 33 version = attr.ib()
30 34 wireversion = attr.ib()
31 35 # parameters explicitly overwritten by the config or the specification
32 36 _explicit_params = attr.ib()
33 37 # default parameter for the version
34 38 #
35 39 # Keeping it separated is useful to check what was actually overwritten.
36 40 _default_opts = attr.ib()
37 41
38 42 @property
39 43 def params(self):
40 44 return collections.ChainMap(self._explicit_params, self._default_opts)
41 45
42 46 @property
43 47 def contentopts(self):
44 48 # kept for backward compatibility concerns.
45 49 return self.params
46 50
47 51 def set_param(self, key, value, overwrite=True):
48 52 """Set a bundle parameter value.
49 53
50 54 Will only overwrite if overwrite is true"""
51 55 if overwrite or key not in self._explicit_params:
52 56 self._explicit_params[key] = value
53 57
54 58
55 59 # Maps bundle version human names to changegroup versions.
56 60 _bundlespeccgversions = {
57 61 b'v1': b'01',
58 62 b'v2': b'02',
59 63 b'packed1': b's1',
60 64 b'bundle2': b'02', # legacy
61 65 }
62 66
63 67 # Maps bundle version with content opts to choose which part to bundle
64 68 _bundlespeccontentopts = {
65 69 b'v1': {
66 70 b'changegroup': True,
67 71 b'cg.version': b'01',
68 72 b'obsolescence': False,
69 73 b'phases': False,
70 74 b'tagsfnodescache': False,
71 75 b'revbranchcache': False,
72 76 },
73 77 b'v2': {
74 78 b'changegroup': True,
75 79 b'cg.version': b'02',
76 80 b'obsolescence': False,
77 81 b'phases': False,
78 82 b'tagsfnodescache': True,
79 83 b'revbranchcache': True,
80 84 },
81 85 b'streamv2': {
82 86 b'changegroup': False,
83 87 b'cg.version': b'02',
84 88 b'obsolescence': False,
85 89 b'phases': False,
86 90 b"streamv2": True,
87 91 b'tagsfnodescache': False,
88 92 b'revbranchcache': False,
89 93 },
90 94 b'packed1': {
91 95 b'cg.version': b's1',
92 96 },
93 97 b'bundle2': { # legacy
94 98 b'cg.version': b'02',
95 99 },
96 100 }
97 101 _bundlespeccontentopts[b'bundle2'] = _bundlespeccontentopts[b'v2']
98 102
99 103 _bundlespecvariants = {b"streamv2": {}}
100 104
101 105 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
102 106 _bundlespecv1compengines = {b'gzip', b'bzip2', b'none'}
103 107
104 108
105 109 def param_bool(key, value):
106 110 """make a boolean out of a parameter value"""
107 111 b = stringutil.parsebool(value)
108 112 if b is None:
109 113 msg = _(b"parameter %s should be a boolean ('%s')")
110 114 msg %= (key, value)
111 115 raise error.InvalidBundleSpecification(msg)
112 116 return b
113 117
114 118
115 119 # mapping of known parameter name need their value processed
116 120 bundle_spec_param_processing = {
117 121 b"obsolescence": param_bool,
118 122 b"obsolescence-mandatory": param_bool,
119 123 b"phases": param_bool,
120 124 }
121 125
122 126
123 127 def _parseparams(s):
124 128 """parse bundlespec parameter section
125 129
126 130 input: "comp-version;params" string
127 131
128 132 return: (spec, {param_key: param_value})
129 133 """
130 134 if b';' not in s:
131 135 return s, {}
132 136
133 137 params = {}
134 138 version, paramstr = s.split(b';', 1)
135 139
136 140 err = _(b'invalid bundle specification: missing "=" in parameter: %s')
137 141 for p in paramstr.split(b';'):
138 142 if b'=' not in p:
139 143 msg = err % p
140 144 raise error.InvalidBundleSpecification(msg)
141 145
142 146 key, value = p.split(b'=', 1)
143 147 key = urlreq.unquote(key)
144 148 value = urlreq.unquote(value)
145 149 process = bundle_spec_param_processing.get(key)
146 150 if process is not None:
147 151 value = process(key, value)
148 152 params[key] = value
149 153
150 154 return version, params
151 155
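# Example (illustrative): boolean-typed keys go through param_bool above,
# while unknown keys keep their raw bytes value:
#
#     _parseparams(b'v2;obsolescence=true;phases=false')
#         -> (b'v2', {b'obsolescence': True, b'phases': False})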
152 156
153 157 def parsebundlespec(repo, spec, strict=True):
154 158 """Parse a bundle string specification into parts.
155 159
156 160 Bundle specifications denote a well-defined bundle/exchange format.
157 161 The content of a given specification should not change over time in
158 162 order to ensure that bundles produced by a newer version of Mercurial are
159 163 readable from an older version.
160 164
161 165 The string currently has the form:
162 166
163 167 <compression>-<type>[;<parameter0>[;<parameter1>]]
164 168
165 169 Where <compression> is one of the supported compression formats
166 170 and <type> is (currently) a version string. A ";" can follow the type and
167 171 all text afterwards is interpreted as URI encoded, ";" delimited key=value
168 172 pairs.
169 173
170 174 If ``strict`` is True (the default) <compression> is required. Otherwise,
171 175 it is optional.
172 176
173 177 Returns a bundlespec object of (compression, version, parameters).
174 178 Compression will be ``None`` if not in strict mode and a compression isn't
175 179 defined.
176 180
177 181 An ``InvalidBundleSpecification`` is raised when the specification is
178 182 not syntactically well formed.
179 183
180 184 An ``UnsupportedBundleSpecification`` is raised when the compression or
181 185 bundle type/version is not recognized.
182 186
183 187 Note: this function will likely eventually return a more complex data
184 188 structure, including bundle2 part information.
185 189 """
186 190 if strict and b'-' not in spec:
187 191 raise error.InvalidBundleSpecification(
188 192 _(
189 193 b'invalid bundle specification; '
190 194 b'must be prefixed with compression: %s'
191 195 )
192 196 % spec
193 197 )
194 198
195 199 pre_args = spec.split(b';', 1)[0]
196 200 if b'-' in pre_args:
197 201 compression, version = spec.split(b'-', 1)
198 202
199 203 if compression not in util.compengines.supportedbundlenames:
200 204 raise error.UnsupportedBundleSpecification(
201 205 _(b'%s compression is not supported') % compression
202 206 )
203 207
204 208 version, params = _parseparams(version)
205 209
206 210 if version not in _bundlespeccontentopts:
207 211 raise error.UnsupportedBundleSpecification(
208 212 _(b'%s is not a recognized bundle version') % version
209 213 )
210 214 else:
211 215 # Value could be just the compression or just the version, in which
212 216 # case some defaults are assumed (but only when not in strict mode).
213 217 assert not strict
214 218
215 219 spec, params = _parseparams(spec)
216 220
217 221 if spec in util.compengines.supportedbundlenames:
218 222 compression = spec
219 223 version = b'v1'
220 224 # Generaldelta repos require v2.
221 225 if requirementsmod.GENERALDELTA_REQUIREMENT in repo.requirements:
222 226 version = b'v2'
223 227 elif requirementsmod.REVLOGV2_REQUIREMENT in repo.requirements:
224 228 version = b'v2'
225 229 # Modern compression engines require v2.
226 230 if compression not in _bundlespecv1compengines:
227 231 version = b'v2'
228 232 elif spec in _bundlespeccontentopts:
229 233 if spec == b'packed1':
230 234 compression = b'none'
231 235 else:
232 236 compression = b'bzip2'
233 237 version = spec
234 238 else:
235 239 raise error.UnsupportedBundleSpecification(
236 240 _(b'%s is not a recognized bundle specification') % spec
237 241 )
238 242
239 243 # Bundle version 1 only supports a known set of compression engines.
240 244 if version == b'v1' and compression not in _bundlespecv1compengines:
241 245 raise error.UnsupportedBundleSpecification(
242 246 _(b'compression engine %s is not supported on v1 bundles')
243 247 % compression
244 248 )
245 249
246 250 # The specification for packed1 can optionally declare the data formats
247 251 # required to apply it. If we see this metadata, compare against what the
248 252 # repo supports and error if the bundle isn't compatible.
249 253 if version == b'packed1' and b'requirements' in params:
250 requirements = set(params[b'requirements'].split(b','))
254 requirements = set(cast(bytes, params[b'requirements']).split(b','))
251 255 missingreqs = requirements - requirementsmod.STREAM_FIXED_REQUIREMENTS
252 256 if missingreqs:
253 257 raise error.UnsupportedBundleSpecification(
254 258 _(b'missing support for repository features: %s')
255 259 % b', '.join(sorted(missingreqs))
256 260 )
257 261
258 262 # Compute contentopts based on the version
259 263 if b"stream" in params and params[b"stream"] == b"v2":
260 264 # That case is fishy as this mostly derails the version selection
261 265 # mechanism. `stream` bundles are quite specific and used differently
262 266 # as "normal" bundles.
263 267 #
264 268 # So we are pinning this to "v2", as this will likely be
265 269 # compatible forever. (see the next conditional).
266 270 #
267 271 # (we should probably define a cleaner way to do this and raise a
268 272 # warning when the old way is encountered)
269 273 version = b"streamv2"
270 274 contentopts = _bundlespeccontentopts.get(version, {}).copy()
271 275 if version == b"streamv2":
272 276 # streamv2 has been reported as "v2" for a while.
273 277 version = b"v2"
274 278
275 279 engine = util.compengines.forbundlename(compression)
276 280 compression, wirecompression = engine.bundletype()
277 281 wireversion = _bundlespeccontentopts[version][b'cg.version']
278 282
279 283 return bundlespec(
280 284 compression, wirecompression, version, wireversion, params, contentopts
281 285 )
282 286
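# A hedged sketch of typical results (attribute names from the bundlespec
# class above):
#
#     parsebundlespec(repo, b'bzip2-v2')
#         -> compression=b'bzip2', wirecompression=b'BZ',
#            version=b'v2', wireversion=b'02'
#
#     parsebundlespec(repo, b'none-v2;obsolescence=true')
#         additionally carries {b'obsolescence': True} in params.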
283 287
284 288 def parseclonebundlesmanifest(repo, s):
285 289 """Parses the raw text of a clone bundles manifest.
286 290
287 291 Returns a list of dicts. The dicts have a ``URL`` key corresponding
288 292 to the URL and other keys are the attributes for the entry.
289 293 """
290 294 m = []
291 295 for line in s.splitlines():
292 296 fields = line.split()
293 297 if not fields:
294 298 continue
295 299 attrs = {b'URL': fields[0]}
296 300 for rawattr in fields[1:]:
297 301 key, value = rawattr.split(b'=', 1)
298 302 key = util.urlreq.unquote(key)
299 303 value = util.urlreq.unquote(value)
300 304 attrs[key] = value
301 305
302 306 # Parse BUNDLESPEC into components. This makes client-side
303 307 # preferences easier to specify since you can prefer a single
304 308 # component of the BUNDLESPEC.
305 309 if key == b'BUNDLESPEC':
306 310 try:
307 311 bundlespec = parsebundlespec(repo, value)
308 312 attrs[b'COMPRESSION'] = bundlespec.compression
309 313 attrs[b'VERSION'] = bundlespec.version
310 314 except error.InvalidBundleSpecification:
311 315 pass
312 316 except error.UnsupportedBundleSpecification:
313 317 pass
314 318
315 319 m.append(attrs)
316 320
317 321 return m
318 322
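# Example (illustrative manifest line, not shipped data):
#
#     https://example.com/full.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true
#
# parses to {b'URL': b'https://example.com/full.hg',
#            b'BUNDLESPEC': b'gzip-v2', b'REQUIRESNI': b'true',
#            b'COMPRESSION': b'gzip', b'VERSION': b'v2'}.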
319 323
320 324 def isstreamclonespec(bundlespec):
321 325 # Stream clone v1
322 326 if bundlespec.wirecompression == b'UN' and bundlespec.wireversion == b's1':
323 327 return True
324 328
325 329 # Stream clone v2
326 330 if (
327 331 bundlespec.wirecompression == b'UN'
328 332 and bundlespec.wireversion == b'02'
329 333 and bundlespec.contentopts.get(b'streamv2')
330 334 ):
331 335 return True
332 336
333 337 return False
334 338
335 339
336 340 def filterclonebundleentries(repo, entries, streamclonerequested=False):
337 341 """Remove incompatible clone bundle manifest entries.
338 342
339 343 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
340 344 and returns a new list consisting of only the entries that this client
341 345 should be able to apply.
342 346
343 347 There is no guarantee we'll be able to apply all returned entries because
344 348 the metadata we use to filter on may be missing or wrong.
345 349 """
346 350 newentries = []
347 351 for entry in entries:
348 352 spec = entry.get(b'BUNDLESPEC')
349 353 if spec:
350 354 try:
351 355 bundlespec = parsebundlespec(repo, spec, strict=True)
352 356
353 357 # If a stream clone was requested, filter out non-streamclone
354 358 # entries.
355 359 if streamclonerequested and not isstreamclonespec(bundlespec):
356 360 repo.ui.debug(
357 361 b'filtering %s because not a stream clone\n'
358 362 % entry[b'URL']
359 363 )
360 364 continue
361 365
362 366 except error.InvalidBundleSpecification as e:
363 367 repo.ui.debug(stringutil.forcebytestr(e) + b'\n')
364 368 continue
365 369 except error.UnsupportedBundleSpecification as e:
366 370 repo.ui.debug(
367 371 b'filtering %s because unsupported bundle '
368 372 b'spec: %s\n' % (entry[b'URL'], stringutil.forcebytestr(e))
369 373 )
370 374 continue
371 375 # If we don't have a spec and requested a stream clone, we don't know
372 376 # what the entry is so don't attempt to apply it.
373 377 elif streamclonerequested:
374 378 repo.ui.debug(
375 379 b'filtering %s because cannot determine if a stream '
376 380 b'clone bundle\n' % entry[b'URL']
377 381 )
378 382 continue
379 383
380 384 if b'REQUIRESNI' in entry and not sslutil.hassni:
381 385 repo.ui.debug(
382 386 b'filtering %s because SNI not supported\n' % entry[b'URL']
383 387 )
384 388 continue
385 389
386 390 if b'REQUIREDRAM' in entry:
387 391 try:
388 392 requiredram = util.sizetoint(entry[b'REQUIREDRAM'])
389 393 except error.ParseError:
390 394 repo.ui.debug(
391 395 b'filtering %s due to a bad REQUIREDRAM attribute\n'
392 396 % entry[b'URL']
393 397 )
394 398 continue
395 399 actualram = repo.ui.estimatememory()
396 400 if actualram is not None and actualram * 0.66 < requiredram:
397 401 repo.ui.debug(
398 402 b'filtering %s as it needs more than 2/3 of system memory\n'
399 403 % entry[b'URL']
400 404 )
401 405 continue
402 406
403 407 newentries.append(entry)
404 408
405 409 return newentries
406 410
407 411
408 412 class clonebundleentry:
409 413 """Represents an item in a clone bundles manifest.
410 414
411 415 This rich class is needed to support sorting since sorted() in Python 3
412 416 doesn't support ``cmp`` and our comparison is complex enough that ``key=``
413 417 won't work.
414 418 """
415 419
416 420 def __init__(self, value, prefers):
417 421 self.value = value
418 422 self.prefers = prefers
419 423
420 424 def _cmp(self, other):
421 425 for prefkey, prefvalue in self.prefers:
422 426 avalue = self.value.get(prefkey)
423 427 bvalue = other.value.get(prefkey)
424 428
425 429 # Special case: b is missing the attribute and a matches exactly.
426 430 if avalue is not None and bvalue is None and avalue == prefvalue:
427 431 return -1
428 432
429 433 # Special case: a is missing the attribute and b matches exactly.
430 434 if bvalue is not None and avalue is None and bvalue == prefvalue:
431 435 return 1
432 436
433 437 # We can't compare unless attribute present on both.
434 438 if avalue is None or bvalue is None:
435 439 continue
436 440
437 441 # Same values should fall back to next attribute.
438 442 if avalue == bvalue:
439 443 continue
440 444
441 445 # Exact matches come first.
442 446 if avalue == prefvalue:
443 447 return -1
444 448 if bvalue == prefvalue:
445 449 return 1
446 450
447 451 # Fall back to next attribute.
448 452 continue
449 453
450 454 # If we got here we couldn't sort by attributes and prefers. Fall
451 455 # back to index order.
452 456 return 0
453 457
454 458 def __lt__(self, other):
455 459 return self._cmp(other) < 0
456 460
457 461 def __gt__(self, other):
458 462 return self._cmp(other) > 0
459 463
460 464 def __eq__(self, other):
461 465 return self._cmp(other) == 0
462 466
463 467 def __le__(self, other):
464 468 return self._cmp(other) <= 0
465 469
466 470 def __ge__(self, other):
467 471 return self._cmp(other) >= 0
468 472
469 473 def __ne__(self, other):
470 474 return self._cmp(other) != 0
471 475
472 476
473 477 def sortclonebundleentries(ui, entries):
474 478 prefers = ui.configlist(b'ui', b'clonebundleprefers')
475 479 if not prefers:
476 480 return list(entries)
477 481
478 482 def _split(p):
479 483 if b'=' not in p:
480 484 hint = _(b"each comma separated item should be key=value pairs")
481 485 raise error.Abort(
482 486 _(b"invalid ui.clonebundleprefers item: %s") % p, hint=hint
483 487 )
484 488 return p.split(b'=', 1)
485 489
486 490 prefers = [_split(p) for p in prefers]
487 491
488 492 items = sorted(clonebundleentry(v, prefers) for v in entries)
489 493 return [i.value for i in items]
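# Usage sketch (config value assumed for illustration): with
#
#     [ui]
#     clonebundleprefers = VERSION=v2, COMPRESSION=zstd
#
# entries matching VERSION=v2 sort first, ties are broken by
# COMPRESSION=zstd, and otherwise the original manifest order is kept.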
@@ -1,998 +1,1002 b''
1 1 # stringutil.py - utility for generic string formatting, parsing, etc.
2 2 #
3 3 # Copyright 2005 K. Thananchayan <thananck@yahoo.com>
4 4 # Copyright 2005-2007 Olivia Mackall <olivia@selenic.com>
5 5 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
6 6 #
7 7 # This software may be used and distributed according to the terms of the
8 8 # GNU General Public License version 2 or any later version.
9 9
10 10
11 11 import ast
12 12 import codecs
13 13 import re as remod
14 14 import textwrap
15 15 import types
16 16
17 17 from typing import (
18 18 Optional,
19 19 overload,
20 20 )
21 21
22 22 from ..i18n import _
23 23 from ..thirdparty import attr
24 24
25 25 from .. import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 )
30 30
31 31 # regex special chars pulled from https://bugs.python.org/issue29995
32 32 # which was part of Python 3.7.
33 33 _respecial = pycompat.bytestr(b'()[]{}?*+-|^$\\.&~# \t\n\r\v\f')
34 34 _regexescapemap = {ord(i): (b'\\' + i).decode('latin1') for i in _respecial}
35 35 regexbytesescapemap = {i: (b'\\' + i) for i in _respecial}
36 36
37 37
38 38 @overload
39 39 def reescape(pat: bytes) -> bytes:
40 40 ...
41 41
42 42
43 43 @overload
44 44 def reescape(pat: str) -> str:
45 45 ...
46 46
47 47
48 48 def reescape(pat):
49 49 """Drop-in replacement for re.escape."""
50 50 # NOTE: it is intentional that this works on unicodes and not
51 51 # bytes, as it's only possible to do the escaping with
52 52 # unicode.translate, not bytes.translate. Sigh.
53 53 wantuni = True
54 54 if isinstance(pat, bytes):
55 55 wantuni = False
56 56 pat = pat.decode('latin1')
57 57 pat = pat.translate(_regexescapemap)
58 58 if wantuni:
59 59 return pat
60 60 return pat.encode('latin1')
61 61
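A couple of illustrative calls (inputs invented) showing that reescape preserves the input type while escaping regex metacharacters:

    assert reescape(b'a.b*c') == b'a\\.b\\*c'   # bytes in, bytes out
    assert reescape('a.b*c') == 'a\\.b\\*c'     # str in, str out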
62 62
63 63 def pprint(o, bprefix: bool = False, indent: int = 0, level: int = 0) -> bytes:
64 64 """Pretty print an object."""
65 65 return b''.join(pprintgen(o, bprefix=bprefix, indent=indent, level=level))
66 66
67 67
68 68 def pprintgen(o, bprefix: bool = False, indent: int = 0, level: int = 0):
69 69 """Pretty print an object to a generator of atoms.
70 70
71 71 ``bprefix`` is a flag influencing whether bytestrings are preferred with
72 72 a ``b''`` prefix.
73 73
74 74 ``indent`` controls whether collections and nested data structures
75 75 span multiple lines via the indentation amount in spaces. By default,
76 76 no newlines are emitted.
77 77
78 78 ``level`` specifies the initial indent level. Used if ``indent > 0``.
79 79 """
80 80
81 81 if isinstance(o, bytes):
82 82 if bprefix:
83 83 yield b"b'%s'" % escapestr(o)
84 84 else:
85 85 yield b"'%s'" % escapestr(o)
86 86 elif isinstance(o, bytearray):
87 87 # codecs.escape_encode() can't handle bytearray, so escapestr fails
88 88 # without coercion.
89 89 yield b"bytearray['%s']" % escapestr(bytes(o))
90 90 elif isinstance(o, list):
91 91 if not o:
92 92 yield b'[]'
93 93 return
94 94
95 95 yield b'['
96 96
97 97 if indent:
98 98 level += 1
99 99 yield b'\n'
100 100 yield b' ' * (level * indent)
101 101
102 102 for i, a in enumerate(o):
103 103 for chunk in pprintgen(
104 104 a, bprefix=bprefix, indent=indent, level=level
105 105 ):
106 106 yield chunk
107 107
108 108 if i + 1 < len(o):
109 109 if indent:
110 110 yield b',\n'
111 111 yield b' ' * (level * indent)
112 112 else:
113 113 yield b', '
114 114
115 115 if indent:
116 116 level -= 1
117 117 yield b'\n'
118 118 yield b' ' * (level * indent)
119 119
120 120 yield b']'
121 121 elif isinstance(o, dict):
122 122 if not o:
123 123 yield b'{}'
124 124 return
125 125
126 126 yield b'{'
127 127
128 128 if indent:
129 129 level += 1
130 130 yield b'\n'
131 131 yield b' ' * (level * indent)
132 132
133 133 for i, (k, v) in enumerate(sorted(o.items())):
134 134 for chunk in pprintgen(
135 135 k, bprefix=bprefix, indent=indent, level=level
136 136 ):
137 137 yield chunk
138 138
139 139 yield b': '
140 140
141 141 for chunk in pprintgen(
142 142 v, bprefix=bprefix, indent=indent, level=level
143 143 ):
144 144 yield chunk
145 145
146 146 if i + 1 < len(o):
147 147 if indent:
148 148 yield b',\n'
149 149 yield b' ' * (level * indent)
150 150 else:
151 151 yield b', '
152 152
153 153 if indent:
154 154 level -= 1
155 155 yield b'\n'
156 156 yield b' ' * (level * indent)
157 157
158 158 yield b'}'
159 159 elif isinstance(o, set):
160 160 if not o:
161 161 yield b'set([])'
162 162 return
163 163
164 164 yield b'set(['
165 165
166 166 if indent:
167 167 level += 1
168 168 yield b'\n'
169 169 yield b' ' * (level * indent)
170 170
171 171 for i, k in enumerate(sorted(o)):
172 172 for chunk in pprintgen(
173 173 k, bprefix=bprefix, indent=indent, level=level
174 174 ):
175 175 yield chunk
176 176
177 177 if i + 1 < len(o):
178 178 if indent:
179 179 yield b',\n'
180 180 yield b' ' * (level * indent)
181 181 else:
182 182 yield b', '
183 183
184 184 if indent:
185 185 level -= 1
186 186 yield b'\n'
187 187 yield b' ' * (level * indent)
188 188
189 189 yield b'])'
190 190 elif isinstance(o, tuple):
191 191 if not o:
192 192 yield b'()'
193 193 return
194 194
195 195 yield b'('
196 196
197 197 if indent:
198 198 level += 1
199 199 yield b'\n'
200 200 yield b' ' * (level * indent)
201 201
202 202 for i, a in enumerate(o):
203 203 for chunk in pprintgen(
204 204 a, bprefix=bprefix, indent=indent, level=level
205 205 ):
206 206 yield chunk
207 207
208 208 if i + 1 < len(o):
209 209 if indent:
210 210 yield b',\n'
211 211 yield b' ' * (level * indent)
212 212 else:
213 213 yield b', '
214 214
215 215 if indent:
216 216 level -= 1
217 217 yield b'\n'
218 218 yield b' ' * (level * indent)
219 219
220 220 yield b')'
221 221 elif isinstance(o, types.GeneratorType):
222 222 # Special case of empty generator.
223 223 try:
224 224 nextitem = next(o)
225 225 except StopIteration:
226 226 yield b'gen[]'
227 227 return
228 228
229 229 yield b'gen['
230 230
231 231 if indent:
232 232 level += 1
233 233 yield b'\n'
234 234 yield b' ' * (level * indent)
235 235
236 236 last = False
237 237
238 238 while not last:
239 239 current = nextitem
240 240
241 241 try:
242 242 nextitem = next(o)
243 243 except StopIteration:
244 244 last = True
245 245
246 246 for chunk in pprintgen(
247 247 current, bprefix=bprefix, indent=indent, level=level
248 248 ):
249 249 yield chunk
250 250
251 251 if not last:
252 252 if indent:
253 253 yield b',\n'
254 254 yield b' ' * (level * indent)
255 255 else:
256 256 yield b', '
257 257
258 258 if indent:
259 259 level -= 1
260 260 yield b'\n'
261 261 yield b' ' * (level * indent)
262 262
263 263 yield b']'
264 264 else:
265 265 yield pycompat.byterepr(o)
266 266
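A small usage sketch for pprint()/pprintgen(); the expected outputs follow from the branches above (dict and set members are sorted, and newlines appear only when indent is nonzero):

    assert pprint({b'b': 1, b'a': [2, 3]}) == b"{'a': [2, 3], 'b': 1}"
    assert pprint((1,), indent=2) == b'(\n  1\n)'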
267 267
268 268 def prettyrepr(o) -> bytes:
269 269 """Pretty print a representation of a possibly-nested object"""
270 270 lines = []
271 271 rs = pycompat.byterepr(o)
272 272 p0 = p1 = 0
273 273 while p0 < len(rs):
274 274 # '... field=<type ... field=<type ...'
275 275 # ~~~~~~~~~~~~~~~~
276 276 # p0 p1 q0 q1
277 277 q0 = -1
278 278 q1 = rs.find(b'<', p1 + 1)
279 279 if q1 < 0:
280 280 q1 = len(rs)
281 281 # pytype: disable=wrong-arg-count
282 282 # TODO: figure out why pytype doesn't recognize the optional start
283 283 # arg
284 284 elif q1 > p1 + 1 and rs.startswith(b'=', q1 - 1):
285 285 # pytype: enable=wrong-arg-count
286 286 # backtrack for ' field=<'
287 287 q0 = rs.rfind(b' ', p1 + 1, q1 - 1)
288 288 if q0 < 0:
289 289 q0 = q1
290 290 else:
291 291 q0 += 1 # skip ' '
292 292 l = rs.count(b'<', 0, p0) - rs.count(b'>', 0, p0)
293 293 assert l >= 0
294 294 lines.append((l, rs[p0:q0].rstrip()))
295 295 p0, p1 = q0, q1
296 296 return b'\n'.join(b' ' * l + s for l, s in lines)
297 297
298 298
299 299 def buildrepr(r) -> bytes:
300 300 """Format an optional printable representation from unexpanded bits
301 301
302 302 ======== =================================
303 303 type(r) example
304 304 ======== =================================
305 305 tuple ('<not %r>', other)
306 306 bytes '<branch closed>'
307 307 callable lambda: '<branch %r>' % sorted(b)
308 308 object other
309 309 ======== =================================
310 310 """
311 311 if r is None:
312 312 return b''
313 313 elif isinstance(r, tuple):
314 314 return r[0] % pycompat.rapply(pycompat.maybebytestr, r[1:])
315 315 elif isinstance(r, bytes):
316 316 return r
317 317 elif callable(r):
318 318 return r()
319 319 else:
320 320 return pprint(r)
321 321
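Illustrative calls exercising three rows of the dispatch table above (values invented):

    assert buildrepr(None) == b''
    assert buildrepr(b'<branch closed>') == b'<branch closed>'
    assert buildrepr(lambda: b'<lazy>') == b'<lazy>'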
322 322
323 323 def binary(s: bytes) -> bool:
324 324 """return true if a string is binary data"""
325 325 return bool(s and b'\0' in s)
326 326
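For example, the NUL-byte heuristic gives:

    assert binary(b'ab\0c') is True
    assert binary(b'plain text') is False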
327 327
328 328 def _splitpattern(pattern: bytes):
329 329 if pattern.startswith(b're:'):
330 330 return b're', pattern[3:]
331 331 elif pattern.startswith(b'literal:'):
332 332 return b'literal', pattern[8:]
333 333 return b'literal', pattern
334 334
335 335
336 336 def stringmatcher(pattern: bytes, casesensitive: bool = True):
337 337 """
338 338 accepts a string, possibly starting with 're:' or 'literal:' prefix.
339 339 returns the matcher name, pattern, and matcher function.
340 340 missing or unknown prefixes are treated as literal matches.
341 341
342 342 helper for tests:
343 343 >>> def test(pattern, *tests):
344 344 ... kind, pattern, matcher = stringmatcher(pattern)
345 345 ... return (kind, pattern, [bool(matcher(t)) for t in tests])
346 346 >>> def itest(pattern, *tests):
347 347 ... kind, pattern, matcher = stringmatcher(pattern, casesensitive=False)
348 348 ... return (kind, pattern, [bool(matcher(t)) for t in tests])
349 349
350 350 exact matching (no prefix):
351 351 >>> test(b'abcdefg', b'abc', b'def', b'abcdefg')
352 352 ('literal', 'abcdefg', [False, False, True])
353 353
354 354 regex matching ('re:' prefix)
355 355 >>> test(b're:a.+b', b'nomatch', b'fooadef', b'fooadefbar')
356 356 ('re', 'a.+b', [False, False, True])
357 357
358 358 force exact matches ('literal:' prefix)
359 359 >>> test(b'literal:re:foobar', b'foobar', b're:foobar')
360 360 ('literal', 're:foobar', [False, True])
361 361
362 362 unknown prefixes are ignored and treated as literals
363 363 >>> test(b'foo:bar', b'foo', b'bar', b'foo:bar')
364 364 ('literal', 'foo:bar', [False, False, True])
365 365
366 366 case insensitive regex matches
367 367 >>> itest(b're:A.+b', b'nomatch', b'fooadef', b'fooadefBar')
368 368 ('re', 'A.+b', [False, False, True])
369 369
370 370 case insensitive literal matches
371 371 >>> itest(b'ABCDEFG', b'abc', b'def', b'abcdefg')
372 372 ('literal', 'ABCDEFG', [False, False, True])
373 373 """
374 374 kind, pattern = _splitpattern(pattern)
375 375 if kind == b're':
376 376 try:
377 377 flags = 0
378 378 if not casesensitive:
379 379 flags = remod.I
380 380 regex = remod.compile(pattern, flags)
381 381 except remod.error as e:
382 382 raise error.ParseError(
383 383 _(b'invalid regular expression: %s') % forcebytestr(e)
384 384 )
385 385 return kind, pattern, regex.search
386 386 elif kind == b'literal':
387 387 if casesensitive:
388 388 match = pattern.__eq__
389 389 else:
390 390 ipat = encoding.lower(pattern)
391 391 match = lambda s: ipat == encoding.lower(s)
392 392 return kind, pattern, match
393 393
394 394 raise error.ProgrammingError(b'unhandled pattern kind: %s' % kind)
395 395
396 396
397 397 def substringregexp(pattern: bytes, flags: int = 0):
398 398 """Build a regexp object from a string pattern possibly starting with
399 399 're:' or 'literal:' prefix.
400 400
401 401 helper for tests:
402 402 >>> def test(pattern, *tests):
403 403 ... regexp = substringregexp(pattern)
404 404 ... return [bool(regexp.search(t)) for t in tests]
405 405 >>> def itest(pattern, *tests):
406 406 ... regexp = substringregexp(pattern, remod.I)
407 407 ... return [bool(regexp.search(t)) for t in tests]
408 408
409 409 substring matching (no prefix):
410 410 >>> test(b'bcde', b'abc', b'def', b'abcdefg')
411 411 [False, False, True]
412 412
413 413 substring pattern should be escaped:
414 414 >>> substringregexp(b'.bc').pattern
415 415 '\\\\.bc'
416 416 >>> test(b'.bc', b'abc', b'def', b'abcdefg')
417 417 [False, False, False]
418 418
419 419 regex matching ('re:' prefix)
420 420 >>> test(b're:a.+b', b'nomatch', b'fooadef', b'fooadefbar')
421 421 [False, False, True]
422 422
423 423 force substring matches ('literal:' prefix)
424 424 >>> test(b'literal:re:foobar', b'foobar', b're:foobar')
425 425 [False, True]
426 426
427 427 case insensitive literal matches
428 428 >>> itest(b'BCDE', b'abc', b'def', b'abcdefg')
429 429 [False, False, True]
430 430
431 431 case insensitive regex matches
432 432 >>> itest(b're:A.+b', b'nomatch', b'fooadef', b'fooadefBar')
433 433 [False, False, True]
434 434 """
435 435 kind, pattern = _splitpattern(pattern)
436 436 if kind == b're':
437 437 try:
438 438 return remod.compile(pattern, flags)
439 439 except remod.error as e:
440 440 raise error.ParseError(
441 441 _(b'invalid regular expression: %s') % forcebytestr(e)
442 442 )
443 443 elif kind == b'literal':
444 444 return remod.compile(remod.escape(pattern), flags)
445 445
446 446 raise error.ProgrammingError(b'unhandled pattern kind: %s' % kind)
447 447
448 448
449 449 def shortuser(user: bytes) -> bytes:
450 450 """Return a short representation of a user name or email address."""
451 451 f = user.find(b'@')
452 452 if f >= 0:
453 453 user = user[:f]
454 454 f = user.find(b'<')
455 455 if f >= 0:
456 456 user = user[f + 1 :]
457 457 f = user.find(b' ')
458 458 if f >= 0:
459 459 user = user[:f]
460 460 f = user.find(b'.')
461 461 if f >= 0:
462 462 user = user[:f]
463 463 return user
464 464
465 465
466 466 def emailuser(user: bytes) -> bytes:
467 467 """Return the user portion of an email address."""
468 468 f = user.find(b'@')
469 469 if f >= 0:
470 470 user = user[:f]
471 471 f = user.find(b'<')
472 472 if f >= 0:
473 473 user = user[f + 1 :]
474 474 return user
475 475
476 476
477 477 def email(author: bytes) -> bytes:
478 478 '''get email of author.'''
479 479 r = author.find(b'>')
480 480 if r == -1:
481 481 r = None
482 482 return author[author.find(b'<') + 1 : r]
483 483
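None of the three helpers above carries a doctest; a few illustrative calls (the address is invented) show what each one strips:

    user = b'John Doe <john.doe@example.com>'
    assert shortuser(user) == b'john'              # stops at the first '.'
    assert emailuser(user) == b'john.doe'          # local part only
    assert email(user) == b'john.doe@example.com'  # full address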
484 484
485 485 def person(author: bytes) -> bytes:
486 486 """Returns the name before an email address,
487 487 interpreting it as per RFC 5322
488 488
489 489 >>> person(b'foo@bar')
490 490 'foo'
491 491 >>> person(b'Foo Bar <foo@bar>')
492 492 'Foo Bar'
493 493 >>> person(b'"Foo Bar" <foo@bar>')
494 494 'Foo Bar'
495 495 >>> person(b'"Foo \"buz\" Bar" <foo@bar>')
496 496 'Foo "buz" Bar'
497 497 >>> # The following are invalid, but do exist in real-life
498 498 ...
499 499 >>> person(b'Foo "buz" Bar <foo@bar>')
500 500 'Foo "buz" Bar'
501 501 >>> person(b'"Foo Bar <foo@bar>')
502 502 'Foo Bar'
503 503 """
504 504 if b'@' not in author:
505 505 return author
506 506 f = author.find(b'<')
507 507 if f != -1:
508 508 return author[:f].strip(b' "').replace(b'\\"', b'"')
509 509 f = author.find(b'@')
510 510 return author[:f].replace(b'.', b' ')
511 511
512 512
513 513 @attr.s(hash=True)
514 514 class mailmapping:
515 515 """Represents a username/email key or value in
516 516 a mailmap file"""
517 517
518 518 email = attr.ib()
519 519 name = attr.ib(default=None)
520 520
521 521
522 522 def _ismailmaplineinvalid(names, emails):
523 523 """Returns True if the parsed names and emails
524 524 in a mailmap entry are invalid.
525 525
526 526 >>> # No names or emails fails
527 527 >>> names, emails = [], []
528 528 >>> _ismailmaplineinvalid(names, emails)
529 529 True
530 530 >>> # Only one email fails
531 531 >>> emails = [b'email@email.com']
532 532 >>> _ismailmaplineinvalid(names, emails)
533 533 True
534 534 >>> # One email and one name passes
535 535 >>> names = [b'Test Name']
536 536 >>> _ismailmaplineinvalid(names, emails)
537 537 False
538 538 >>> # No names but two emails passes
539 539 >>> names = []
540 540 >>> emails = [b'proper@email.com', b'commit@email.com']
541 541 >>> _ismailmaplineinvalid(names, emails)
542 542 False
543 543 """
544 544 return not emails or not names and len(emails) < 2
545 545
546 546
547 547 def parsemailmap(mailmapcontent):
548 548 """Parses data in the .mailmap format
549 549
550 550 >>> mmdata = b"\\n".join([
551 551 ... b'# Comment',
552 552 ... b'Name <commit1@email.xx>',
553 553 ... b'<name@email.xx> <commit2@email.xx>',
554 554 ... b'Name <proper@email.xx> <commit3@email.xx>',
555 555 ... b'Name <proper@email.xx> Commit <commit4@email.xx>',
556 556 ... ])
557 557 >>> mm = parsemailmap(mmdata)
558 558 >>> for key in sorted(mm.keys()):
559 559 ... print(key)
560 560 mailmapping(email='commit1@email.xx', name=None)
561 561 mailmapping(email='commit2@email.xx', name=None)
562 562 mailmapping(email='commit3@email.xx', name=None)
563 563 mailmapping(email='commit4@email.xx', name='Commit')
564 564 >>> for val in sorted(mm.values()):
565 565 ... print(val)
566 566 mailmapping(email='commit1@email.xx', name='Name')
567 567 mailmapping(email='name@email.xx', name=None)
568 568 mailmapping(email='proper@email.xx', name='Name')
569 569 mailmapping(email='proper@email.xx', name='Name')
570 570 """
571 571 mailmap = {}
572 572
573 573 if mailmapcontent is None:
574 574 return mailmap
575 575
576 576 for line in mailmapcontent.splitlines():
577 577
578 578 # Don't bother checking the line if it is a comment or
579 579 # is an improperly formed author field
580 580 if line.lstrip().startswith(b'#'):
581 581 continue
582 582
583 583 # names, emails hold the parsed emails and names for each line
584 584 # namebuilder holds the words in a person's name
585 585 names, emails = [], []
586 586 namebuilder = []
587 587
588 588 for element in line.split():
589 589 if element.startswith(b'#'):
590 590 # If we reach a comment in the mailmap file, move on
591 591 break
592 592
593 593 elif element.startswith(b'<') and element.endswith(b'>'):
594 594 # We have found an email.
595 595 # Parse it, and finalize any names from earlier
596 596 emails.append(element[1:-1]) # Slice off the "<>"
597 597
598 598 if namebuilder:
599 599 names.append(b' '.join(namebuilder))
600 600 namebuilder = []
601 601
602 602 # Break if we have found a second email, any other
603 603 # data does not fit the spec for .mailmap
604 604 if len(emails) > 1:
605 605 break
606 606
607 607 else:
608 608 # We have found another word in the committer's name
609 609 namebuilder.append(element)
610 610
611 611 # Check to see if we have parsed the line into a valid form
612 612 # We require at least one email, and either at least one
613 613 # name or a second email
614 614 if _ismailmaplineinvalid(names, emails):
615 615 continue
616 616
617 617 mailmapkey = mailmapping(
618 618 email=emails[-1],
619 619 name=names[-1] if len(names) == 2 else None,
620 620 )
621 621
622 622 mailmap[mailmapkey] = mailmapping(
623 623 email=emails[0],
624 624 name=names[0] if names else None,
625 625 )
626 626
627 627 return mailmap
628 628
629 629
630 630 def mapname(mailmap, author: bytes) -> bytes:
631 631 """Returns the author field according to the mailmap cache, or
632 632 the original author field.
633 633
634 634 >>> mmdata = b"\\n".join([
635 635 ... b'# Comment',
636 636 ... b'Name <commit1@email.xx>',
637 637 ... b'<name@email.xx> <commit2@email.xx>',
638 638 ... b'Name <proper@email.xx> <commit3@email.xx>',
639 639 ... b'Name <proper@email.xx> Commit <commit4@email.xx>',
640 640 ... ])
641 641 >>> m = parsemailmap(mmdata)
642 642 >>> mapname(m, b'Commit <commit1@email.xx>')
643 643 'Name <commit1@email.xx>'
644 644 >>> mapname(m, b'Name <commit2@email.xx>')
645 645 'Name <name@email.xx>'
646 646 >>> mapname(m, b'Commit <commit3@email.xx>')
647 647 'Name <proper@email.xx>'
648 648 >>> mapname(m, b'Commit <commit4@email.xx>')
649 649 'Name <proper@email.xx>'
650 650 >>> mapname(m, b'Unknown Name <unknown@email.com>')
651 651 'Unknown Name <unknown@email.com>'
652 652 """
653 653 # If the author field coming in isn't in the correct format,
654 654 # or the mailmap is empty, just return the original author field
655 655 if not isauthorwellformed(author) or not mailmap:
656 656 return author
657 657
658 658 # Turn the user name into a mailmapping
659 659 commit = mailmapping(name=person(author), email=email(author))
660 660
661 661 try:
662 662 # Try and use both the commit email and name as the key
663 663 proper = mailmap[commit]
664 664
665 665 except KeyError:
666 666 # If the lookup fails, use just the email as the key instead
667 667 # We call this commit2 so as not to erase the original commit fields
668 668 commit2 = mailmapping(email=commit.email)
669 669 proper = mailmap.get(commit2, mailmapping(None, None))
670 670
671 671 # Return the author field with proper values filled in
672 672 return b'%s <%s>' % (
673 673 proper.name if proper.name else commit.name,
674 674 proper.email if proper.email else commit.email,
675 675 )
676 676
677 677
678 678 _correctauthorformat = remod.compile(br'^[^<]+\s<[^<>]+@[^<>]+>$')
679 679
680 680
681 681 def isauthorwellformed(author: bytes) -> bool:
682 682 """Return True if the author field is well formed
683 683 (ie "Contributor Name <contrib@email.dom>")
684 684
685 685 >>> isauthorwellformed(b'Good Author <good@author.com>')
686 686 True
687 687 >>> isauthorwellformed(b'Author <good@author.com>')
688 688 True
689 689 >>> isauthorwellformed(b'Bad Author')
690 690 False
691 691 >>> isauthorwellformed(b'Bad Author <author@author.com')
692 692 False
693 693 >>> isauthorwellformed(b'Bad Author author@author.com')
694 694 False
695 695 >>> isauthorwellformed(b'<author@author.com>')
696 696 False
697 697 >>> isauthorwellformed(b'Bad Author <author>')
698 698 False
699 699 """
700 700 return _correctauthorformat.match(author) is not None
701 701
702 702
703 703 def firstline(text: bytes) -> bytes:
704 704 """Return the first line of the input"""
705 705 # Try to avoid running splitlines() on the whole string
706 706 i = text.find(b'\n')
707 707 if i != -1:
708 708 text = text[:i]
709 709 try:
710 710 return text.splitlines()[0]
711 711 except IndexError:
712 712 return b''
713 713
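Two illustrative calls for firstline(); the empty-input case exercises the IndexError fallback:

    assert firstline(b'first\nsecond') == b'first'
    assert firstline(b'') == b''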
714 714
715 715 def ellipsis(text: bytes, maxlength: int = 400) -> bytes:
716 716 """Trim string to at most maxlength (default: 400) columns in display."""
717 717 return encoding.trim(text, maxlength, ellipsis=b'...')
718 718
719 719
720 720 def escapestr(s: bytes) -> bytes:
721 721 # "bytes" is also a typing shortcut for bytes, bytearray, and memoryview
722 722 if isinstance(s, memoryview):
723 723 s = bytes(s)
724 724 # call underlying function of s.encode('string_escape') directly for
725 725 # Python 3 compatibility
726 # pytype: disable=bad-return-type
726 727 return codecs.escape_encode(s)[0] # pytype: disable=module-attr
728 # pytype: enable=bad-return-type
727 729
728 730
729 731 def unescapestr(s: bytes) -> bytes:
732 # pytype: disable=bad-return-type
730 733 return codecs.escape_decode(s)[0] # pytype: disable=module-attr
734 # pytype: enable=bad-return-type
731 735
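A round-trip sanity check for the pair above; escapestr() emits the same escapes a Python bytes literal would, and unescapestr() reverses them:

    assert escapestr(b'a\nb\\c') == b'a\\nb\\\\c'
    assert unescapestr(b'a\\nb\\\\c') == b'a\nb\\c'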
732 736
733 737 def forcebytestr(obj):
734 738 """Portably format an arbitrary object (e.g. exception) into a byte
735 739 string."""
736 740 try:
737 741 return pycompat.bytestr(obj)
738 742 except UnicodeEncodeError:
739 743 # non-ascii string, may be lossy
740 744 return pycompat.bytestr(encoding.strtolocal(str(obj)))
741 745
742 746
743 747 def uirepr(s: bytes) -> bytes:
744 748 # Avoid double backslash in Windows path repr()
745 749 return pycompat.byterepr(pycompat.bytestr(s)).replace(b'\\\\', b'\\')
746 750
747 751
748 752 # delay import of textwrap
749 753 def _MBTextWrapper(**kwargs):
750 754 class tw(textwrap.TextWrapper):
751 755 """
752 756 Extend TextWrapper for width-awareness.
753 757
754 758 Neither the number of 'bytes' in any encoding nor the number of
755 759 'characters' is appropriate for calculating terminal columns of a string.
756 760
757 761 The original TextWrapper implementation uses the built-in 'len()'
758 762 directly, so it must be overridden to use the width of each character.
759 763
760 764 In addition, characters classified as 'ambiguous' width are treated
761 765 as wide in East Asian locales, but as narrow elsewhere.
762 766
763 767 This requires a user decision to determine the width of such characters.
764 768 """
765 769
766 770 def _cutdown(self, ucstr, space_left):
767 771 l = 0
768 772 colwidth = encoding.ucolwidth
769 773 for i in range(len(ucstr)):
770 774 l += colwidth(ucstr[i])
771 775 if space_left < l:
772 776 return (ucstr[:i], ucstr[i:])
773 777 return ucstr, b''
774 778
775 779 # overriding of base class
776 780 def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
777 781 space_left = max(width - cur_len, 1)
778 782
779 783 if self.break_long_words:
780 784 cut, res = self._cutdown(reversed_chunks[-1], space_left)
781 785 cur_line.append(cut)
782 786 reversed_chunks[-1] = res
783 787 elif not cur_line:
784 788 cur_line.append(reversed_chunks.pop())
785 789
786 790 # this overriding code is adapted from Python 2.6's TextWrapper
787 791 # to calculate string columns with 'encoding.ucolwidth()'
788 792 def _wrap_chunks(self, chunks):
789 793 colwidth = encoding.ucolwidth
790 794
791 795 lines = []
792 796 if self.width <= 0:
793 797 raise ValueError(b"invalid width %r (must be > 0)" % self.width)
794 798
795 799 # Arrange in reverse order so items can be efficiently popped
796 800 # from a stack of chunks.
797 801 chunks.reverse()
798 802
799 803 while chunks:
800 804
801 805 # Start the list of chunks that will make up the current line.
802 806 # cur_len is just the length of all the chunks in cur_line.
803 807 cur_line = []
804 808 cur_len = 0
805 809
806 810 # Figure out which static string will prefix this line.
807 811 if lines:
808 812 indent = self.subsequent_indent
809 813 else:
810 814 indent = self.initial_indent
811 815
812 816 # Maximum width for this line.
813 817 width = self.width - len(indent)
814 818
815 819 # First chunk on line is whitespace -- drop it, unless this
816 820 # is the very beginning of the text (i.e. no lines started yet).
817 821 if self.drop_whitespace and chunks[-1].strip() == '' and lines:
818 822 del chunks[-1]
819 823
820 824 while chunks:
821 825 l = colwidth(chunks[-1])
822 826
823 827 # Can at least squeeze this chunk onto the current line.
824 828 if cur_len + l <= width:
825 829 cur_line.append(chunks.pop())
826 830 cur_len += l
827 831
828 832 # Nope, this line is full.
829 833 else:
830 834 break
831 835
832 836 # The current line is full, and the next chunk is too big to
833 837 # fit on *any* line (not just this one).
834 838 if chunks and colwidth(chunks[-1]) > width:
835 839 self._handle_long_word(chunks, cur_line, cur_len, width)
836 840
837 841 # If the last chunk on this line is all whitespace, drop it.
838 842 if (
839 843 self.drop_whitespace
840 844 and cur_line
841 845 and cur_line[-1].strip() == r''
842 846 ):
843 847 del cur_line[-1]
844 848
845 849 # Convert current line back to a string and store it in list
846 850 # of all lines (return value).
847 851 if cur_line:
848 852 lines.append(indent + ''.join(cur_line))
849 853
850 854 return lines
851 855
852 856 global _MBTextWrapper
853 857 _MBTextWrapper = tw
854 858 return tw(**kwargs)
855 859
856 860
857 861 def wrap(
858 862 line: bytes, width: int, initindent: bytes = b'', hangindent: bytes = b''
859 863 ) -> bytes:
860 864 maxindent = max(len(hangindent), len(initindent))
861 865 if width <= maxindent:
862 866 # adjust for weird terminal size
863 867 width = max(78, maxindent + 1)
864 868 line = line.decode(
865 869 pycompat.sysstr(encoding.encoding),
866 870 pycompat.sysstr(encoding.encodingmode),
867 871 )
868 872 initindent = initindent.decode(
869 873 pycompat.sysstr(encoding.encoding),
870 874 pycompat.sysstr(encoding.encodingmode),
871 875 )
872 876 hangindent = hangindent.decode(
873 877 pycompat.sysstr(encoding.encoding),
874 878 pycompat.sysstr(encoding.encodingmode),
875 879 )
876 880 wrapper = _MBTextWrapper(
877 881 width=width, initial_indent=initindent, subsequent_indent=hangindent
878 882 )
879 883 return wrapper.fill(line).encode(pycompat.sysstr(encoding.encoding))
880 884
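A usage sketch for wrap(); widths are display columns and the indents are prepended verbatim. The exact line breaks shown assume an ASCII locale, since the function round-trips through encoding.encoding:

    wrapped = wrap(b'one two three four', width=10,
                   initindent=b'', hangindent=b'  ')
    # Under an ASCII locale this yields:
    #   b'one two\n  three\n  four'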
881 885
882 886 _booleans = {
883 887 b'1': True,
884 888 b'yes': True,
885 889 b'true': True,
886 890 b'on': True,
887 891 b'always': True,
888 892 b'0': False,
889 893 b'no': False,
890 894 b'false': False,
891 895 b'off': False,
892 896 b'never': False,
893 897 }
894 898
895 899
896 900 def parsebool(s: bytes) -> Optional[bool]:
897 901 """Parse s into a boolean.
898 902
899 903 If s is not a valid boolean, returns None.
900 904 """
901 905 return _booleans.get(s.lower(), None)
902 906
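The lookup is case-insensitive, and returning None for unknown input lets callers tell 'invalid' apart from False (values invented):

    assert parsebool(b'Yes') is True
    assert parsebool(b'never') is False
    assert parsebool(b'maybe') is None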
903 907
904 908 # TODO: make arg mandatory (and fix code below?)
905 909 def parselist(value: Optional[bytes]):
906 910 """parse a configuration value as a list of comma/space separated strings
907 911
908 912 >>> parselist(b'this,is "a small" ,test')
909 913 ['this', 'is', 'a small', 'test']
910 914 """
911 915
912 916 def _parse_plain(parts, s, offset):
913 917 whitespace = False
914 918 while offset < len(s) and (
915 919 s[offset : offset + 1].isspace() or s[offset : offset + 1] == b','
916 920 ):
917 921 whitespace = True
918 922 offset += 1
919 923 if offset >= len(s):
920 924 return None, parts, offset
921 925 if whitespace:
922 926 parts.append(b'')
923 927 if s[offset : offset + 1] == b'"' and not parts[-1]:
924 928 return _parse_quote, parts, offset + 1
925 929 elif s[offset : offset + 1] == b'"' and parts[-1][-1:] == b'\\':
926 930 parts[-1] = parts[-1][:-1] + s[offset : offset + 1]
927 931 return _parse_plain, parts, offset + 1
928 932 parts[-1] += s[offset : offset + 1]
929 933 return _parse_plain, parts, offset + 1
930 934
931 935 def _parse_quote(parts, s, offset):
932 936 if offset < len(s) and s[offset : offset + 1] == b'"': # ""
933 937 parts.append(b'')
934 938 offset += 1
935 939 while offset < len(s) and (
936 940 s[offset : offset + 1].isspace()
937 941 or s[offset : offset + 1] == b','
938 942 ):
939 943 offset += 1
940 944 return _parse_plain, parts, offset
941 945
942 946 while offset < len(s) and s[offset : offset + 1] != b'"':
943 947 if (
944 948 s[offset : offset + 1] == b'\\'
945 949 and offset + 1 < len(s)
946 950 and s[offset + 1 : offset + 2] == b'"'
947 951 ):
948 952 offset += 1
949 953 parts[-1] += b'"'
950 954 else:
951 955 parts[-1] += s[offset : offset + 1]
952 956 offset += 1
953 957
954 958 if offset >= len(s):
955 959 real_parts = _configlist(parts[-1])
956 960 if not real_parts:
957 961 parts[-1] = b'"'
958 962 else:
959 963 real_parts[0] = b'"' + real_parts[0]
960 964 parts = parts[:-1]
961 965 parts.extend(real_parts)
962 966 return None, parts, offset
963 967
964 968 offset += 1
965 969 while offset < len(s) and s[offset : offset + 1] in [b' ', b',']:
966 970 offset += 1
967 971
968 972 if offset < len(s):
969 973 if offset + 1 == len(s) and s[offset : offset + 1] == b'"':
970 974 parts[-1] += b'"'
971 975 offset += 1
972 976 else:
973 977 parts.append(b'')
974 978 else:
975 979 return None, parts, offset
976 980
977 981 return _parse_plain, parts, offset
978 982
979 983 def _configlist(s):
980 984 s = s.rstrip(b' ,')
981 985 if not s:
982 986 return []
983 987 parser, parts, offset = _parse_plain, [b''], 0
984 988 while parser:
985 989 parser, parts, offset = parser(parts, s, offset)
986 990 return parts
987 991
988 992 if value is not None and isinstance(value, bytes):
989 993 result = _configlist(value.lstrip(b' ,\n'))
990 994 else:
991 995 result = value
992 996 return result or []
993 997
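One more illustrative call beyond the doctest above, exercising the quoted-item path and the None fallback:

    assert parselist(b'a "b c" d') == [b'a', b'b c', b'd']
    assert parselist(None) == []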
994 998
995 999 def evalpythonliteral(s: bytes):
996 1000 """Evaluate a string containing a Python literal expression"""
997 1001 # We could backport our tokenizer hack to rewrite '' to u'' if we want
998 1002 return ast.literal_eval(s.decode('latin1'))
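Because the input is decoded as latin1 before ast.literal_eval(), string literals come back as str rather than bytes (example invented):

    assert evalpythonliteral(b"[1, 'two']") == [1, 'two']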