bundle2: drop some outdated comment...
marmoute
r46781:c511fef3 default
@@ -1,2592 +1,2588 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is upper case, the parameter is mandatory and the bundling process
51 51 MUST stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58   trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
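
For example, a parameter block declaring mandatory BZ compression plus a
hypothetical advisory parameter could be serialized as::

    Compression=BZ somehint=1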
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
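
For instance, one mandatory parameter ``k=v1`` plus one advisory parameter
``key2=v2`` would be encoded as (spaces added for readability only)::

    \x01 \x01   \x01\x02 \x04\x02   kv1key2v2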
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32; `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
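
For example, an 11 byte payload sent as a single chunk would be framed as::

    \x00\x00\x00\x0b <11 bytes of data> \x00\x00\x00\x00

where the trailing zero size chunk marks the end of the payload.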
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character, it is considered mandatory. When no handler
140 140 is known for a mandatory part, the process is aborted and an exception is
141 141 raised. If the part is advisory and no handler is known, the part is ignored.
142 142 When the process is aborted, the full bundle is still read from the stream to
143 143 keep the channel usable. But none of the parts read after an abort are
144 144 processed. In the future, dropping the stream may become an option for
145 145 channels we do not care to preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import collections
151 151 import errno
152 152 import os
153 153 import re
154 154 import string
155 155 import struct
156 156 import sys
157 157
158 158 from .i18n import _
159 159 from .node import (
160 160 hex,
161 161 nullid,
162 162 short,
163 163 )
164 164 from . import (
165 165 bookmarks,
166 166 changegroup,
167 167 encoding,
168 168 error,
169 169 obsolete,
170 170 phases,
171 171 pushkey,
172 172 pycompat,
173 173 requirements,
174 174 scmutil,
175 175 streamclone,
176 176 tags,
177 177 url,
178 178 util,
179 179 )
180 180 from .utils import stringutil
181 181
182 182 urlerr = util.urlerr
183 183 urlreq = util.urlreq
184 184
185 185 _pack = struct.pack
186 186 _unpack = struct.unpack
187 187
188 188 _fstreamparamsize = b'>i'
189 189 _fpartheadersize = b'>i'
190 190 _fparttypesize = b'>B'
191 191 _fpartid = b'>I'
192 192 _fpayloadsize = b'>i'
193 193 _fpartparamcount = b'>BB'
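
# For example, _pack(_fpartheadersize, 17) yields b'\x00\x00\x00\x11', a
# big-endian signed int32, matching the framing described in the module
# docstring above.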
194 194
195 195 preferedchunksize = 32768
196 196
197 197 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
198 198
199 199
200 200 def outdebug(ui, message):
201 201 """debug regarding output stream (bundling)"""
202 202 if ui.configbool(b'devel', b'bundle2.debug'):
203 203 ui.debug(b'bundle2-output: %s\n' % message)
204 204
205 205
206 206 def indebug(ui, message):
207 207 """debug on input stream (unbundling)"""
208 208 if ui.configbool(b'devel', b'bundle2.debug'):
209 209 ui.debug(b'bundle2-input: %s\n' % message)
210 210
211 211
212 212 def validateparttype(parttype):
213 213 """raise ValueError if a parttype contains invalid character"""
214 214 if _parttypeforbidden.search(parttype):
215 215 raise ValueError(parttype)
216 216
217 217
218 218 def _makefpartparamsizes(nbparams):
219 219 """return a struct format to read part parameter sizes
220 220
221 221     The number of parameters is variable so we need to build that format
222 222 dynamically.
223 223 """
224 224 return b'>' + (b'BB' * nbparams)
225 225
226 226
227 227 parthandlermapping = {}
228 228
229 229
230 230 def parthandler(parttype, params=()):
231 231 """decorator that register a function as a bundle2 part handler
232 232
233 233 eg::
234 234
235 235 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
236 236 def myparttypehandler(...):
237 237 '''process a part of type "my part".'''
238 238 ...
239 239 """
240 240 validateparttype(parttype)
241 241
242 242 def _decorator(func):
243 243 lparttype = parttype.lower() # enforce lower case matching.
244 244 assert lparttype not in parthandlermapping
245 245 parthandlermapping[lparttype] = func
246 246 func.params = frozenset(params)
247 247 return func
248 248
249 249 return _decorator
250 250
251 251
252 252 class unbundlerecords(object):
253 253 """keep record of what happens during and unbundle
254 254
255 255     New records are added using `records.add('cat', obj)`, where 'cat' is a
256 256     category of record and obj is an arbitrary object.
257 257
258 258 `records['cat']` will return all entries of this category 'cat'.
259 259
260 260 Iterating on the object itself will yield `('category', obj)` tuples
261 261 for all entries.
262 262
263 263     All iterations happen in chronological order.
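
    A hypothetical example::

        records = unbundlerecords()
        records.add('cat', 'entry')
        records['cat'] == ('entry',)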
264 264 """
265 265
266 266 def __init__(self):
267 267 self._categories = {}
268 268 self._sequences = []
269 269 self._replies = {}
270 270
271 271 def add(self, category, entry, inreplyto=None):
272 272 """add a new record of a given category.
273 273
274 274 The entry can then be retrieved in the list returned by
275 275 self['category']."""
276 276 self._categories.setdefault(category, []).append(entry)
277 277 self._sequences.append((category, entry))
278 278 if inreplyto is not None:
279 279 self.getreplies(inreplyto).add(category, entry)
280 280
281 281 def getreplies(self, partid):
282 282 """get the records that are replies to a specific part"""
283 283 return self._replies.setdefault(partid, unbundlerecords())
284 284
285 285 def __getitem__(self, cat):
286 286 return tuple(self._categories.get(cat, ()))
287 287
288 288 def __iter__(self):
289 289 return iter(self._sequences)
290 290
291 291 def __len__(self):
292 292 return len(self._sequences)
293 293
294 294 def __nonzero__(self):
295 295 return bool(self._sequences)
296 296
297 297 __bool__ = __nonzero__
298 298
299 299
300 300 class bundleoperation(object):
301 301 """an object that represents a single bundling process
302 302
303 303 Its purpose is to carry unbundle-related objects and states.
304 304
305 305 A new object should be created at the beginning of each bundle processing.
306 306 The object is to be returned by the processing function.
307 307
308 308     The object has very little content now; it will ultimately contain:
309 309     * access to the repo the bundle is applied to,
310 310 * a ui object,
311 311 * a way to retrieve a transaction to add changes to the repo,
312 312 * a way to record the result of processing each part,
313 313 * a way to construct a bundle response when applicable.
314 314 """
315 315
316 316 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
317 317 self.repo = repo
318 318 self.ui = repo.ui
319 319 self.records = unbundlerecords()
320 320 self.reply = None
321 321 self.captureoutput = captureoutput
322 322 self.hookargs = {}
323 323 self._gettransaction = transactiongetter
324 324         # carries values that can modify part behavior
325 325 self.modes = {}
326 326 self.source = source
327 327
328 328 def gettransaction(self):
329 329 transaction = self._gettransaction()
330 330
331 331 if self.hookargs:
332 332             # the ones added to the transaction supersede those added
333 333 # to the operation.
334 334 self.hookargs.update(transaction.hookargs)
335 335 transaction.hookargs = self.hookargs
336 336
337 337 # mark the hookargs as flushed. further attempts to add to
338 338 # hookargs will result in an abort.
339 339 self.hookargs = None
340 340
341 341 return transaction
342 342
343 343 def addhookargs(self, hookargs):
344 344 if self.hookargs is None:
345 345 raise error.ProgrammingError(
346 346 b'attempted to add hookargs to '
347 347 b'operation after transaction started'
348 348 )
349 349 self.hookargs.update(hookargs)
350 350
351 351
352 352 class TransactionUnavailable(RuntimeError):
353 353 pass
354 354
355 355
356 356 def _notransaction():
357 357 """default method to get a transaction while processing a bundle
358 358
359 359 Raise an exception to highlight the fact that no transaction was expected
360 360 to be created"""
361 361 raise TransactionUnavailable()
362 362
363 363
364 364 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
365 365 # transform me into unbundler.apply() as soon as the freeze is lifted
366 366 if isinstance(unbundler, unbundle20):
367 367 tr.hookargs[b'bundle2'] = b'1'
368 368 if source is not None and b'source' not in tr.hookargs:
369 369 tr.hookargs[b'source'] = source
370 370 if url is not None and b'url' not in tr.hookargs:
371 371 tr.hookargs[b'url'] = url
372 372 return processbundle(repo, unbundler, lambda: tr, source=source)
373 373 else:
374 374 # the transactiongetter won't be used, but we might as well set it
375 375 op = bundleoperation(repo, lambda: tr, source=source)
376 376 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
377 377 return op
378 378
379 379
380 380 class partiterator(object):
381 381 def __init__(self, repo, op, unbundler):
382 382 self.repo = repo
383 383 self.op = op
384 384 self.unbundler = unbundler
385 385 self.iterator = None
386 386 self.count = 0
387 387 self.current = None
388 388
389 389 def __enter__(self):
390 390 def func():
391 391 itr = enumerate(self.unbundler.iterparts(), 1)
392 392 for count, p in itr:
393 393 self.count = count
394 394 self.current = p
395 395 yield p
396 396 p.consume()
397 397 self.current = None
398 398
399 399 self.iterator = func()
400 400 return self.iterator
401 401
402 402 def __exit__(self, type, exc, tb):
403 403 if not self.iterator:
404 404 return
405 405
406 406         # Only gracefully abort in a normal exception situation. User aborts
407 407         # like Ctrl+C throw a KeyboardInterrupt, which does not subclass
408 408         # Exception and should not be gracefully cleaned up.
409 409 if isinstance(exc, Exception):
410 410 # Any exceptions seeking to the end of the bundle at this point are
411 411 # almost certainly related to the underlying stream being bad.
412 412 # And, chances are that the exception we're handling is related to
413 413 # getting in that bad state. So, we swallow the seeking error and
414 414 # re-raise the original error.
415 415 seekerror = False
416 416 try:
417 417 if self.current:
418 418 # consume the part content to not corrupt the stream.
419 419 self.current.consume()
420 420
421 421 for part in self.iterator:
422 422 # consume the bundle content
423 423 part.consume()
424 424 except Exception:
425 425 seekerror = True
426 426
427 427 # Small hack to let caller code distinguish exceptions from bundle2
428 428 # processing from processing the old format. This is mostly needed
429 429 # to handle different return codes to unbundle according to the type
430 430 # of bundle. We should probably clean up or drop this return code
431 431 # craziness in a future version.
432 432 exc.duringunbundle2 = True
433 433 salvaged = []
434 434 replycaps = None
435 435 if self.op.reply is not None:
436 436 salvaged = self.op.reply.salvageoutput()
437 437 replycaps = self.op.reply.capabilities
438 438 exc._replycaps = replycaps
439 439 exc._bundle2salvagedoutput = salvaged
440 440
441 441 # Re-raising from a variable loses the original stack. So only use
442 442 # that form if we need to.
443 443 if seekerror:
444 444 raise exc
445 445
446 446 self.repo.ui.debug(
447 447 b'bundle2-input-bundle: %i parts total\n' % self.count
448 448 )
449 449
450 450
451 451 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
452 452 """This function process a bundle, apply effect to/from a repo
453 453
454 454 It iterates over each part then searches for and uses the proper handling
455 455 code to process the part. Parts are processed in order.
456 456
457 457     An unknown mandatory part will abort the process.
458 458
459 459 It is temporarily possible to provide a prebuilt bundleoperation to the
460 460 function. This is used to ensure output is properly propagated in case of
461 461 an error during the unbundling. This output capturing part will likely be
462 462 reworked and this ability will probably go away in the process.
463 463 """
464 464 if op is None:
465 465 if transactiongetter is None:
466 466 transactiongetter = _notransaction
467 467 op = bundleoperation(repo, transactiongetter, source=source)
468 468 # todo:
469 469     # - replace this with an init function soon.
470 470 # - exception catching
471 471 unbundler.params
472 472 if repo.ui.debugflag:
473 473 msg = [b'bundle2-input-bundle:']
474 474 if unbundler.params:
475 475 msg.append(b' %i params' % len(unbundler.params))
476 476 if op._gettransaction is None or op._gettransaction is _notransaction:
477 477 msg.append(b' no-transaction')
478 478 else:
479 479 msg.append(b' with-transaction')
480 480 msg.append(b'\n')
481 481 repo.ui.debug(b''.join(msg))
482 482
483 483 processparts(repo, op, unbundler)
484 484
485 485 return op
486 486
487 487
488 488 def processparts(repo, op, unbundler):
489 489 with partiterator(repo, op, unbundler) as parts:
490 490 for part in parts:
491 491 _processpart(op, part)
492 492
493 493
494 494 def _processchangegroup(op, cg, tr, source, url, **kwargs):
495 495 ret = cg.apply(op.repo, tr, source, url, **kwargs)
496 496 op.records.add(
497 497 b'changegroup',
498 498 {
499 499 b'return': ret,
500 500 },
501 501 )
502 502 return ret
503 503
504 504
505 505 def _gethandler(op, part):
506 506 status = b'unknown' # used by debug output
507 507 try:
508 508 handler = parthandlermapping.get(part.type)
509 509 if handler is None:
510 510 status = b'unsupported-type'
511 511 raise error.BundleUnknownFeatureError(parttype=part.type)
512 512 indebug(op.ui, b'found a handler for part %s' % part.type)
513 513 unknownparams = part.mandatorykeys - handler.params
514 514 if unknownparams:
515 515 unknownparams = list(unknownparams)
516 516 unknownparams.sort()
517 517 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
518 518 raise error.BundleUnknownFeatureError(
519 519 parttype=part.type, params=unknownparams
520 520 )
521 521 status = b'supported'
522 522 except error.BundleUnknownFeatureError as exc:
523 523 if part.mandatory: # mandatory parts
524 524 raise
525 525 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
526 526 return # skip to part processing
527 527 finally:
528 528 if op.ui.debugflag:
529 529 msg = [b'bundle2-input-part: "%s"' % part.type]
530 530 if not part.mandatory:
531 531 msg.append(b' (advisory)')
532 532 nbmp = len(part.mandatorykeys)
533 533 nbap = len(part.params) - nbmp
534 534 if nbmp or nbap:
535 535 msg.append(b' (params:')
536 536 if nbmp:
537 537 msg.append(b' %i mandatory' % nbmp)
538 538 if nbap:
539 539                     msg.append(b' %i advisory' % nbap)
540 540 msg.append(b')')
541 541 msg.append(b' %s\n' % status)
542 542 op.ui.debug(b''.join(msg))
543 543
544 544 return handler
545 545
546 546
547 547 def _processpart(op, part):
548 548 """process a single part from a bundle
549 549
550 550 The part is guaranteed to have been fully consumed when the function exits
551 551 (even if an exception is raised)."""
552 552 handler = _gethandler(op, part)
553 553 if handler is None:
554 554 return
555 555
556 556 # handler is called outside the above try block so that we don't
557 557 # risk catching KeyErrors from anything other than the
558 558 # parthandlermapping lookup (any KeyError raised by handler()
559 559 # itself represents a defect of a different variety).
560 560 output = None
561 561 if op.captureoutput and op.reply is not None:
562 562 op.ui.pushbuffer(error=True, subproc=True)
563 563 output = b''
564 564 try:
565 565 handler(op, part)
566 566 finally:
567 567 if output is not None:
568 568 output = op.ui.popbuffer()
569 569 if output:
570 570 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
571 571 outpart.addparam(
572 572 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
573 573 )
574 574
575 575
576 576 def decodecaps(blob):
577 577 """decode a bundle2 caps bytes blob into a dictionary
578 578
579 579 The blob is a list of capabilities (one per line)
580 580 Capabilities may have values using a line of the form::
581 581
582 582 capability=value1,value2,value3
583 583
584 584 The values are always a list."""
585 585 caps = {}
586 586 for line in blob.splitlines():
587 587 if not line:
588 588 continue
589 589 if b'=' not in line:
590 590 key, vals = line, ()
591 591 else:
592 592 key, vals = line.split(b'=', 1)
593 593 vals = vals.split(b',')
594 594 key = urlreq.unquote(key)
595 595 vals = [urlreq.unquote(v) for v in vals]
596 596 caps[key] = vals
597 597 return caps
598 598
599 599
600 600 def encodecaps(caps):
601 601 """encode a bundle2 caps dictionary into a bytes blob"""
602 602 chunks = []
603 603 for ca in sorted(caps):
604 604 vals = caps[ca]
605 605 ca = urlreq.quote(ca)
606 606 vals = [urlreq.quote(v) for v in vals]
607 607 if vals:
608 608 ca = b"%s=%s" % (ca, b','.join(vals))
609 609 chunks.append(ca)
610 610 return b'\n'.join(chunks)
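
# A round-trip example (hypothetical capability set):
#
#   encodecaps({b'digests': [b'sha1'], b'pushback': []})
#   returns b'digests=sha1\npushback', and decodecaps() parses it back
#   into the same dictionary.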
611 611
612 612
613 613 bundletypes = {
614 614 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
615 615 # since the unification ssh accepts a header but there
616 616 # is no capability signaling it.
617 617 b"HG20": (), # special-cased below
618 618 b"HG10UN": (b"HG10UN", b'UN'),
619 619 b"HG10BZ": (b"HG10", b'BZ'),
620 620 b"HG10GZ": (b"HG10GZ", b'GZ'),
621 621 }
622 622
623 623 # hgweb uses this list to communicate its preferred type
624 624 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
625 625
626 626
627 627 class bundle20(object):
628 628 """represent an outgoing bundle2 container
629 629
630 630     Use the `addparam` method to add a stream level parameter and `newpart` to
631 631     populate it. Then call `getchunks` to retrieve all the binary chunks of
632 632 data that compose the bundle2 container."""
633 633
634 634 _magicstring = b'HG20'
635 635
636 636 def __init__(self, ui, capabilities=()):
637 637 self.ui = ui
638 638 self._params = []
639 639 self._parts = []
640 640 self.capabilities = dict(capabilities)
641 641 self._compengine = util.compengines.forbundletype(b'UN')
642 642 self._compopts = None
643 643 # If compression is being handled by a consumer of the raw
644 644 # data (e.g. the wire protocol), unsetting this flag tells
645 645 # consumers that the bundle is best left uncompressed.
646 646 self.prefercompressed = True
647 647
648 648 def setcompression(self, alg, compopts=None):
649 649 """setup core part compression to <alg>"""
650 650 if alg in (None, b'UN'):
651 651 return
652 652 assert not any(n.lower() == b'compression' for n, v in self._params)
653 653 self.addparam(b'Compression', alg)
654 654 self._compengine = util.compengines.forbundletype(alg)
655 655 self._compopts = compopts
656 656
657 657 @property
658 658 def nbparts(self):
659 659 """total number of parts added to the bundler"""
660 660 return len(self._parts)
661 661
662 662     # methods used to define the bundle2 content
663 663 def addparam(self, name, value=None):
664 664 """add a stream level parameter"""
665 665 if not name:
666 666 raise error.ProgrammingError(b'empty parameter name')
667 667 if name[0:1] not in pycompat.bytestr(
668 668 string.ascii_letters # pytype: disable=wrong-arg-types
669 669 ):
670 670 raise error.ProgrammingError(
671 671 b'non letter first character: %s' % name
672 672 )
673 673 self._params.append((name, value))
674 674
675 675 def addpart(self, part):
676 676 """add a new part to the bundle2 container
677 677
678 678         Parts contain the actual application payload."""
679 679 assert part.id is None
680 680 part.id = len(self._parts) # very cheap counter
681 681 self._parts.append(part)
682 682
683 683 def newpart(self, typeid, *args, **kwargs):
684 684 """create a new part and add it to the containers
685 685
686 686         The part is directly added to the container. For now, this means
687 687 that any failure to properly initialize the part after calling
688 688 ``newpart`` should result in a failure of the whole bundling process.
689 689
690 690         You can still fall back to manually creating and adding the part if
691 691         you need better control."""
692 692 part = bundlepart(typeid, *args, **kwargs)
693 693 self.addpart(part)
694 694 return part
695 695
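    # A minimal usage sketch (assuming a ui object is at hand):
    #
    #   bundler = bundle20(ui)
    #   bundler.newpart(b'output', data=b'hello', mandatory=False)
    #   raw = b''.join(bundler.getchunks())
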
696 696 # methods used to generate the bundle2 stream
697 697 def getchunks(self):
698 698 if self.ui.debugflag:
699 699 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
700 700 if self._params:
701 701 msg.append(b' (%i params)' % len(self._params))
702 702 msg.append(b' %i parts total\n' % len(self._parts))
703 703 self.ui.debug(b''.join(msg))
704 704 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
705 705 yield self._magicstring
706 706 param = self._paramchunk()
707 707 outdebug(self.ui, b'bundle parameter: %s' % param)
708 708 yield _pack(_fstreamparamsize, len(param))
709 709 if param:
710 710 yield param
711 711 for chunk in self._compengine.compressstream(
712 712 self._getcorechunk(), self._compopts
713 713 ):
714 714 yield chunk
715 715
716 716 def _paramchunk(self):
717 717 """return a encoded version of all stream parameters"""
718 718 blocks = []
719 719 for par, value in self._params:
720 720 par = urlreq.quote(par)
721 721 if value is not None:
722 722 value = urlreq.quote(value)
723 723 par = b'%s=%s' % (par, value)
724 724 blocks.append(par)
725 725 return b' '.join(blocks)
726 726
727 727 def _getcorechunk(self):
728 728 """yield chunk for the core part of the bundle
729 729
730 730 (all but headers and parameters)"""
731 731 outdebug(self.ui, b'start of parts')
732 732 for part in self._parts:
733 733 outdebug(self.ui, b'bundle part: "%s"' % part.type)
734 734 for chunk in part.getchunks(ui=self.ui):
735 735 yield chunk
736 736 outdebug(self.ui, b'end of bundle')
737 737 yield _pack(_fpartheadersize, 0)
738 738
739 739 def salvageoutput(self):
740 740 """return a list with a copy of all output parts in the bundle
741 741
742 742 This is meant to be used during error handling to make sure we preserve
743 743 server output"""
744 744 salvaged = []
745 745 for part in self._parts:
746 746 if part.type.startswith(b'output'):
747 747 salvaged.append(part.copy())
748 748 return salvaged
749 749
750 750
751 751 class unpackermixin(object):
752 752 """A mixin to extract bytes and struct data from a stream"""
753 753
754 754 def __init__(self, fp):
755 755 self._fp = fp
756 756
757 757 def _unpack(self, format):
758 758 """unpack this struct format from the stream
759 759
760 760 This method is meant for internal usage by the bundle2 protocol only.
761 761         It directly manipulates the low level stream, including bundle2 level
762 762         instructions.
763 763
764 764 Do not use it to implement higher-level logic or methods."""
765 765 data = self._readexact(struct.calcsize(format))
766 766 return _unpack(format, data)
767 767
768 768 def _readexact(self, size):
769 769 """read exactly <size> bytes from the stream
770 770
771 771 This method is meant for internal usage by the bundle2 protocol only.
772 772         It directly manipulates the low level stream, including bundle2 level
773 773         instructions.
774 774
775 775 Do not use it to implement higher-level logic or methods."""
776 776 return changegroup.readexactly(self._fp, size)
777 777
778 778
779 779 def getunbundler(ui, fp, magicstring=None):
780 780 """return a valid unbundler object for a given magicstring"""
781 781 if magicstring is None:
782 782 magicstring = changegroup.readexactly(fp, 4)
783 783 magic, version = magicstring[0:2], magicstring[2:4]
784 784 if magic != b'HG':
785 785 ui.debug(
786 786 b"error: invalid magic: %r (version %r), should be 'HG'\n"
787 787 % (magic, version)
788 788 )
789 789 raise error.Abort(_(b'not a Mercurial bundle'))
790 790 unbundlerclass = formatmap.get(version)
791 791 if unbundlerclass is None:
792 792 raise error.Abort(_(b'unknown bundle version %s') % version)
793 793 unbundler = unbundlerclass(ui, fp)
794 794 indebug(ui, b'start processing of %s stream' % magicstring)
795 795 return unbundler
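
# A typical usage sketch (``fh`` being any readable bundle2 stream):
#
#   unbundler = getunbundler(ui, fh)
#   for part in unbundler.iterparts():
#       ...  # dispatch on part.type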
796 796
797 797
798 798 class unbundle20(unpackermixin):
799 799 """interpret a bundle2 stream
800 800
801 801 This class is fed with a binary stream and yields parts through its
802 802     `iterparts` method."""
803 803
804 804 _magicstring = b'HG20'
805 805
806 806 def __init__(self, ui, fp):
807 807 """If header is specified, we do not read it out of the stream."""
808 808 self.ui = ui
809 809 self._compengine = util.compengines.forbundletype(b'UN')
810 810 self._compressed = None
811 811 super(unbundle20, self).__init__(fp)
812 812
813 813 @util.propertycache
814 814 def params(self):
815 815 """dictionary of stream level parameters"""
816 816 indebug(self.ui, b'reading bundle2 stream parameters')
817 817 params = {}
818 818 paramssize = self._unpack(_fstreamparamsize)[0]
819 819 if paramssize < 0:
820 820 raise error.BundleValueError(
821 821 b'negative bundle param size: %i' % paramssize
822 822 )
823 823 if paramssize:
824 824 params = self._readexact(paramssize)
825 825 params = self._processallparams(params)
826 826 return params
827 827
828 828 def _processallparams(self, paramsblock):
829 829 """"""
830 830 params = util.sortdict()
831 831 for p in paramsblock.split(b' '):
832 832 p = p.split(b'=', 1)
833 833 p = [urlreq.unquote(i) for i in p]
834 834 if len(p) < 2:
835 835 p.append(None)
836 836 self._processparam(*p)
837 837 params[p[0]] = p[1]
838 838 return params
839 839
840 840 def _processparam(self, name, value):
841 841 """process a parameter, applying its effect if needed
842 842
843 843         Parameters starting with a lower case letter are advisory and will be
844 844         ignored when unknown. Those starting with an upper case letter are
845 845         mandatory, and this function will raise a KeyError when they are unknown.
846 846
847 847         Note: no options are currently supported. Any input will either be
848 848         ignored or fail.
849 849 """
850 850 if not name:
851 851 raise ValueError('empty parameter name')
852 852 if name[0:1] not in pycompat.bytestr(
853 853 string.ascii_letters # pytype: disable=wrong-arg-types
854 854 ):
855 855 raise ValueError('non letter first character: %s' % name)
856 856 try:
857 857 handler = b2streamparamsmap[name.lower()]
858 858 except KeyError:
859 859 if name[0:1].islower():
860 860 indebug(self.ui, b"ignoring unknown parameter %s" % name)
861 861 else:
862 862 raise error.BundleUnknownFeatureError(params=(name,))
863 863 else:
864 864 handler(self, name, value)
865 865
866 866 def _forwardchunks(self):
867 867 """utility to transfer a bundle2 as binary
868 868
869 869         This is made necessary by the fact that the 'getbundle' command over
870 870         'ssh' has no way to know when the reply ends, relying on the bundle
871 871         being interpreted to find its end. This is terrible and we are sorry, but we
872 872 needed to move forward to get general delta enabled.
873 873 """
874 874 yield self._magicstring
875 875 assert 'params' not in vars(self)
876 876 paramssize = self._unpack(_fstreamparamsize)[0]
877 877 if paramssize < 0:
878 878 raise error.BundleValueError(
879 879 b'negative bundle param size: %i' % paramssize
880 880 )
881 881 if paramssize:
882 882 params = self._readexact(paramssize)
883 883 self._processallparams(params)
884 884 # The payload itself is decompressed below, so drop
885 885 # the compression parameter passed down to compensate.
886 886 outparams = []
887 887 for p in params.split(b' '):
888 888 k, v = p.split(b'=', 1)
889 889 if k.lower() != b'compression':
890 890 outparams.append(p)
891 891 outparams = b' '.join(outparams)
892 892 yield _pack(_fstreamparamsize, len(outparams))
893 893 yield outparams
894 894 else:
895 895 yield _pack(_fstreamparamsize, paramssize)
896 896 # From there, payload might need to be decompressed
897 897 self._fp = self._compengine.decompressorreader(self._fp)
898 898 emptycount = 0
899 899 while emptycount < 2:
900 900 # so we can brainlessly loop
901 901 assert _fpartheadersize == _fpayloadsize
902 902 size = self._unpack(_fpartheadersize)[0]
903 903 yield _pack(_fpartheadersize, size)
904 904 if size:
905 905 emptycount = 0
906 906 else:
907 907 emptycount += 1
908 908 continue
909 909 if size == flaginterrupt:
910 910 continue
911 911 elif size < 0:
912 912                 raise error.BundleValueError(b'negative chunk size: %i' % size)
913 913 yield self._readexact(size)
914 914
915 915 def iterparts(self, seekable=False):
916 916 """yield all parts contained in the stream"""
917 917 cls = seekableunbundlepart if seekable else unbundlepart
918 918         # make sure params have been loaded
919 919         self.params
920 920         # From there, payload needs to be decompressed
921 921 self._fp = self._compengine.decompressorreader(self._fp)
922 922 indebug(self.ui, b'start extraction of bundle2 parts')
923 923 headerblock = self._readpartheader()
924 924 while headerblock is not None:
925 925 part = cls(self.ui, headerblock, self._fp)
926 926 yield part
927 927 # Ensure part is fully consumed so we can start reading the next
928 928 # part.
929 929 part.consume()
930 930
931 931 headerblock = self._readpartheader()
932 932 indebug(self.ui, b'end of bundle2 stream')
933 933
934 934 def _readpartheader(self):
935 935 """reads a part header size and return the bytes blob
936 936
937 937 returns None if empty"""
938 938 headersize = self._unpack(_fpartheadersize)[0]
939 939 if headersize < 0:
940 940 raise error.BundleValueError(
941 941 b'negative part header size: %i' % headersize
942 942 )
943 943 indebug(self.ui, b'part header size: %i' % headersize)
944 944 if headersize:
945 945 return self._readexact(headersize)
946 946 return None
947 947
948 948 def compressed(self):
949 949 self.params # load params
950 950 return self._compressed
951 951
952 952 def close(self):
953 953 """close underlying file"""
954 954 if util.safehasattr(self._fp, 'close'):
955 955 return self._fp.close()
956 956
957 957
958 958 formatmap = {b'20': unbundle20}
959 959
960 960 b2streamparamsmap = {}
961 961
962 962
963 963 def b2streamparamhandler(name):
964 964 """register a handler for a stream level parameter"""
965 965
966 966 def decorator(func):
967 967         assert name not in b2streamparamsmap
968 968 b2streamparamsmap[name] = func
969 969 return func
970 970
971 971 return decorator
972 972
973 973
974 974 @b2streamparamhandler(b'compression')
975 975 def processcompression(unbundler, param, value):
976 976 """read compression parameter and install payload decompression"""
977 977 if value not in util.compengines.supportedbundletypes:
978 978 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
979 979 unbundler._compengine = util.compengines.forbundletype(value)
980 980 if value is not None:
981 981 unbundler._compressed = True
982 982
983 983
984 984 class bundlepart(object):
985 985 """A bundle2 part contains application level payload
986 986
987 987 The part `type` is used to route the part to the application level
988 988 handler.
989 989
990 990 The part payload is contained in ``part.data``. It could be raw bytes or a
991 991 generator of byte chunks.
992 992
993 993 You can add parameters to the part using the ``addparam`` method.
994 994 Parameters can be either mandatory (default) or advisory. Remote side
995 995 should be able to safely ignore the advisory ones.
996 996
997 997     Neither data nor parameters can be modified after generation has begun.
998 998 """
999 999
1000 1000 def __init__(
1001 1001 self,
1002 1002 parttype,
1003 1003 mandatoryparams=(),
1004 1004 advisoryparams=(),
1005 1005 data=b'',
1006 1006 mandatory=True,
1007 1007 ):
1008 1008 validateparttype(parttype)
1009 1009 self.id = None
1010 1010 self.type = parttype
1011 1011 self._data = data
1012 1012 self._mandatoryparams = list(mandatoryparams)
1013 1013 self._advisoryparams = list(advisoryparams)
1014 1014 # checking for duplicated entries
1015 1015 self._seenparams = set()
1016 1016 for pname, __ in self._mandatoryparams + self._advisoryparams:
1017 1017 if pname in self._seenparams:
1018 1018 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1019 1019 self._seenparams.add(pname)
1020 1020 # status of the part's generation:
1021 1021 # - None: not started,
1022 1022         # - False: currently being generated,
1023 1023 # - True: generation done.
1024 1024 self._generated = None
1025 1025 self.mandatory = mandatory
1026 1026
1027 1027 def __repr__(self):
1028 1028 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1029 1029 return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1030 1030 cls,
1031 1031 id(self),
1032 1032 self.id,
1033 1033 self.type,
1034 1034 self.mandatory,
1035 1035 )
1036 1036
1037 1037 def copy(self):
1038 1038 """return a copy of the part
1039 1039
1040 1040         The new part has the very same content but no partid assigned yet.
1041 1041 Parts with generated data cannot be copied."""
1042 1042 assert not util.safehasattr(self.data, 'next')
1043 1043 return self.__class__(
1044 1044 self.type,
1045 1045 self._mandatoryparams,
1046 1046 self._advisoryparams,
1047 1047 self._data,
1048 1048 self.mandatory,
1049 1049 )
1050 1050
1051 1051     # methods used to define the part content
1052 1052 @property
1053 1053 def data(self):
1054 1054 return self._data
1055 1055
1056 1056 @data.setter
1057 1057 def data(self, data):
1058 1058 if self._generated is not None:
1059 1059 raise error.ReadOnlyPartError(b'part is being generated')
1060 1060 self._data = data
1061 1061
1062 1062 @property
1063 1063 def mandatoryparams(self):
1064 1064 # make it an immutable tuple to force people through ``addparam``
1065 1065 return tuple(self._mandatoryparams)
1066 1066
1067 1067 @property
1068 1068 def advisoryparams(self):
1069 1069 # make it an immutable tuple to force people through ``addparam``
1070 1070 return tuple(self._advisoryparams)
1071 1071
1072 1072 def addparam(self, name, value=b'', mandatory=True):
1073 1073 """add a parameter to the part
1074 1074
1075 1075 If 'mandatory' is set to True, the remote handler must claim support
1076 1076 for this parameter or the unbundling will be aborted.
1077 1077
1078 1078 The 'name' and 'value' cannot exceed 255 bytes each.
1079 1079 """
1080 1080 if self._generated is not None:
1081 1081 raise error.ReadOnlyPartError(b'part is being generated')
1082 1082 if name in self._seenparams:
1083 1083 raise ValueError(b'duplicated params: %s' % name)
1084 1084 self._seenparams.add(name)
1085 1085 params = self._advisoryparams
1086 1086 if mandatory:
1087 1087 params = self._mandatoryparams
1088 1088 params.append((name, value))
1089 1089
1090 1090     # methods used to generate the bundle2 stream
1091 1091 def getchunks(self, ui):
1092 1092 if self._generated is not None:
1093 1093 raise error.ProgrammingError(b'part can only be consumed once')
1094 1094 self._generated = False
1095 1095
1096 1096 if ui.debugflag:
1097 1097 msg = [b'bundle2-output-part: "%s"' % self.type]
1098 1098 if not self.mandatory:
1099 1099 msg.append(b' (advisory)')
1100 1100 nbmp = len(self.mandatoryparams)
1101 1101 nbap = len(self.advisoryparams)
1102 1102 if nbmp or nbap:
1103 1103 msg.append(b' (params:')
1104 1104 if nbmp:
1105 1105 msg.append(b' %i mandatory' % nbmp)
1106 1106 if nbap:
1107 1107                     msg.append(b' %i advisory' % nbap)
1108 1108 msg.append(b')')
1109 1109 if not self.data:
1110 1110 msg.append(b' empty payload')
1111 1111 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1112 1112 self.data, b'__next__'
1113 1113 ):
1114 1114 msg.append(b' streamed payload')
1115 1115 else:
1116 1116 msg.append(b' %i bytes payload' % len(self.data))
1117 1117 msg.append(b'\n')
1118 1118 ui.debug(b''.join(msg))
1119 1119
1120 1120 #### header
1121 1121 if self.mandatory:
1122 1122 parttype = self.type.upper()
1123 1123 else:
1124 1124 parttype = self.type.lower()
1125 1125 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1126 1126 ## parttype
1127 1127 header = [
1128 1128 _pack(_fparttypesize, len(parttype)),
1129 1129 parttype,
1130 1130 _pack(_fpartid, self.id),
1131 1131 ]
1132 1132 ## parameters
1133 1133 # count
1134 1134 manpar = self.mandatoryparams
1135 1135 advpar = self.advisoryparams
1136 1136 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1137 1137 # size
1138 1138 parsizes = []
1139 1139 for key, value in manpar:
1140 1140 parsizes.append(len(key))
1141 1141 parsizes.append(len(value))
1142 1142 for key, value in advpar:
1143 1143 parsizes.append(len(key))
1144 1144 parsizes.append(len(value))
1145 1145 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1146 1146 header.append(paramsizes)
1147 1147 # key, value
1148 1148 for key, value in manpar:
1149 1149 header.append(key)
1150 1150 header.append(value)
1151 1151 for key, value in advpar:
1152 1152 header.append(key)
1153 1153 header.append(value)
1154 1154 ## finalize header
1155 1155 try:
1156 1156 headerchunk = b''.join(header)
1157 1157 except TypeError:
1158 1158 raise TypeError(
1159 1159                 'Found a non-bytes value trying to '
1160 1160 'build bundle part header: %r' % header
1161 1161 )
1162 1162 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1163 1163 yield _pack(_fpartheadersize, len(headerchunk))
1164 1164 yield headerchunk
1165 1165 ## payload
1166 1166 try:
1167 1167 for chunk in self._payloadchunks():
1168 1168 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1169 1169 yield _pack(_fpayloadsize, len(chunk))
1170 1170 yield chunk
1171 1171 except GeneratorExit:
1172 1172 # GeneratorExit means that nobody is listening for our
1173 1173 # results anyway, so just bail quickly rather than trying
1174 1174 # to produce an error part.
1175 1175 ui.debug(b'bundle2-generatorexit\n')
1176 1176 raise
1177 1177 except BaseException as exc:
1178 1178 bexc = stringutil.forcebytestr(exc)
1179 1179 # backup exception data for later
1180 1180 ui.debug(
1181 1181 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1182 1182 )
1183 1183 tb = sys.exc_info()[2]
1184 1184 msg = b'unexpected error: %s' % bexc
1185 1185 interpart = bundlepart(
1186 1186 b'error:abort', [(b'message', msg)], mandatory=False
1187 1187 )
1188 1188 interpart.id = 0
1189 1189 yield _pack(_fpayloadsize, -1)
1190 1190 for chunk in interpart.getchunks(ui=ui):
1191 1191 yield chunk
1192 1192 outdebug(ui, b'closing payload chunk')
1193 1193 # abort current part payload
1194 1194 yield _pack(_fpayloadsize, 0)
1195 1195 pycompat.raisewithtb(exc, tb)
1196 1196 # end of payload
1197 1197 outdebug(ui, b'closing payload chunk')
1198 1198 yield _pack(_fpayloadsize, 0)
1199 1199 self._generated = True
1200 1200
1201 1201 def _payloadchunks(self):
1202 1202 """yield chunks of a the part payload
1203 1203
1204 1204 Exists to handle the different methods to provide data to a part."""
1205 1205 # we only support fixed size data now.
1206 1206 # This will be improved in the future.
1207 1207 if util.safehasattr(self.data, 'next') or util.safehasattr(
1208 1208 self.data, b'__next__'
1209 1209 ):
1210 1210 buff = util.chunkbuffer(self.data)
1211 1211 chunk = buff.read(preferedchunksize)
1212 1212 while chunk:
1213 1213 yield chunk
1214 1214 chunk = buff.read(preferedchunksize)
1215 1215 elif len(self.data):
1216 1216 yield self.data
1217 1217
1218 1218
1219 1219 flaginterrupt = -1
1220 1220
1221 1221
1222 1222 class interrupthandler(unpackermixin):
1223 1223 """read one part and process it with restricted capability
1224 1224
1225 1225     This allows transmitting exceptions raised on the producer side during part
1226 1226 iteration while the consumer is reading a part.
1227 1227
1228 1228     Parts processed in this manner only have access to a ui object."""
1229 1229
1230 1230 def __init__(self, ui, fp):
1231 1231 super(interrupthandler, self).__init__(fp)
1232 1232 self.ui = ui
1233 1233
1234 1234 def _readpartheader(self):
1235 1235 """reads a part header size and return the bytes blob
1236 1236
1237 1237 returns None if empty"""
1238 1238 headersize = self._unpack(_fpartheadersize)[0]
1239 1239 if headersize < 0:
1240 1240 raise error.BundleValueError(
1241 1241 b'negative part header size: %i' % headersize
1242 1242 )
1243 1243 indebug(self.ui, b'part header size: %i\n' % headersize)
1244 1244 if headersize:
1245 1245 return self._readexact(headersize)
1246 1246 return None
1247 1247
1248 1248 def __call__(self):
1249 1249
1250 1250 self.ui.debug(
1251 1251 b'bundle2-input-stream-interrupt: opening out of band context\n'
1252 1252 )
1253 1253 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1254 1254 headerblock = self._readpartheader()
1255 1255 if headerblock is None:
1256 1256 indebug(self.ui, b'no part found during interruption.')
1257 1257 return
1258 1258 part = unbundlepart(self.ui, headerblock, self._fp)
1259 1259 op = interruptoperation(self.ui)
1260 1260 hardabort = False
1261 1261 try:
1262 1262 _processpart(op, part)
1263 1263 except (SystemExit, KeyboardInterrupt):
1264 1264 hardabort = True
1265 1265 raise
1266 1266 finally:
1267 1267 if not hardabort:
1268 1268 part.consume()
1269 1269 self.ui.debug(
1270 1270 b'bundle2-input-stream-interrupt: closing out of band context\n'
1271 1271 )
1272 1272
1273 1273
1274 1274 class interruptoperation(object):
1275 1275 """A limited operation to be use by part handler during interruption
1276 1276
1277 1277     It only has access to a ui object.
1278 1278 """
1279 1279
1280 1280 def __init__(self, ui):
1281 1281 self.ui = ui
1282 1282 self.reply = None
1283 1283 self.captureoutput = False
1284 1284
1285 1285 @property
1286 1286 def repo(self):
1287 1287 raise error.ProgrammingError(b'no repo access from stream interruption')
1288 1288
1289 1289 def gettransaction(self):
1290 1290 raise TransactionUnavailable(b'no repo access from stream interruption')
1291 1291
1292 1292
1293 1293 def decodepayloadchunks(ui, fh):
1294 1294 """Reads bundle2 part payload data into chunks.
1295 1295
1296 1296 Part payload data consists of framed chunks. This function takes
1297 1297 a file handle and emits those chunks.
1298 1298 """
1299 1299 dolog = ui.configbool(b'devel', b'bundle2.debug')
1300 1300 debug = ui.debug
1301 1301
1302 1302 headerstruct = struct.Struct(_fpayloadsize)
1303 1303 headersize = headerstruct.size
1304 1304 unpack = headerstruct.unpack
1305 1305
1306 1306 readexactly = changegroup.readexactly
1307 1307 read = fh.read
1308 1308
1309 1309 chunksize = unpack(readexactly(fh, headersize))[0]
1310 1310 indebug(ui, b'payload chunk size: %i' % chunksize)
1311 1311
1312 1312 # changegroup.readexactly() is inlined below for performance.
1313 1313 while chunksize:
1314 1314 if chunksize >= 0:
1315 1315 s = read(chunksize)
1316 1316 if len(s) < chunksize:
1317 1317 raise error.Abort(
1318 1318 _(
1319 1319                         b'stream ended unexpectedly '
1320 1320                         b'(got %d bytes, expected %d)'
1321 1321 )
1322 1322 % (len(s), chunksize)
1323 1323 )
1324 1324
1325 1325 yield s
1326 1326 elif chunksize == flaginterrupt:
1327 1327 # Interrupt "signal" detected. The regular stream is interrupted
1328 1328 # and a bundle2 part follows. Consume it.
1329 1329 interrupthandler(ui, fh)()
1330 1330 else:
1331 1331 raise error.BundleValueError(
1332 1332 b'negative payload chunk size: %s' % chunksize
1333 1333 )
1334 1334
1335 1335 s = read(headersize)
1336 1336 if len(s) < headersize:
1337 1337 raise error.Abort(
1338 1338 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1339 1339                 % (len(s), headersize)
1340 1340 )
1341 1341
1342 1342 chunksize = unpack(s)[0]
1343 1343
1344 1344 # indebug() inlined for performance.
1345 1345 if dolog:
1346 1346 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1347 1347
1348 1348
1349 1349 class unbundlepart(unpackermixin):
1350 1350 """a bundle part read from a bundle"""
1351 1351
1352 1352 def __init__(self, ui, header, fp):
1353 1353 super(unbundlepart, self).__init__(fp)
1354 1354 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1355 1355 fp, b'tell'
1356 1356 )
1357 1357 self.ui = ui
1358 1358 # unbundle state attr
1359 1359 self._headerdata = header
1360 1360 self._headeroffset = 0
1361 1361 self._initialized = False
1362 1362 self.consumed = False
1363 1363 # part data
1364 1364 self.id = None
1365 1365 self.type = None
1366 1366 self.mandatoryparams = None
1367 1367 self.advisoryparams = None
1368 1368 self.params = None
1369 1369 self.mandatorykeys = ()
1370 1370 self._readheader()
1371 1371 self._mandatory = None
1372 1372 self._pos = 0
1373 1373
1374 1374 def _fromheader(self, size):
1375 1375 """return the next <size> byte from the header"""
1376 1376 offset = self._headeroffset
1377 1377 data = self._headerdata[offset : (offset + size)]
1378 1378 self._headeroffset = offset + size
1379 1379 return data
1380 1380
1381 1381 def _unpackheader(self, format):
1382 1382 """read given format from header
1383 1383
1384 1384         This automatically computes the size of the format to read."""
1385 1385 data = self._fromheader(struct.calcsize(format))
1386 1386 return _unpack(format, data)
1387 1387
1388 1388 def _initparams(self, mandatoryparams, advisoryparams):
1389 1389 """internal function to setup all logic related parameters"""
1390 1390 # make it read only to prevent people touching it by mistake.
1391 1391 self.mandatoryparams = tuple(mandatoryparams)
1392 1392 self.advisoryparams = tuple(advisoryparams)
1393 1393 # user friendly UI
1394 1394 self.params = util.sortdict(self.mandatoryparams)
1395 1395 self.params.update(self.advisoryparams)
1396 1396 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1397 1397
1398 1398 def _readheader(self):
1399 1399 """read the header and setup the object"""
1400 1400 typesize = self._unpackheader(_fparttypesize)[0]
1401 1401 self.type = self._fromheader(typesize)
1402 1402 indebug(self.ui, b'part type: "%s"' % self.type)
1403 1403 self.id = self._unpackheader(_fpartid)[0]
1404 1404 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1405 1405 # extract mandatory bit from type
1406 1406 self.mandatory = self.type != self.type.lower()
1407 1407 self.type = self.type.lower()
1408 1408 ## reading parameters
1409 1409 # param count
1410 1410 mancount, advcount = self._unpackheader(_fpartparamcount)
1411 1411 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1412 1412 # param size
1413 1413 fparamsizes = _makefpartparamsizes(mancount + advcount)
1414 1414 paramsizes = self._unpackheader(fparamsizes)
1415 1415         # make it a list of couples again
1416 1416 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1417 1417 # split mandatory from advisory
1418 1418 mansizes = paramsizes[:mancount]
1419 1419 advsizes = paramsizes[mancount:]
1420 1420 # retrieve param value
1421 1421 manparams = []
1422 1422 for key, value in mansizes:
1423 1423 manparams.append((self._fromheader(key), self._fromheader(value)))
1424 1424 advparams = []
1425 1425 for key, value in advsizes:
1426 1426 advparams.append((self._fromheader(key), self._fromheader(value)))
1427 1427 self._initparams(manparams, advparams)
1428 1428 ## part payload
1429 1429 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1430 1430         # we have read all the header data, note it
1431 1431 self._initialized = True
1432 1432
1433 1433 def _payloadchunks(self):
1434 1434 """Generator of decoded chunks in the payload."""
1435 1435 return decodepayloadchunks(self.ui, self._fp)
1436 1436
1437 1437 def consume(self):
1438 1438 """Read the part payload until completion.
1439 1439
1440 1440 By consuming the part data, the underlying stream read offset will
1441 1441 be advanced to the next part (or end of stream).
1442 1442 """
1443 1443 if self.consumed:
1444 1444 return
1445 1445
1446 1446 chunk = self.read(32768)
1447 1447 while chunk:
1448 1448 self._pos += len(chunk)
1449 1449 chunk = self.read(32768)
1450 1450
1451 1451 def read(self, size=None):
1452 1452 """read payload data"""
1453 1453 if not self._initialized:
1454 1454 self._readheader()
1455 1455 if size is None:
1456 1456 data = self._payloadstream.read()
1457 1457 else:
1458 1458 data = self._payloadstream.read(size)
1459 1459 self._pos += len(data)
1460 1460 if size is None or len(data) < size:
1461 1461 if not self.consumed and self._pos:
1462 1462 self.ui.debug(
1463 1463 b'bundle2-input-part: total payload size %i\n' % self._pos
1464 1464 )
1465 1465 self.consumed = True
1466 1466 return data
1467 1467
1468 1468
1469 1469 class seekableunbundlepart(unbundlepart):
1470 1470 """A bundle2 part in a bundle that is seekable.
1471 1471
1472 1472 Regular ``unbundlepart`` instances can only be read once. This class
1473 1473 extends ``unbundlepart`` to enable bi-directional seeking within the
1474 1474 part.
1475 1475
1476 1476 Bundle2 part data consists of framed chunks. Offsets when seeking
1477 1477 refer to the decoded data, not the offsets in the underlying bundle2
1478 1478 stream.
1479 1479
1480 1480 To facilitate quickly seeking within the decoded data, instances of this
1481 1481 class maintain a mapping between offsets in the underlying stream and
1482 1482 the decoded payload. This mapping will consume memory in proportion
1483 1483 to the number of chunks within the payload (which almost certainly
1484 1484 increases in proportion with the size of the part).
1485 1485 """
1486 1486
1487 1487 def __init__(self, ui, header, fp):
1488 1488 # (payload, file) offsets for chunk starts.
1489 1489 self._chunkindex = []
1490 1490
1491 1491 super(seekableunbundlepart, self).__init__(ui, header, fp)
1492 1492
1493 1493 def _payloadchunks(self, chunknum=0):
1494 1494 '''seek to specified chunk and start yielding data'''
1495 1495 if len(self._chunkindex) == 0:
1496 1496 assert chunknum == 0, b'Must start with chunk 0'
1497 1497 self._chunkindex.append((0, self._tellfp()))
1498 1498 else:
1499 1499 assert chunknum < len(self._chunkindex), (
1500 1500 b'Unknown chunk %d' % chunknum
1501 1501 )
1502 1502 self._seekfp(self._chunkindex[chunknum][1])
1503 1503
1504 1504 pos = self._chunkindex[chunknum][0]
1505 1505
1506 1506 for chunk in decodepayloadchunks(self.ui, self._fp):
1507 1507 chunknum += 1
1508 1508 pos += len(chunk)
1509 1509 if chunknum == len(self._chunkindex):
1510 1510 self._chunkindex.append((pos, self._tellfp()))
1511 1511
1512 1512 yield chunk
1513 1513
1514 1514 def _findchunk(self, pos):
1515 1515 '''for a given payload position, return a chunk number and offset'''
1516 1516 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1517 1517 if ppos == pos:
1518 1518 return chunk, 0
1519 1519 elif ppos > pos:
1520 1520 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1521 1521 raise ValueError(b'Unknown chunk')
1522 1522
1523 1523 def tell(self):
1524 1524 return self._pos
1525 1525
1526 1526 def seek(self, offset, whence=os.SEEK_SET):
1527 1527 if whence == os.SEEK_SET:
1528 1528 newpos = offset
1529 1529 elif whence == os.SEEK_CUR:
1530 1530 newpos = self._pos + offset
1531 1531 elif whence == os.SEEK_END:
1532 1532 if not self.consumed:
1533 1533 # Can't use self.consume() here because it advances self._pos.
1534 1534 chunk = self.read(32768)
1535 1535 while chunk:
1536 1536 chunk = self.read(32768)
1537 1537 newpos = self._chunkindex[-1][0] - offset
1538 1538 else:
1539 1539 raise ValueError(b'Unknown whence value: %r' % (whence,))
1540 1540
1541 1541 if newpos > self._chunkindex[-1][0] and not self.consumed:
1542 1542 # Can't use self.consume() here because it advances self._pos.
1543 1543 chunk = self.read(32768)
1544 1544 while chunk:
1545 1545                 chunk = self.read(32768)
1546 1546
1547 1547 if not 0 <= newpos <= self._chunkindex[-1][0]:
1548 1548 raise ValueError(b'Offset out of range')
1549 1549
1550 1550 if self._pos != newpos:
1551 1551 chunk, internaloffset = self._findchunk(newpos)
1552 1552 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1553 1553 adjust = self.read(internaloffset)
1554 1554 if len(adjust) != internaloffset:
1555 1555 raise error.Abort(_(b'Seek failed\n'))
1556 1556 self._pos = newpos
1557 1557
1558 1558 def _seekfp(self, offset, whence=0):
1559 1559 """move the underlying file pointer
1560 1560
1561 1561 This method is meant for internal usage by the bundle2 protocol only.
1562 1562         It directly manipulates the low level stream, including bundle2 level
1563 1563         instructions.
1564 1564
1565 1565 Do not use it to implement higher-level logic or methods."""
1566 1566 if self._seekable:
1567 1567 return self._fp.seek(offset, whence)
1568 1568 else:
1569 1569 raise NotImplementedError(_(b'File pointer is not seekable'))
1570 1570
1571 1571 def _tellfp(self):
1572 1572 """return the file offset, or None if file is not seekable
1573 1573
1574 1574 This method is meant for internal usage by the bundle2 protocol only.
1575 1575         It directly manipulates the low level stream, including bundle2 level
1576 1576         instructions.
1577 1577
1578 1578 Do not use it to implement higher-level logic or methods."""
1579 1579 if self._seekable:
1580 1580 try:
1581 1581 return self._fp.tell()
1582 1582 except IOError as e:
1583 1583 if e.errno == errno.ESPIPE:
1584 1584 self._seekable = False
1585 1585 else:
1586 1586 raise
1587 1587 return None
1588 1588
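# Editor's illustration (hypothetical offsets, not part of the original
# module): _chunkindex records (payload offset, file offset) pairs as
# chunks are consumed, and _findchunk() translates an absolute payload
# position into a chunk number plus an offset within that chunk. A
# simplified sketch of that lookup (the exact-boundary case omitted),
# assuming three indexed chunks:
def _example_findchunk(pos=40000):
    chunkindex = [(0, 100), (32768, 32877), (65536, 65654)]
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos > pos:
            # pos 40000 falls in chunk 1, 7232 bytes past its start
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError(b'Unknown chunk')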
1589 1589
1590 1590 # These are only the static capabilities.
1591 1591 # Check the 'getrepocaps' function for the rest.
1592 1592 capabilities = {
1593 1593 b'HG20': (),
1594 1594 b'bookmarks': (),
1595 1595 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1596 1596 b'listkeys': (),
1597 1597 b'pushkey': (),
1598 1598 b'digests': tuple(sorted(util.DIGESTS.keys())),
1599 1599 b'remote-changegroup': (b'http', b'https'),
1600 1600 b'hgtagsfnodes': (),
1601 1601 b'rev-branch-cache': (),
1602 1602 b'phases': (b'heads',),
1603 1603 b'stream': (b'v2',),
1604 1604 }
1605 1605
1606 1606
1607 1607 def getrepocaps(repo, allowpushback=False, role=None):
1608 1608 """return the bundle2 capabilities for a given repo
1609 1609
1610 1610 Exists to allow extensions (like evolution) to mutate the capabilities.
1611 1611
1612 1612 The returned value is used for servers advertising their capabilities as
1613 1613 well as clients advertising their capabilities to servers as part of
1614 1614 bundle2 requests. The ``role`` argument specifies which is which.
1615 1615 """
1616 1616 if role not in (b'client', b'server'):
1617 1617 raise error.ProgrammingError(b'role argument must be client or server')
1618 1618
1619 1619 caps = capabilities.copy()
1620 1620 caps[b'changegroup'] = tuple(
1621 1621 sorted(changegroup.supportedincomingversions(repo))
1622 1622 )
1623 1623 if obsolete.isenabled(repo, obsolete.exchangeopt):
1624 1624 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1625 1625 caps[b'obsmarkers'] = supportedformat
1626 1626 if allowpushback:
1627 1627 caps[b'pushback'] = ()
1628 1628 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1629 1629 if cpmode == b'check-related':
1630 1630 caps[b'checkheads'] = (b'related',)
1631 1631 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1632 1632 caps.pop(b'phases')
1633 1633
1634 1634 # Don't advertise stream clone support in server mode if not configured.
1635 1635 if role == b'server':
1636 1636 streamsupported = repo.ui.configbool(
1637 1637 b'server', b'uncompressed', untrusted=True
1638 1638 )
1639 1639 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1640 1640
1641 1641 if not streamsupported or not featuresupported:
1642 1642 caps.pop(b'stream')
1643 1643 # Else always advertise support on client, because payload support
1644 1644 # should always be advertised.
1645 1645
1646 1646 return caps
1647 1647
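# Editor's sketch of typical usage (the `repo` object is assumed): a
# server advertises the dict returned here, urlquoted via encodecaps(),
# and a peer's blob decodes back with decodecaps().
#
#   caps = getrepocaps(repo, allowpushback=True, role=b'server')
#   blob = encodecaps(caps)    # bytes blob suitable for the wire
#   peercaps = decodecaps(blob)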
1648 1648
1649 1649 def bundle2caps(remote):
1650 1650 """return the bundle capabilities of a peer as dict"""
1651 1651 raw = remote.capable(b'bundle2')
1652 1652 if not raw and raw != b'':
1653 1653 return {}
1654 1654 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1655 1655 return decodecaps(capsblob)
1656 1656
1657 1657
1658 1658 def obsmarkersversion(caps):
1659 1659 """extract the list of supported obsmarkers versions from a bundle2caps dict"""
1660 1660 obscaps = caps.get(b'obsmarkers', ())
1661 1661 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1662 1662
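# Editor's example of the helper above: a caps dict advertising the V0
# and V1 obsmarker formats yields the integer versions.
#
#   >>> obsmarkersversion({b'obsmarkers': (b'V0', b'V1')})
#   [0, 1]
#   >>> obsmarkersversion({})
#   []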
1663 1663
1664 1664 def writenewbundle(
1665 1665 ui,
1666 1666 repo,
1667 1667 source,
1668 1668 filename,
1669 1669 bundletype,
1670 1670 outgoing,
1671 1671 opts,
1672 1672 vfs=None,
1673 1673 compression=None,
1674 1674 compopts=None,
1675 1675 ):
1676 1676 if bundletype.startswith(b'HG10'):
1677 1677 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1678 1678 return writebundle(
1679 1679 ui,
1680 1680 cg,
1681 1681 filename,
1682 1682 bundletype,
1683 1683 vfs=vfs,
1684 1684 compression=compression,
1685 1685 compopts=compopts,
1686 1686 )
1687 1687 elif not bundletype.startswith(b'HG20'):
1688 1688 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1689 1689
1690 1690 caps = {}
1691 1691 if b'obsolescence' in opts:
1692 1692 caps[b'obsmarkers'] = (b'V1',)
1693 1693 bundle = bundle20(ui, caps)
1694 1694 bundle.setcompression(compression, compopts)
1695 1695 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1696 1696 chunkiter = bundle.getchunks()
1697 1697
1698 1698 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1699 1699
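# Editor's sketch (assumes `ui`, `repo` and an `outgoing` instance are
# at hand): writing an uncompressed HG20 bundle carrying a changegroup
# and phase information to a local file.
#
#   writenewbundle(ui, repo, b'bundle', b'out.hg', b'HG20', outgoing,
#                  {b'changegroup': True, b'phases': True})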
1700 1700
1701 1701 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1702 1702 # We should eventually reconcile this logic with the one behind
1703 1703 # 'exchange.getbundle2partsgenerator'.
1704 1704 #
1705 1705 # The types of input from 'getbundle' and 'writenewbundle' are a bit
1706 1706 # different right now, so we keep them separate for the sake of
1707 1707 # simplicity.
1708 1708
1709 1709 # we might not always want a changegroup in such a bundle, for example in
1710 1710 # stream bundles
1711 1711 if opts.get(b'changegroup', True):
1712 1712 cgversion = opts.get(b'cg.version')
1713 1713 if cgversion is None:
1714 1714 cgversion = changegroup.safeversion(repo)
1715 1715 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1716 1716 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1717 1717 part.addparam(b'version', cg.version)
1718 1718 if b'clcount' in cg.extras:
1719 1719 part.addparam(
1720 1720 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1721 1721 )
1722 1722 if opts.get(b'phases') and repo.revs(
1723 1723 b'%ln and secret()', outgoing.ancestorsof
1724 1724 ):
1725 1725 part.addparam(
1726 1726 b'targetphase', b'%d' % phases.secret, mandatory=False
1727 1727 )
1728 1728 if b'exp-sidedata-flag' in repo.requirements:
1729 1729 part.addparam(b'exp-sidedata', b'1')
1730 1730
1731 1731 if opts.get(b'streamv2', False):
1732 1732 addpartbundlestream2(bundler, repo, stream=True)
1733 1733
1734 1734 if opts.get(b'tagsfnodescache', True):
1735 1735 addparttagsfnodescache(repo, bundler, outgoing)
1736 1736
1737 1737 if opts.get(b'revbranchcache', True):
1738 1738 addpartrevbranchcache(repo, bundler, outgoing)
1739 1739
1740 1740 if opts.get(b'obsolescence', False):
1741 1741 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1742 1742 buildobsmarkerspart(
1743 1743 bundler,
1744 1744 obsmarkers,
1745 1745 mandatory=opts.get(b'obsolescence-mandatory', True),
1746 1746 )
1747 1747
1748 1748 if opts.get(b'phases', False):
1749 1749 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1750 1750 phasedata = phases.binaryencode(headsbyphase)
1751 1751 bundler.newpart(b'phase-heads', data=phasedata)
1752 1752
1753 1753
1754 1754 def addparttagsfnodescache(repo, bundler, outgoing):
1755 1755 # we include the tags fnode cache for the bundle changeset
1756 1756 # (as an optional part)
1757 1757 cache = tags.hgtagsfnodescache(repo.unfiltered())
1758 1758 chunks = []
1759 1759
1760 1760 # .hgtags fnodes are only relevant for head changesets. While we could
1761 1761 # transfer values for all known nodes, there will likely be little to
1762 1762 # no benefit.
1763 1763 #
1764 1764 # We don't bother using a generator to produce output data because
1765 1765 # a) we only have 40 bytes per head and even esoteric numbers of heads
1766 1766 # consume little memory (1M heads is 40MB) b) we don't want to send the
1767 1767 # part if we don't have entries and knowing if we have entries requires
1768 1768 # cache lookups.
1769 1769 for node in outgoing.ancestorsof:
1770 1770 # Don't compute missing, as this may slow down serving.
1771 1771 fnode = cache.getfnode(node, computemissing=False)
1772 1772 if fnode is not None:
1773 1773 chunks.extend([node, fnode])
1774 1774
1775 1775 if chunks:
1776 1776 bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))
1777 1777
1778 1778
1779 1779 def addpartrevbranchcache(repo, bundler, outgoing):
1780 1780 # we include the rev branch cache for the bundle changeset
1781 1781 # (as an optional part)
1782 1782 cache = repo.revbranchcache()
1783 1783 cl = repo.unfiltered().changelog
1784 1784 branchesdata = collections.defaultdict(lambda: (set(), set()))
1785 1785 for node in outgoing.missing:
1786 1786 branch, close = cache.branchinfo(cl.rev(node))
1787 1787 branchesdata[branch][close].add(node)
1788 1788
1789 1789 def generate():
1790 1790 for branch, (nodes, closed) in sorted(branchesdata.items()):
1791 1791 utf8branch = encoding.fromlocal(branch)
1792 1792 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1793 1793 yield utf8branch
1794 1794 for n in sorted(nodes):
1795 1795 yield n
1796 1796 for n in sorted(closed):
1797 1797 yield n
1798 1798
1799 1799 bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)
1800 1800
1801 1801
1802 1802 def _formatrequirementsspec(requirements):
1803 1803 requirements = [req for req in requirements if req != b"shared"]
1804 1804 return urlreq.quote(b','.join(sorted(requirements)))
1805 1805
1806 1806
1807 1807 def _formatrequirementsparams(requirements):
1808 1808 requirements = _formatrequirementsspec(requirements)
1809 1809 params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
1810 1810 return params
1811 1811
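# Editor's example: the client-local "shared" requirement is dropped,
# the rest is sorted, comma-joined and urlquoted (the exact rendering
# shown here is an assumption about urlreq.quote's output):
#
#   >>> _formatrequirementsspec([b'store', b'shared', b'revlogv1'])
#   'revlogv1%2Cstore'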
1812 1812
1813 1813 def addpartbundlestream2(bundler, repo, **kwargs):
1814 1814 if not kwargs.get('stream', False):
1815 1815 return
1816 1816
1817 1817 if not streamclone.allowservergeneration(repo):
1818 1818 raise error.Abort(
1819 1819 _(
1820 1820 b'stream data requested but server does not allow '
1821 1821 b'this feature'
1822 1822 ),
1823 1823 hint=_(
1824 1824 b'well-behaved clients should not be '
1825 1825 b'requesting stream data from servers not '
1826 1826 b'advertising it; the client may be buggy'
1827 1827 ),
1828 1828 )
1829 1829
1830 1830 # Stream clones don't compress well. And compression undermines a
1831 1831 # goal of stream clones, which is to be fast. Communicate the desire
1832 1832 # to avoid compression to consumers of the bundle.
1833 1833 bundler.prefercompressed = False
1834 1834
1835 1835 # get the includes and excludes
1836 1836 includepats = kwargs.get('includepats')
1837 1837 excludepats = kwargs.get('excludepats')
1838 1838
1839 1839 narrowstream = repo.ui.configbool(
1840 1840 b'experimental', b'server.stream-narrow-clones'
1841 1841 )
1842 1842
1843 1843 if (includepats or excludepats) and not narrowstream:
1844 1844 raise error.Abort(_(b'server does not support narrow stream clones'))
1845 1845
1846 1846 includeobsmarkers = False
1847 1847 if repo.obsstore:
1848 1848 remoteversions = obsmarkersversion(bundler.capabilities)
1849 1849 if not remoteversions:
1850 1850 raise error.Abort(
1851 1851 _(
1852 1852 b'server has obsolescence markers, but client '
1853 1853 b'cannot receive them via stream clone'
1854 1854 )
1855 1855 )
1856 1856 elif repo.obsstore._version in remoteversions:
1857 1857 includeobsmarkers = True
1858 1858
1859 1859 filecount, bytecount, it = streamclone.generatev2(
1860 1860 repo, includepats, excludepats, includeobsmarkers
1861 1861 )
1862 1862 requirements = _formatrequirementsspec(repo.requirements)
1863 1863 part = bundler.newpart(b'stream2', data=it)
1864 1864 part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
1865 1865 part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
1866 1866 part.addparam(b'requirements', requirements, mandatory=True)
1867 1867
1868 1868
1869 1869 def buildobsmarkerspart(bundler, markers, mandatory=True):
1870 1870 """add an obsmarker part to the bundler with <markers>
1871 1871
1872 1872 No part is created if markers is empty.
1873 1873 Raises ValueError if the bundler doesn't support any known obsmarker format.
1874 1874 """
1875 1875 if not markers:
1876 1876 return None
1877 1877
1878 1878 remoteversions = obsmarkersversion(bundler.capabilities)
1879 1879 version = obsolete.commonversion(remoteversions)
1880 1880 if version is None:
1881 1881 raise ValueError(b'bundler does not support common obsmarker format')
1882 1882 stream = obsolete.encodemarkers(markers, True, version=version)
1883 1883 return bundler.newpart(b'obsmarkers', data=stream, mandatory=mandatory)
1884 1884
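# Editor's sketch of using the helper above (a `bundler` negotiated
# against peer capabilities and an iterable of `markers` are assumed):
#
#   part = buildobsmarkerspart(bundler, markers, mandatory=False)
#   if part is None:
#       pass  # `markers` was empty, nothing was added to the bundle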
1885 1885
1886 1886 def writebundle(
1887 1887 ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
1888 1888 ):
1889 1889 """Write a bundle file and return its filename.
1890 1890
1891 1891 Existing files will not be overwritten.
1892 1892 If no filename is specified, a temporary file is created.
1893 1893 bz2 compression can be turned off.
1894 1894 The bundle file will be deleted in case of errors.
1895 1895 """
1896 1896
1897 1897 if bundletype == b"HG20":
1898 1898 bundle = bundle20(ui)
1899 1899 bundle.setcompression(compression, compopts)
1900 1900 part = bundle.newpart(b'changegroup', data=cg.getchunks())
1901 1901 part.addparam(b'version', cg.version)
1902 1902 if b'clcount' in cg.extras:
1903 1903 part.addparam(
1904 1904 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1905 1905 )
1906 1906 chunkiter = bundle.getchunks()
1907 1907 else:
1908 1908 # compression argument is only for the bundle2 case
1909 1909 assert compression is None
1910 1910 if cg.version != b'01':
1911 1911 raise error.Abort(
1912 1912 _(b'old bundle types only support v1 changegroups')
1913 1913 )
1914 1914 header, comp = bundletypes[bundletype]
1915 1915 if comp not in util.compengines.supportedbundletypes:
1916 1916 raise error.Abort(_(b'unknown stream compression type: %s') % comp)
1917 1917 compengine = util.compengines.forbundletype(comp)
1918 1918
1919 1919 def chunkiter():
1920 1920 yield header
1921 1921 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1922 1922 yield chunk
1923 1923
1924 1924 chunkiter = chunkiter()
1925 1925
1926 1926 # parse the changegroup data, otherwise we will block
1927 1927 # in case of sshrepo because we don't know the end of the stream
1928 1928 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1929 1929
1930 1930
1931 1931 def combinechangegroupresults(op):
1932 1932 """logic to combine 0 or more addchangegroup results into one"""
1933 1933 results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
1934 1934 changedheads = 0
1935 1935 result = 1
1936 1936 for ret in results:
1937 1937 # If any changegroup result is 0, return 0
1938 1938 if ret == 0:
1939 1939 result = 0
1940 1940 break
1941 1941 if ret < -1:
1942 1942 changedheads += ret + 1
1943 1943 elif ret > 1:
1944 1944 changedheads += ret - 1
1945 1945 if changedheads > 0:
1946 1946 result = 1 + changedheads
1947 1947 elif changedheads < 0:
1948 1948 result = -1 + changedheads
1949 1949 return result
1950 1950
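# Editor's worked example of the encoding above: a return of 3 means two
# heads were added (3 - 1) and a return of -3 means two were removed
# (-3 + 1 = -2). Combining recorded returns of 3 and 2 gives
# changedheads = (3 - 1) + (2 - 1) = 3, hence a result of 1 + 3 = 4,
# while any single 0 short-circuits the combination to 0.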
1951 1951
1952 1952 @parthandler(
1953 1953 b'changegroup',
1954 1954 (
1955 1955 b'version',
1956 1956 b'nbchanges',
1957 1957 b'exp-sidedata',
1958 1958 b'treemanifest',
1959 1959 b'targetphase',
1960 1960 ),
1961 1961 )
1962 1962 def handlechangegroup(op, inpart):
1963 """apply a changegroup part on the repo
1964
1965 This is a very early implementation that will massive rework before being
1966 inflicted to any end-user.
1967 """
1963 """apply a changegroup part on the repo"""
1968 1964 from . import localrepo
1969 1965
1970 1966 tr = op.gettransaction()
1971 1967 unpackerversion = inpart.params.get(b'version', b'01')
1972 1968 # We should raise an appropriate exception here
1973 1969 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1974 1970 # the source and url passed here are overwritten by the ones contained in
1975 1971 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1976 1972 nbchangesets = None
1977 1973 if b'nbchanges' in inpart.params:
1978 1974 nbchangesets = int(inpart.params.get(b'nbchanges'))
1979 1975 if b'treemanifest' in inpart.params and not scmutil.istreemanifest(op.repo):
1980 1976 if len(op.repo.changelog) != 0:
1981 1977 raise error.Abort(
1982 1978 _(
1983 1979 b"bundle contains tree manifests, but local repo is "
1984 1980 b"non-empty and does not use tree manifests"
1985 1981 )
1986 1982 )
1987 1983 op.repo.requirements.add(requirements.TREEMANIFEST_REQUIREMENT)
1988 1984 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
1989 1985 op.repo.ui, op.repo.requirements, op.repo.features
1990 1986 )
1991 1987 scmutil.writereporequirements(op.repo)
1992 1988
1993 1989 bundlesidedata = bool(b'exp-sidedata' in inpart.params)
1994 1990 reposidedata = bool(b'exp-sidedata-flag' in op.repo.requirements)
1995 1991 if reposidedata and not bundlesidedata:
1996 1992 msg = b"repository is using sidedata but the bundle source do not"
1997 1993 hint = b'this is currently unsupported'
1998 1994 raise error.Abort(msg, hint=hint)
1999 1995
2000 1996 extrakwargs = {}
2001 1997 targetphase = inpart.params.get(b'targetphase')
2002 1998 if targetphase is not None:
2003 1999 extrakwargs['targetphase'] = int(targetphase)
2004 2000 ret = _processchangegroup(
2005 2001 op,
2006 2002 cg,
2007 2003 tr,
2008 2004 b'bundle2',
2009 2005 b'bundle2',
2010 2006 expectedtotal=nbchangesets,
2011 2007 **extrakwargs
2012 2008 )
2013 2009 if op.reply is not None:
2014 2010 # This is definitely not the final form of this
2015 2011 # return. But one needs to start somewhere.
2016 2012 part = op.reply.newpart(b'reply:changegroup', mandatory=False)
2017 2013 part.addparam(
2018 2014 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2019 2015 )
2020 2016 part.addparam(b'return', b'%i' % ret, mandatory=False)
2021 2017 assert not inpart.read()
2022 2018
2023 2019
2024 2020 _remotechangegroupparams = tuple(
2025 2021 [b'url', b'size', b'digests']
2026 2022 + [b'digest:%s' % k for k in util.DIGESTS.keys()]
2027 2023 )
2028 2024
2029 2025
2030 2026 @parthandler(b'remote-changegroup', _remotechangegroupparams)
2031 2027 def handleremotechangegroup(op, inpart):
2032 2028 """apply a bundle10 on the repo, given an url and validation information
2033 2029
2034 2030 All the information about the remote bundle to import is given as
2035 2031 parameters. The parameters include:
2036 2032 - url: the url to the bundle10.
2037 2033 - size: the bundle10 file size. It is used to validate what was
2038 2034 retrieved by the client matches the server knowledge about the bundle.
2039 2035 - digests: a space separated list of the digest types provided as
2040 2036 parameters.
2041 2037 - digest:<digest-type>: the hexadecimal representation of the digest with
2042 2038 that name. Like the size, it is used to validate what was retrieved by
2043 2039 the client matches what the server knows about the bundle.
2044 2040
2045 2041 When multiple digest types are given, all of them are checked.
2046 2042 """
2047 2043 try:
2048 2044 raw_url = inpart.params[b'url']
2049 2045 except KeyError:
2050 2046 raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
2051 2047 parsed_url = util.url(raw_url)
2052 2048 if parsed_url.scheme not in capabilities[b'remote-changegroup']:
2053 2049 raise error.Abort(
2054 2050 _(b'remote-changegroup does not support %s urls')
2055 2051 % parsed_url.scheme
2056 2052 )
2057 2053
2058 2054 try:
2059 2055 size = int(inpart.params[b'size'])
2060 2056 except ValueError:
2061 2057 raise error.Abort(
2062 2058 _(b'remote-changegroup: invalid value for param "%s"') % b'size'
2063 2059 )
2064 2060 except KeyError:
2065 2061 raise error.Abort(
2066 2062 _(b'remote-changegroup: missing "%s" param') % b'size'
2067 2063 )
2068 2064
2069 2065 digests = {}
2070 2066 for typ in inpart.params.get(b'digests', b'').split():
2071 2067 param = b'digest:%s' % typ
2072 2068 try:
2073 2069 value = inpart.params[param]
2074 2070 except KeyError:
2075 2071 raise error.Abort(
2076 2072 _(b'remote-changegroup: missing "%s" param') % param
2077 2073 )
2078 2074 digests[typ] = value
2079 2075
2080 2076 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
2081 2077
2082 2078 tr = op.gettransaction()
2083 2079 from . import exchange
2084 2080
2085 2081 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
2086 2082 if not isinstance(cg, changegroup.cg1unpacker):
2087 2083 raise error.Abort(
2088 2084 _(b'%s: not a bundle version 1.0') % util.hidepassword(raw_url)
2089 2085 )
2090 2086 ret = _processchangegroup(op, cg, tr, b'bundle2', b'bundle2')
2091 2087 if op.reply is not None:
2092 2088 # This is definitely not the final form of this
2093 2089 # return. But one needs to start somewhere.
2094 2090 part = op.reply.newpart(b'reply:changegroup')
2095 2091 part.addparam(
2096 2092 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2097 2093 )
2098 2094 part.addparam(b'return', b'%i' % ret, mandatory=False)
2099 2095 try:
2100 2096 real_part.validate()
2101 2097 except error.Abort as e:
2102 2098 raise error.Abort(
2103 2099 _(b'bundle at %s is corrupted:\n%s')
2104 2100 % (util.hidepassword(raw_url), e.message)
2105 2101 )
2106 2102 assert not inpart.read()
2107 2103
2108 2104
2109 2105 @parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
2110 2106 def handlereplychangegroup(op, inpart):
2111 2107 ret = int(inpart.params[b'return'])
2112 2108 replyto = int(inpart.params[b'in-reply-to'])
2113 2109 op.records.add(b'changegroup', {b'return': ret}, replyto)
2114 2110
2115 2111
2116 2112 @parthandler(b'check:bookmarks')
2117 2113 def handlecheckbookmarks(op, inpart):
2118 2114 """check location of bookmarks
2119 2115
2120 2116 This part is used to detect push races regarding bookmarks. It
2121 2117 contains binary encoded (bookmark, node) tuples. If the local state does
2122 2118 not match the ones in the part, a PushRaced exception is raised.
2123 2119 """
2124 2120 bookdata = bookmarks.binarydecode(inpart)
2125 2121
2126 2122 msgstandard = (
2127 2123 b'remote repository changed while pushing - please try again '
2128 2124 b'(bookmark "%s" moved from %s to %s)'
2129 2125 )
2130 2126 msgmissing = (
2131 2127 b'remote repository changed while pushing - please try again '
2132 2128 b'(bookmark "%s" is missing, expected %s)'
2133 2129 )
2134 2130 msgexist = (
2135 2131 b'remote repository changed while pushing - please try again '
2136 2132 b'(bookmark "%s" set on %s, expected missing)'
2137 2133 )
2138 2134 for book, node in bookdata:
2139 2135 currentnode = op.repo._bookmarks.get(book)
2140 2136 if currentnode != node:
2141 2137 if node is None:
2142 2138 finalmsg = msgexist % (book, short(currentnode))
2143 2139 elif currentnode is None:
2144 2140 finalmsg = msgmissing % (book, short(node))
2145 2141 else:
2146 2142 finalmsg = msgstandard % (
2147 2143 book,
2148 2144 short(node),
2149 2145 short(currentnode),
2150 2146 )
2151 2147 raise error.PushRaced(finalmsg)
2152 2148
2153 2149
2154 2150 @parthandler(b'check:heads')
2155 2151 def handlecheckheads(op, inpart):
2156 2152 """check that head of the repo did not change
2157 2153
2158 2154 This is used to detect a push race when using unbundle.
2159 2155 This replaces the "heads" argument of unbundle."""
2160 2156 h = inpart.read(20)
2161 2157 heads = []
2162 2158 while len(h) == 20:
2163 2159 heads.append(h)
2164 2160 h = inpart.read(20)
2165 2161 assert not h
2166 2162 # Trigger a transaction so that we are guaranteed to have the lock now.
2167 2163 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2168 2164 op.gettransaction()
2169 2165 if sorted(heads) != sorted(op.repo.heads()):
2170 2166 raise error.PushRaced(
2171 2167 b'remote repository changed while pushing - please try again'
2172 2168 )
2173 2169
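# Editor's sketch of the sending side for 'check:heads' (hypothetical;
# `bundler` and `repo` are assumed): the payload is simply the
# concatenation of the 20-byte head nodes the client saw.
#
#   bundler.newpart(b'check:heads', data=b''.join(sorted(repo.heads())))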
2174 2170
2175 2171 @parthandler(b'check:updated-heads')
2176 2172 def handlecheckupdatedheads(op, inpart):
2177 2173 """check for race on the heads touched by a push
2178 2174
2179 2175 This is similar to 'check:heads' but focuses on the heads actually updated
2180 2176 during the push. If other activity happens on unrelated heads, it is
2181 2177 ignored.
2182 2178
2183 2179 This allows servers with high traffic to avoid push contention as long as
2184 2180 only unrelated parts of the graph are involved.
2185 2181 h = inpart.read(20)
2186 2182 heads = []
2187 2183 while len(h) == 20:
2188 2184 heads.append(h)
2189 2185 h = inpart.read(20)
2190 2186 assert not h
2191 2187 # trigger a transaction so that we are guaranteed to have the lock now.
2192 2188 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2193 2189 op.gettransaction()
2194 2190
2195 2191 currentheads = set()
2196 2192 for ls in op.repo.branchmap().iterheads():
2197 2193 currentheads.update(ls)
2198 2194
2199 2195 for h in heads:
2200 2196 if h not in currentheads:
2201 2197 raise error.PushRaced(
2202 2198 b'remote repository changed while pushing - '
2203 2199 b'please try again'
2204 2200 )
2205 2201
2206 2202
2207 2203 @parthandler(b'check:phases')
2208 2204 def handlecheckphases(op, inpart):
2209 2205 """check that phase boundaries of the repository did not change
2210 2206
2211 2207 This is used to detect a push race.
2212 2208 """
2213 2209 phasetonodes = phases.binarydecode(inpart)
2214 2210 unfi = op.repo.unfiltered()
2215 2211 cl = unfi.changelog
2216 2212 phasecache = unfi._phasecache
2217 2213 msg = (
2218 2214 b'remote repository changed while pushing - please try again '
2219 2215 b'(%s is %s expected %s)'
2220 2216 )
2221 2217 for expectedphase, nodes in pycompat.iteritems(phasetonodes):
2222 2218 for n in nodes:
2223 2219 actualphase = phasecache.phase(unfi, cl.rev(n))
2224 2220 if actualphase != expectedphase:
2225 2221 finalmsg = msg % (
2226 2222 short(n),
2227 2223 phases.phasenames[actualphase],
2228 2224 phases.phasenames[expectedphase],
2229 2225 )
2230 2226 raise error.PushRaced(finalmsg)
2231 2227
2232 2228
2233 2229 @parthandler(b'output')
2234 2230 def handleoutput(op, inpart):
2235 2231 """forward output captured on the server to the client"""
2236 2232 for line in inpart.read().splitlines():
2237 2233 op.ui.status(_(b'remote: %s\n') % line)
2238 2234
2239 2235
2240 2236 @parthandler(b'replycaps')
2241 2237 def handlereplycaps(op, inpart):
2242 2238 """Notify that a reply bundle should be created
2243 2239
2244 2240 The payload contains the capabilities information for the reply"""
2245 2241 caps = decodecaps(inpart.read())
2246 2242 if op.reply is None:
2247 2243 op.reply = bundle20(op.ui, caps)
2248 2244
2249 2245
2250 2246 class AbortFromPart(error.Abort):
2251 2247 """Sub-class of Abort that denotes an error from a bundle2 part."""
2252 2248
2253 2249
2254 2250 @parthandler(b'error:abort', (b'message', b'hint'))
2255 2251 def handleerrorabort(op, inpart):
2256 2252 """Used to transmit abort error over the wire"""
2257 2253 raise AbortFromPart(
2258 2254 inpart.params[b'message'], hint=inpart.params.get(b'hint')
2259 2255 )
2260 2256
2261 2257
2262 2258 @parthandler(
2263 2259 b'error:pushkey',
2264 2260 (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
2265 2261 )
2266 2262 def handleerrorpushkey(op, inpart):
2267 2263 """Used to transmit failure of a mandatory pushkey over the wire"""
2268 2264 kwargs = {}
2269 2265 for name in (b'namespace', b'key', b'new', b'old', b'ret'):
2270 2266 value = inpart.params.get(name)
2271 2267 if value is not None:
2272 2268 kwargs[name] = value
2273 2269 raise error.PushkeyFailed(
2274 2270 inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
2275 2271 )
2276 2272
2277 2273
2278 2274 @parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
2279 2275 def handleerrorunsupportedcontent(op, inpart):
2280 2276 """Used to transmit unknown content error over the wire"""
2281 2277 kwargs = {}
2282 2278 parttype = inpart.params.get(b'parttype')
2283 2279 if parttype is not None:
2284 2280 kwargs[b'parttype'] = parttype
2285 2281 params = inpart.params.get(b'params')
2286 2282 if params is not None:
2287 2283 kwargs[b'params'] = params.split(b'\0')
2288 2284
2289 2285 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2290 2286
2291 2287
2292 2288 @parthandler(b'error:pushraced', (b'message',))
2293 2289 def handleerrorpushraced(op, inpart):
2294 2290 """Used to transmit push race error over the wire"""
2295 2291 raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])
2296 2292
2297 2293
2298 2294 @parthandler(b'listkeys', (b'namespace',))
2299 2295 def handlelistkeys(op, inpart):
2300 2296 """retrieve pushkey namespace content stored in a bundle2"""
2301 2297 namespace = inpart.params[b'namespace']
2302 2298 r = pushkey.decodekeys(inpart.read())
2303 2299 op.records.add(b'listkeys', (namespace, r))
2304 2300
2305 2301
2306 2302 @parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
2307 2303 def handlepushkey(op, inpart):
2308 2304 """process a pushkey request"""
2309 2305 dec = pushkey.decode
2310 2306 namespace = dec(inpart.params[b'namespace'])
2311 2307 key = dec(inpart.params[b'key'])
2312 2308 old = dec(inpart.params[b'old'])
2313 2309 new = dec(inpart.params[b'new'])
2314 2310 # Grab the transaction to ensure that we have the lock before performing the
2315 2311 # pushkey.
2316 2312 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2317 2313 op.gettransaction()
2318 2314 ret = op.repo.pushkey(namespace, key, old, new)
2319 2315 record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
2320 2316 op.records.add(b'pushkey', record)
2321 2317 if op.reply is not None:
2322 2318 rpart = op.reply.newpart(b'reply:pushkey')
2323 2319 rpart.addparam(
2324 2320 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2325 2321 )
2326 2322 rpart.addparam(b'return', b'%i' % ret, mandatory=False)
2327 2323 if inpart.mandatory and not ret:
2328 2324 kwargs = {}
2329 2325 for key in (b'namespace', b'key', b'new', b'old', b'ret'):
2330 2326 if key in inpart.params:
2331 2327 kwargs[key] = inpart.params[key]
2332 2328 raise error.PushkeyFailed(
2333 2329 partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
2334 2330 )
2335 2331
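# Editor's sketch of the matching sending side (hypothetical values;
# `bundler`, `oldhex` and `newhex` are assumed hex node strings): every
# parameter is pushkey-encoded, mirroring the decoding performed above.
#
#   part = bundler.newpart(b'pushkey')
#   part.addparam(b'namespace', pushkey.encode(b'bookmarks'))
#   part.addparam(b'key', pushkey.encode(b'@'))
#   part.addparam(b'old', pushkey.encode(oldhex))
#   part.addparam(b'new', pushkey.encode(newhex))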
2336 2332
2337 2333 @parthandler(b'bookmarks')
2338 2334 def handlebookmark(op, inpart):
2339 2335 """transmit bookmark information
2340 2336
2341 2337 The part contains binary encoded bookmark information.
2342 2338
2343 2339 The exact behavior of this part can be controlled by the 'bookmarks' mode
2344 2340 on the bundle operation.
2345 2341
2346 2342 When mode is 'apply' (the default) the bookmark information is applied as
2347 2343 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2348 2344 issued earlier to check for push races in such update. This behavior is
2349 2345 suitable for pushing.
2350 2346
2351 2347 When mode is 'records', the information is recorded into the 'bookmarks'
2352 2348 records of the bundle operation. This behavior is suitable for pulling.
2353 2349 """
2354 2350 changes = bookmarks.binarydecode(inpart)
2355 2351
2356 2352 pushkeycompat = op.repo.ui.configbool(
2357 2353 b'server', b'bookmarks-pushkey-compat'
2358 2354 )
2359 2355 bookmarksmode = op.modes.get(b'bookmarks', b'apply')
2360 2356
2361 2357 if bookmarksmode == b'apply':
2362 2358 tr = op.gettransaction()
2363 2359 bookstore = op.repo._bookmarks
2364 2360 if pushkeycompat:
2365 2361 allhooks = []
2366 2362 for book, node in changes:
2367 2363 hookargs = tr.hookargs.copy()
2368 2364 hookargs[b'pushkeycompat'] = b'1'
2369 2365 hookargs[b'namespace'] = b'bookmarks'
2370 2366 hookargs[b'key'] = book
2371 2367 hookargs[b'old'] = hex(bookstore.get(book, b''))
2372 2368 hookargs[b'new'] = hex(node if node is not None else b'')
2373 2369 allhooks.append(hookargs)
2374 2370
2375 2371 for hookargs in allhooks:
2376 2372 op.repo.hook(
2377 2373 b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
2378 2374 )
2379 2375
2380 2376 for book, node in changes:
2381 2377 if bookmarks.isdivergent(book):
2382 2378 msg = _(b'cannot accept divergent bookmark %s!') % book
2383 2379 raise error.Abort(msg)
2384 2380
2385 2381 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2386 2382
2387 2383 if pushkeycompat:
2388 2384
2389 2385 def runhook(unused_success):
2390 2386 for hookargs in allhooks:
2391 2387 op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))
2392 2388
2393 2389 op.repo._afterlock(runhook)
2394 2390
2395 2391 elif bookmarksmode == b'records':
2396 2392 for book, node in changes:
2397 2393 record = {b'bookmark': book, b'node': node}
2398 2394 op.records.add(b'bookmarks', record)
2399 2395 else:
2400 2396 raise error.ProgrammingError(
2401 2397 b'unknown bookmark mode: %s' % bookmarksmode
2402 2398 )
2403 2399
2404 2400
2405 2401 @parthandler(b'phase-heads')
2406 2402 def handlephases(op, inpart):
2407 2403 """apply phases from bundle part to repo"""
2408 2404 headsbyphase = phases.binarydecode(inpart)
2409 2405 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2410 2406
2411 2407
2412 2408 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2413 2409 def handlepushkeyreply(op, inpart):
2414 2410 """retrieve the result of a pushkey request"""
2415 2411 ret = int(inpart.params[b'return'])
2416 2412 partid = int(inpart.params[b'in-reply-to'])
2417 2413 op.records.add(b'pushkey', {b'return': ret}, partid)
2418 2414
2419 2415
2420 2416 @parthandler(b'obsmarkers')
2421 2417 def handleobsmarker(op, inpart):
2422 2418 """add a stream of obsmarkers to the repo"""
2423 2419 tr = op.gettransaction()
2424 2420 markerdata = inpart.read()
2425 2421 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2426 2422 op.ui.writenoi18n(
2427 2423 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2428 2424 )
2429 2425 # The mergemarkers call will crash if marker creation is not enabled.
2430 2426 # We want to avoid this if the part is advisory.
2431 2427 if not inpart.mandatory and op.repo.obsstore.readonly:
2432 2428 op.repo.ui.debug(
2433 2429 b'ignoring obsolescence markers, feature not enabled\n'
2434 2430 )
2435 2431 return
2436 2432 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2437 2433 op.repo.invalidatevolatilesets()
2438 2434 op.records.add(b'obsmarkers', {b'new': new})
2439 2435 if op.reply is not None:
2440 2436 rpart = op.reply.newpart(b'reply:obsmarkers')
2441 2437 rpart.addparam(
2442 2438 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2443 2439 )
2444 2440 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2445 2441
2446 2442
2447 2443 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2448 2444 def handleobsmarkerreply(op, inpart):
2449 2445 """retrieve the result of a pushkey request"""
2450 2446 ret = int(inpart.params[b'new'])
2451 2447 partid = int(inpart.params[b'in-reply-to'])
2452 2448 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2453 2449
2454 2450
2455 2451 @parthandler(b'hgtagsfnodes')
2456 2452 def handlehgtagsfnodes(op, inpart):
2457 2453 """Applies .hgtags fnodes cache entries to the local repo.
2458 2454
2459 2455 Payload is pairs of 20 byte changeset nodes and filenodes.
2460 2456 """
2461 2457 # Grab the transaction so we ensure that we have the lock at this point.
2462 2458 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2463 2459 op.gettransaction()
2464 2460 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2465 2461
2466 2462 count = 0
2467 2463 while True:
2468 2464 node = inpart.read(20)
2469 2465 fnode = inpart.read(20)
2470 2466 if len(node) < 20 or len(fnode) < 20:
2471 2467 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2472 2468 break
2473 2469 cache.setfnode(node, fnode)
2474 2470 count += 1
2475 2471
2476 2472 cache.write()
2477 2473 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2478 2474
2479 2475
2480 2476 rbcstruct = struct.Struct(b'>III')
2481 2477
2482 2478
2483 2479 @parthandler(b'cache:rev-branch-cache')
2484 2480 def handlerbc(op, inpart):
2485 2481 """receive a rev-branch-cache payload and update the local cache
2486 2482
2487 2483 The payload is a series of records, one per branch
2488 2484
2489 2485 1) branch name length
2490 2486 2) number of open heads
2491 2487 3) number of closed heads
2492 2488 4) open heads nodes
2493 2489 5) closed heads nodes
2494 2490 """
2495 2491 total = 0
2496 2492 rawheader = inpart.read(rbcstruct.size)
2497 2493 cache = op.repo.revbranchcache()
2498 2494 cl = op.repo.unfiltered().changelog
2499 2495 while rawheader:
2500 2496 header = rbcstruct.unpack(rawheader)
2501 2497 total += header[1] + header[2]
2502 2498 utf8branch = inpart.read(header[0])
2503 2499 branch = encoding.tolocal(utf8branch)
2504 2500 for x in pycompat.xrange(header[1]):
2505 2501 node = inpart.read(20)
2506 2502 rev = cl.rev(node)
2507 2503 cache.setdata(branch, rev, node, False)
2508 2504 for x in pycompat.xrange(header[2]):
2509 2505 node = inpart.read(20)
2510 2506 rev = cl.rev(node)
2511 2507 cache.setdata(branch, rev, node, True)
2512 2508 rawheader = inpart.read(rbcstruct.size)
2513 2509 cache.write()
2514 2510
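# Editor's example (hypothetical nodes): framing one rev-branch-cache
# record for branch b'default' with two open heads and no closed head,
# exactly as the loop above expects to read it back.
def _example_rbc_record():
    open_heads = [b'\x11' * 20, b'\x22' * 20]
    header = rbcstruct.pack(len(b'default'), len(open_heads), 0)
    return header + b'default' + b''.join(open_heads)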
2515 2511
2516 2512 @parthandler(b'pushvars')
2517 2513 def bundle2getvars(op, part):
2518 2514 '''unbundle a bundle2 containing shellvars on the server'''
2519 2515 # An option to disable unbundling on server-side for security reasons
2520 2516 if op.ui.configbool(b'push', b'pushvars.server'):
2521 2517 hookargs = {}
2522 2518 for key, value in part.advisoryparams:
2523 2519 key = key.upper()
2524 2520 # We want pushed variables to have USERVAR_ prepended so we know
2525 2521 # they came from the --pushvar flag.
2526 2522 key = b"USERVAR_" + key
2527 2523 hookargs[key] = value
2528 2524 op.addhookargs(hookargs)
2529 2525
2530 2526
2531 2527 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2532 2528 def handlestreamv2bundle(op, part):
2533 2529
2534 2530 requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
2535 2531 filecount = int(part.params[b'filecount'])
2536 2532 bytecount = int(part.params[b'bytecount'])
2537 2533
2538 2534 repo = op.repo
2539 2535 if len(repo):
2540 2536 msg = _(b'cannot apply stream clone to non-empty repository')
2541 2537 raise error.Abort(msg)
2542 2538
2543 2539 repo.ui.debug(b'applying stream bundle\n')
2544 2540 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2545 2541
2546 2542
2547 2543 def widen_bundle(
2548 2544 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2549 2545 ):
2550 2546 """generates bundle2 for widening a narrow clone
2551 2547
2552 2548 bundler is the bundle to which data should be added
2553 2549 repo is the localrepository instance
2554 2550 oldmatcher matches what the client already has
2555 2551 newmatcher matches what the client needs (including what it already has)
2556 2552 common is set of common heads between server and client
2557 2553 known is a set of revs known on the client side (used in ellipses)
2558 2554 cgversion is the changegroup version to send
2559 2555 ellipses is a boolean value telling whether to send ellipses data or not
2560 2556
2561 2557 returns bundle2 of the data required for extending
2562 2558 """
2563 2559 commonnodes = set()
2564 2560 cl = repo.changelog
2565 2561 for r in repo.revs(b"::%ln", common):
2566 2562 commonnodes.add(cl.node(r))
2567 2563 if commonnodes:
2568 2564 # XXX: we should only send the filelogs (and treemanifest); the user
2569 2565 # already has the changelog and manifest
2570 2566 packer = changegroup.getbundler(
2571 2567 cgversion,
2572 2568 repo,
2573 2569 oldmatcher=oldmatcher,
2574 2570 matcher=newmatcher,
2575 2571 fullnodes=commonnodes,
2576 2572 )
2577 2573 cgdata = packer.generate(
2578 2574 {nullid},
2579 2575 list(commonnodes),
2580 2576 False,
2581 2577 b'narrow_widen',
2582 2578 changelog=False,
2583 2579 )
2584 2580
2585 2581 part = bundler.newpart(b'changegroup', data=cgdata)
2586 2582 part.addparam(b'version', cgversion)
2587 2583 if scmutil.istreemanifest(repo):
2588 2584 part.addparam(b'treemanifest', b'1')
2589 2585 if b'exp-sidedata-flag' in repo.requirements:
2590 2586 part.addparam(b'exp-sidedata', b'1')
2591 2587
2592 2588 return bundler