py3: fully fix bundlepart.__repr__ to return str not bytes...
Kyle Lippincott -
r44759:74172a23 stable
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
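The rules above can be exercised with a minimal standalone sketch (a hypothetical helper, not part of Mercurial's API): split the blob on spaces, url-unquote each piece, and flag mandatory parameters by their leading capital letter.

```python
from urllib.parse import unquote

def parse_stream_params(blob):
    """Parse a bundle2-style stream parameter blob.

    Returns (name, value, mandatory) tuples; value is None for
    parameters given without '=<value>'.
    """
    out = []
    for item in blob.split(' '):
        if not item:
            continue
        if '=' in item:
            name, value = item.split('=', 1)
            value = unquote(value)
        else:
            name, value = item, None
        name = unquote(name)
        # a leading capital letter marks the parameter as mandatory
        out.append((name, value, name[0:1].isupper()))
    return out
```

For example, `parse_stream_params('Compression=BZ ack')` yields a mandatory `Compression` parameter and an advisory `ack` one.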
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
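The header layout above (typesize, part type, part id, parameter counts, size pairs, then parameter data) can be decoded with a short standalone sketch; the function name is hypothetical and this is not Mercurial's own decoder, just an illustration of the byte layout.

```python
import struct

def parse_part_header(header):
    """Decode a bundle2 part header blob following the documented layout."""
    offset = 0
    (typesize,) = struct.unpack_from('>B', header, offset)
    offset += 1
    parttype = header[offset:offset + typesize]
    offset += typesize
    (partid,) = struct.unpack_from('>I', header, offset)
    offset += 4
    mancount, advcount = struct.unpack_from('>BB', header, offset)
    offset += 2
    total = mancount + advcount
    sizes = struct.unpack_from('>' + 'BB' * total, header, offset)
    offset += 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = header[offset:offset + ksize]
        offset += ksize
        value = header[offset:offset + vsize]
        offset += vsize
        # mandatory parameters come first, then the advisory ones
        params.append((key, value, i < mancount))
    return parttype, partid, params
```

Feeding it a hand-built header with one mandatory `k=v` parameter recovers the part type, id, and parameter list.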
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
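A reader for the `<chunksize><chunkdata>` framing described above can be sketched in a few lines (hypothetical helper; the special negative-size processing is deliberately left out, matching the note that none exists yet):

```python
import io
import struct

def read_payload(fp):
    """Concatenate <chunksize><chunkdata> pairs until the zero-size chunk."""
    data = []
    while True:
        (size,) = struct.unpack('>i', fp.read(4))
        if size == 0:
            # zero-size chunk concludes the payload
            return b''.join(data)
        if size < 0:
            raise ValueError('special chunk sizes not handled in this sketch')
        data.append(fp.read(size))
```

For example, a stream holding one 3-byte chunk followed by the zero-size terminator yields those 3 bytes back.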
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import collections
151 151 import errno
152 152 import os
153 153 import re
154 154 import string
155 155 import struct
156 156 import sys
157 157
158 158 from .i18n import _
159 159 from . import (
160 160 bookmarks,
161 161 changegroup,
162 162 encoding,
163 163 error,
164 164 node as nodemod,
165 165 obsolete,
166 166 phases,
167 167 pushkey,
168 168 pycompat,
169 169 streamclone,
170 170 tags,
171 171 url,
172 172 util,
173 173 )
174 174 from .utils import stringutil
175 175
176 176 urlerr = util.urlerr
177 177 urlreq = util.urlreq
178 178
179 179 _pack = struct.pack
180 180 _unpack = struct.unpack
181 181
182 182 _fstreamparamsize = b'>i'
183 183 _fpartheadersize = b'>i'
184 184 _fparttypesize = b'>B'
185 185 _fpartid = b'>I'
186 186 _fpayloadsize = b'>i'
187 187 _fpartparamcount = b'>BB'
188 188
189 189 preferedchunksize = 32768
190 190
191 191 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
192 192
193 193
194 194 def outdebug(ui, message):
195 195 """debug regarding output stream (bundling)"""
196 196 if ui.configbool(b'devel', b'bundle2.debug'):
197 197 ui.debug(b'bundle2-output: %s\n' % message)
198 198
199 199
200 200 def indebug(ui, message):
201 201 """debug on input stream (unbundling)"""
202 202 if ui.configbool(b'devel', b'bundle2.debug'):
203 203 ui.debug(b'bundle2-input: %s\n' % message)
204 204
205 205
206 206 def validateparttype(parttype):
207 207 """raise ValueError if a parttype contains invalid character"""
208 208 if _parttypeforbidden.search(parttype):
209 209 raise ValueError(parttype)
210 210
211 211
212 212 def _makefpartparamsizes(nbparams):
213 213 """return a struct format to read part parameter sizes
214 214
215 215 The number of parameters is variable, so we need to build that format
216 216 dynamically.
217 217 """
218 218 return b'>' + (b'BB' * nbparams)
219 219
220 220
221 221 parthandlermapping = {}
222 222
223 223
224 224 def parthandler(parttype, params=()):
225 225 decorator that registers a function as a bundle2 part handler
226 226
227 227 eg::
228 228
229 229 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
230 230 def myparttypehandler(...):
231 231 '''process a part of type "my part".'''
232 232 ...
233 233 """
234 234 validateparttype(parttype)
235 235
236 236 def _decorator(func):
237 237 lparttype = parttype.lower() # enforce lower case matching.
238 238 assert lparttype not in parthandlermapping
239 239 parthandlermapping[lparttype] = func
240 240 func.params = frozenset(params)
241 241 return func
242 242
243 243 return _decorator
244 244
245 245
246 246 class unbundlerecords(object):
247 247 keep record of what happens during an unbundle
248 248
249 249 New records are added using `records.add('cat', obj)`. Where 'cat' is a
250 250 category of record and obj is an arbitrary object.
251 251
252 252 `records['cat']` will return all entries of this category 'cat'.
253 253
254 254 Iterating on the object itself will yield `('category', obj)` tuples
255 255 for all entries.
256 256
257 257 All iterations happen in chronological order.
258 258 """
259 259
260 260 def __init__(self):
261 261 self._categories = {}
262 262 self._sequences = []
263 263 self._replies = {}
264 264
265 265 def add(self, category, entry, inreplyto=None):
266 266 """add a new record of a given category.
267 267
268 268 The entry can then be retrieved in the list returned by
269 269 self['category']."""
270 270 self._categories.setdefault(category, []).append(entry)
271 271 self._sequences.append((category, entry))
272 272 if inreplyto is not None:
273 273 self.getreplies(inreplyto).add(category, entry)
274 274
275 275 def getreplies(self, partid):
276 276 """get the records that are replies to a specific part"""
277 277 return self._replies.setdefault(partid, unbundlerecords())
278 278
279 279 def __getitem__(self, cat):
280 280 return tuple(self._categories.get(cat, ()))
281 281
282 282 def __iter__(self):
283 283 return iter(self._sequences)
284 284
285 285 def __len__(self):
286 286 return len(self._sequences)
287 287
288 288 def __nonzero__(self):
289 289 return bool(self._sequences)
290 290
291 291 __bool__ = __nonzero__
292 292
293 293
294 294 class bundleoperation(object):
295 295 """an object that represents a single bundling process
296 296
297 297 Its purpose is to carry unbundle-related objects and states.
298 298
299 299 A new object should be created at the beginning of each bundle processing.
300 300 The object is to be returned by the processing function.
301 301
302 302 The object has very little content now; it will ultimately contain:
303 303 * an access to the repo the bundle is applied to,
304 304 * a ui object,
305 305 * a way to retrieve a transaction to add changes to the repo,
306 306 * a way to record the result of processing each part,
307 307 * a way to construct a bundle response when applicable.
308 308 """
309 309
310 310 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
311 311 self.repo = repo
312 312 self.ui = repo.ui
313 313 self.records = unbundlerecords()
314 314 self.reply = None
315 315 self.captureoutput = captureoutput
316 316 self.hookargs = {}
317 317 self._gettransaction = transactiongetter
318 318 # carries value that can modify part behavior
319 319 self.modes = {}
320 320 self.source = source
321 321
322 322 def gettransaction(self):
323 323 transaction = self._gettransaction()
324 324
325 325 if self.hookargs:
326 326 # the ones added to the transaction supersede those added
327 327 # to the operation.
328 328 self.hookargs.update(transaction.hookargs)
329 329 transaction.hookargs = self.hookargs
330 330
331 331 # mark the hookargs as flushed. further attempts to add to
332 332 # hookargs will result in an abort.
333 333 self.hookargs = None
334 334
335 335 return transaction
336 336
337 337 def addhookargs(self, hookargs):
338 338 if self.hookargs is None:
339 339 raise error.ProgrammingError(
340 340 b'attempted to add hookargs to '
341 341 b'operation after transaction started'
342 342 )
343 343 self.hookargs.update(hookargs)
344 344
345 345
346 346 class TransactionUnavailable(RuntimeError):
347 347 pass
348 348
349 349
350 350 def _notransaction():
351 351 """default method to get a transaction while processing a bundle
352 352
353 353 Raise an exception to highlight the fact that no transaction was expected
354 354 to be created"""
355 355 raise TransactionUnavailable()
356 356
357 357
358 358 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
359 359 # transform me into unbundler.apply() as soon as the freeze is lifted
360 360 if isinstance(unbundler, unbundle20):
361 361 tr.hookargs[b'bundle2'] = b'1'
362 362 if source is not None and b'source' not in tr.hookargs:
363 363 tr.hookargs[b'source'] = source
364 364 if url is not None and b'url' not in tr.hookargs:
365 365 tr.hookargs[b'url'] = url
366 366 return processbundle(repo, unbundler, lambda: tr, source=source)
367 367 else:
368 368 # the transactiongetter won't be used, but we might as well set it
369 369 op = bundleoperation(repo, lambda: tr, source=source)
370 370 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
371 371 return op
372 372
373 373
374 374 class partiterator(object):
375 375 def __init__(self, repo, op, unbundler):
376 376 self.repo = repo
377 377 self.op = op
378 378 self.unbundler = unbundler
379 379 self.iterator = None
380 380 self.count = 0
381 381 self.current = None
382 382
383 383 def __enter__(self):
384 384 def func():
385 385 itr = enumerate(self.unbundler.iterparts(), 1)
386 386 for count, p in itr:
387 387 self.count = count
388 388 self.current = p
389 389 yield p
390 390 p.consume()
391 391 self.current = None
392 392
393 393 self.iterator = func()
394 394 return self.iterator
395 395
396 396 def __exit__(self, type, exc, tb):
397 397 if not self.iterator:
398 398 return
399 399
400 400 # Only gracefully abort in a normal exception situation. User aborts
401 401 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
402 402 # and should not gracefully cleanup.
403 403 if isinstance(exc, Exception):
404 404 # Any exceptions seeking to the end of the bundle at this point are
405 405 # almost certainly related to the underlying stream being bad.
406 406 # And, chances are that the exception we're handling is related to
407 407 # getting in that bad state. So, we swallow the seeking error and
408 408 # re-raise the original error.
409 409 seekerror = False
410 410 try:
411 411 if self.current:
412 412 # consume the part content to not corrupt the stream.
413 413 self.current.consume()
414 414
415 415 for part in self.iterator:
416 416 # consume the bundle content
417 417 part.consume()
418 418 except Exception:
419 419 seekerror = True
420 420
421 421 # Small hack to let caller code distinguish exceptions from bundle2
422 422 # processing from processing the old format. This is mostly needed
423 423 # to handle different return codes to unbundle according to the type
424 424 # of bundle. We should probably clean up or drop this return code
425 425 # craziness in a future version.
426 426 exc.duringunbundle2 = True
427 427 salvaged = []
428 428 replycaps = None
429 429 if self.op.reply is not None:
430 430 salvaged = self.op.reply.salvageoutput()
431 431 replycaps = self.op.reply.capabilities
432 432 exc._replycaps = replycaps
433 433 exc._bundle2salvagedoutput = salvaged
434 434
435 435 # Re-raising from a variable loses the original stack. So only use
436 436 # that form if we need to.
437 437 if seekerror:
438 438 raise exc
439 439
440 440 self.repo.ui.debug(
441 441 b'bundle2-input-bundle: %i parts total\n' % self.count
442 442 )
443 443
444 444
445 445 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
446 446 This function processes a bundle, applying effects to/from a repo
447 447
448 448 It iterates over each part then searches for and uses the proper handling
449 449 code to process the part. Parts are processed in order.
450 450
451 451 An unknown mandatory part will abort the process.
452 452
453 453 It is temporarily possible to provide a prebuilt bundleoperation to the
454 454 function. This is used to ensure output is properly propagated in case of
455 455 an error during the unbundling. This output capturing part will likely be
456 456 reworked and this ability will probably go away in the process.
457 457 """
458 458 if op is None:
459 459 if transactiongetter is None:
460 460 transactiongetter = _notransaction
461 461 op = bundleoperation(repo, transactiongetter, source=source)
462 462 # todo:
463 463 # - replace this with an init function soon.
464 464 # - exception catching
465 465 unbundler.params
466 466 if repo.ui.debugflag:
467 467 msg = [b'bundle2-input-bundle:']
468 468 if unbundler.params:
469 469 msg.append(b' %i params' % len(unbundler.params))
470 470 if op._gettransaction is None or op._gettransaction is _notransaction:
471 471 msg.append(b' no-transaction')
472 472 else:
473 473 msg.append(b' with-transaction')
474 474 msg.append(b'\n')
475 475 repo.ui.debug(b''.join(msg))
476 476
477 477 processparts(repo, op, unbundler)
478 478
479 479 return op
480 480
481 481
482 482 def processparts(repo, op, unbundler):
483 483 with partiterator(repo, op, unbundler) as parts:
484 484 for part in parts:
485 485 _processpart(op, part)
486 486
487 487
488 488 def _processchangegroup(op, cg, tr, source, url, **kwargs):
489 489 ret = cg.apply(op.repo, tr, source, url, **kwargs)
490 490 op.records.add(b'changegroup', {b'return': ret,})
491 491 return ret
492 492
493 493
494 494 def _gethandler(op, part):
495 495 status = b'unknown' # used by debug output
496 496 try:
497 497 handler = parthandlermapping.get(part.type)
498 498 if handler is None:
499 499 status = b'unsupported-type'
500 500 raise error.BundleUnknownFeatureError(parttype=part.type)
501 501 indebug(op.ui, b'found a handler for part %s' % part.type)
502 502 unknownparams = part.mandatorykeys - handler.params
503 503 if unknownparams:
504 504 unknownparams = list(unknownparams)
505 505 unknownparams.sort()
506 506 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
507 507 raise error.BundleUnknownFeatureError(
508 508 parttype=part.type, params=unknownparams
509 509 )
510 510 status = b'supported'
511 511 except error.BundleUnknownFeatureError as exc:
512 512 if part.mandatory: # mandatory parts
513 513 raise
514 514 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
515 515 return # skip to part processing
516 516 finally:
517 517 if op.ui.debugflag:
518 518 msg = [b'bundle2-input-part: "%s"' % part.type]
519 519 if not part.mandatory:
520 520 msg.append(b' (advisory)')
521 521 nbmp = len(part.mandatorykeys)
522 522 nbap = len(part.params) - nbmp
523 523 if nbmp or nbap:
524 524 msg.append(b' (params:')
525 525 if nbmp:
526 526 msg.append(b' %i mandatory' % nbmp)
527 527 if nbap:
528 528 msg.append(b' %i advisory' % nbap)
529 529 msg.append(b')')
530 530 msg.append(b' %s\n' % status)
531 531 op.ui.debug(b''.join(msg))
532 532
533 533 return handler
534 534
535 535
536 536 def _processpart(op, part):
537 537 """process a single part from a bundle
538 538
539 539 The part is guaranteed to have been fully consumed when the function exits
540 540 (even if an exception is raised)."""
541 541 handler = _gethandler(op, part)
542 542 if handler is None:
543 543 return
544 544
545 545 # handler is called outside the above try block so that we don't
546 546 # risk catching KeyErrors from anything other than the
547 547 # parthandlermapping lookup (any KeyError raised by handler()
548 548 # itself represents a defect of a different variety).
549 549 output = None
550 550 if op.captureoutput and op.reply is not None:
551 551 op.ui.pushbuffer(error=True, subproc=True)
552 552 output = b''
553 553 try:
554 554 handler(op, part)
555 555 finally:
556 556 if output is not None:
557 557 output = op.ui.popbuffer()
558 558 if output:
559 559 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
560 560 outpart.addparam(
561 561 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
562 562 )
563 563
564 564
565 565 def decodecaps(blob):
566 566 """decode a bundle2 caps bytes blob into a dictionary
567 567
568 568 The blob is a list of capabilities (one per line)
569 569 Capabilities may have values using a line of the form::
570 570
571 571 capability=value1,value2,value3
572 572
573 573 The values are always a list."""
574 574 caps = {}
575 575 for line in blob.splitlines():
576 576 if not line:
577 577 continue
578 578 if b'=' not in line:
579 579 key, vals = line, ()
580 580 else:
581 581 key, vals = line.split(b'=', 1)
582 582 vals = vals.split(b',')
583 583 key = urlreq.unquote(key)
584 584 vals = [urlreq.unquote(v) for v in vals]
585 585 caps[key] = vals
586 586 return caps
587 587
588 588
589 589 def encodecaps(caps):
590 590 """encode a bundle2 caps dictionary into a bytes blob"""
591 591 chunks = []
592 592 for ca in sorted(caps):
593 593 vals = caps[ca]
594 594 ca = urlreq.quote(ca)
595 595 vals = [urlreq.quote(v) for v in vals]
596 596 if vals:
597 597 ca = b"%s=%s" % (ca, b','.join(vals))
598 598 chunks.append(ca)
599 599 return b'\n'.join(chunks)
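`encodecaps` and `decodecaps` are inverses of each other. A standalone, str-based sketch of the same blob shape (reimplemented here with `urllib.parse` quoting rather than importing the module; not Mercurial's own code) makes the round trip easy to check:

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    """Sketch of encodecaps: one 'name=v1,v2' line per capability, sorted."""
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        line = quote(name)
        if vals:
            line = '%s=%s' % (line, ','.join(vals))
        chunks.append(line)
    return '\n'.join(chunks)

def decode_caps(blob):
    """Sketch of decodecaps: values are always a list, possibly empty."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' in line:
            name, vals = line.split('=', 1)
            vals = [unquote(v) for v in vals.split(',')]
        else:
            name, vals = line, []
        caps[unquote(name)] = vals
    return caps
```

A capability with no values is encoded as a bare name, and the round trip preserves the dictionary.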
600 600
601 601
602 602 bundletypes = {
603 603 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
604 604 # since the unification ssh accepts a header but there
605 605 # is no capability signaling it.
606 606 b"HG20": (), # special-cased below
607 607 b"HG10UN": (b"HG10UN", b'UN'),
608 608 b"HG10BZ": (b"HG10", b'BZ'),
609 609 b"HG10GZ": (b"HG10GZ", b'GZ'),
610 610 }
611 611
612 612 # hgweb uses this list to communicate its preferred type
613 613 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
614 614
615 615
616 616 class bundle20(object):
617 617 """represent an outgoing bundle2 container
618 618
619 619 Use the `addparam` method to add stream level parameters, and `newpart` to
620 620 populate it. Then call `getchunks` to retrieve all the binary chunks of
621 621 data that compose the bundle2 container."""
622 622
623 623 _magicstring = b'HG20'
624 624
625 625 def __init__(self, ui, capabilities=()):
626 626 self.ui = ui
627 627 self._params = []
628 628 self._parts = []
629 629 self.capabilities = dict(capabilities)
630 630 self._compengine = util.compengines.forbundletype(b'UN')
631 631 self._compopts = None
632 632 # If compression is being handled by a consumer of the raw
633 633 # data (e.g. the wire protocol), unsetting this flag tells
634 634 # consumers that the bundle is best left uncompressed.
635 635 self.prefercompressed = True
636 636
637 637 def setcompression(self, alg, compopts=None):
638 638 """setup core part compression to <alg>"""
639 639 if alg in (None, b'UN'):
640 640 return
641 641 assert not any(n.lower() == b'compression' for n, v in self._params)
642 642 self.addparam(b'Compression', alg)
643 643 self._compengine = util.compengines.forbundletype(alg)
644 644 self._compopts = compopts
645 645
646 646 @property
647 647 def nbparts(self):
648 648 """total number of parts added to the bundler"""
649 649 return len(self._parts)
650 650
651 651 # methods used to define the bundle2 content
652 652 def addparam(self, name, value=None):
653 653 """add a stream level parameter"""
654 654 if not name:
655 655 raise error.ProgrammingError(b'empty parameter name')
656 656 if name[0:1] not in pycompat.bytestr(
657 657 string.ascii_letters # pytype: disable=wrong-arg-types
658 658 ):
659 659 raise error.ProgrammingError(
660 660 b'non letter first character: %s' % name
661 661 )
662 662 self._params.append((name, value))
663 663
664 664 def addpart(self, part):
665 665 """add a new part to the bundle2 container
666 666
667 667 Parts contains the actual applicative payload."""
668 668 assert part.id is None
669 669 part.id = len(self._parts) # very cheap counter
670 670 self._parts.append(part)
671 671
672 672 def newpart(self, typeid, *args, **kwargs):
673 673 create a new part and add it to the container
674 674
675 675 The part is directly added to the container. For now, this means
676 676 that any failure to properly initialize the part after calling
677 677 ``newpart`` should result in a failure of the whole bundling process.
678 678
679 679 You can still fall back to manually creating and adding the part if you
680 680 need better control.
681 681 part = bundlepart(typeid, *args, **kwargs)
682 682 self.addpart(part)
683 683 return part
684 684
685 685 # methods used to generate the bundle2 stream
686 686 def getchunks(self):
687 687 if self.ui.debugflag:
688 688 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
689 689 if self._params:
690 690 msg.append(b' (%i params)' % len(self._params))
691 691 msg.append(b' %i parts total\n' % len(self._parts))
692 692 self.ui.debug(b''.join(msg))
693 693 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
694 694 yield self._magicstring
695 695 param = self._paramchunk()
696 696 outdebug(self.ui, b'bundle parameter: %s' % param)
697 697 yield _pack(_fstreamparamsize, len(param))
698 698 if param:
699 699 yield param
700 700 for chunk in self._compengine.compressstream(
701 701 self._getcorechunk(), self._compopts
702 702 ):
703 703 yield chunk
704 704
705 705 def _paramchunk(self):
706 706 return an encoded version of all stream parameters
707 707 blocks = []
708 708 for par, value in self._params:
709 709 par = urlreq.quote(par)
710 710 if value is not None:
711 711 value = urlreq.quote(value)
712 712 par = b'%s=%s' % (par, value)
713 713 blocks.append(par)
714 714 return b' '.join(blocks)
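The encoding that `_paramchunk` produces, together with the int32 length prefix that `getchunks` emits before it, can be shown in a small standalone sketch (hypothetical helper, str-based for readability; the real code works on bytes):

```python
import struct
from urllib.parse import quote

def param_chunk(params):
    """Encode stream parameters as the wire format: int32 length, then a
    space-separated list of url-quoted 'name' or 'name=value' items."""
    blocks = []
    for name, value in params:
        item = quote(name)
        if value is not None:
            item = '%s=%s' % (item, quote(value))
        blocks.append(item)
    blob = ' '.join(blocks).encode('ascii')
    # the stream prefixes the parameter blob with its int32 size
    return struct.pack('>i', len(blob)) + blob
```

Two parameters, one with a value and one without, serialize to an 18-byte blob behind a 4-byte length field.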
715 715
716 716 def _getcorechunk(self):
717 717 """yield chunk for the core part of the bundle
718 718
719 719 (all but headers and parameters)"""
720 720 outdebug(self.ui, b'start of parts')
721 721 for part in self._parts:
722 722 outdebug(self.ui, b'bundle part: "%s"' % part.type)
723 723 for chunk in part.getchunks(ui=self.ui):
724 724 yield chunk
725 725 outdebug(self.ui, b'end of bundle')
726 726 yield _pack(_fpartheadersize, 0)
727 727
728 728 def salvageoutput(self):
729 729 """return a list with a copy of all output parts in the bundle
730 730
731 731 This is meant to be used during error handling to make sure we preserve
732 732 server output"""
733 733 salvaged = []
734 734 for part in self._parts:
735 735 if part.type.startswith(b'output'):
736 736 salvaged.append(part.copy())
737 737 return salvaged
738 738
739 739
740 740 class unpackermixin(object):
741 741 """A mixin to extract bytes and struct data from a stream"""
742 742
743 743 def __init__(self, fp):
744 744 self._fp = fp
745 745
746 746 def _unpack(self, format):
747 747 """unpack this struct format from the stream
748 748
749 749 This method is meant for internal usage by the bundle2 protocol only.
750 750 It directly manipulates the low level stream, including bundle2 level
751 751 instructions.
752 752
753 753 Do not use it to implement higher-level logic or methods."""
754 754 data = self._readexact(struct.calcsize(format))
755 755 return _unpack(format, data)
756 756
757 757 def _readexact(self, size):
758 758 """read exactly <size> bytes from the stream
759 759
760 760 This method is meant for internal usage by the bundle2 protocol only.
761 761 It directly manipulates the low level stream, including bundle2 level
762 762 instructions.
763 763
764 764 Do not use it to implement higher-level logic or methods."""
765 765 return changegroup.readexactly(self._fp, size)
766 766
767 767
768 768 def getunbundler(ui, fp, magicstring=None):
769 769 """return a valid unbundler object for a given magicstring"""
770 770 if magicstring is None:
771 771 magicstring = changegroup.readexactly(fp, 4)
772 772 magic, version = magicstring[0:2], magicstring[2:4]
773 773 if magic != b'HG':
774 774 ui.debug(
775 775 b"error: invalid magic: %r (version %r), should be 'HG'\n"
776 776 % (magic, version)
777 777 )
778 778 raise error.Abort(_(b'not a Mercurial bundle'))
779 779 unbundlerclass = formatmap.get(version)
780 780 if unbundlerclass is None:
781 781 raise error.Abort(_(b'unknown bundle version %s') % version)
782 782 unbundler = unbundlerclass(ui, fp)
783 783 indebug(ui, b'start processing of %s stream' % magicstring)
784 784 return unbundler
785 785
786 786
787 787 class unbundle20(unpackermixin):
788 788 """interpret a bundle2 stream
789 789
790 790 This class is fed with a binary stream and yields parts through its
791 791 `iterparts` methods."""
792 792
793 793 _magicstring = b'HG20'
794 794
795 795 def __init__(self, ui, fp):
796 796 """If header is specified, we do not read it out of the stream."""
797 797 self.ui = ui
798 798 self._compengine = util.compengines.forbundletype(b'UN')
799 799 self._compressed = None
800 800 super(unbundle20, self).__init__(fp)
801 801
802 802 @util.propertycache
803 803 def params(self):
804 804 """dictionary of stream level parameters"""
805 805 indebug(self.ui, b'reading bundle2 stream parameters')
806 806 params = {}
807 807 paramssize = self._unpack(_fstreamparamsize)[0]
808 808 if paramssize < 0:
809 809 raise error.BundleValueError(
810 810 b'negative bundle param size: %i' % paramssize
811 811 )
812 812 if paramssize:
813 813 params = self._readexact(paramssize)
814 814 params = self._processallparams(params)
815 815 return params
816 816
817 817 def _processallparams(self, paramsblock):
818 818 """parse a parameter block, applying each parameter's effect"""
819 819 params = util.sortdict()
820 820 for p in paramsblock.split(b' '):
821 821 p = p.split(b'=', 1)
822 822 p = [urlreq.unquote(i) for i in p]
823 823 if len(p) < 2:
824 824 p.append(None)
825 825 self._processparam(*p)
826 826 params[p[0]] = p[1]
827 827 return params
828 828
829 829 def _processparam(self, name, value):
830 830 """process a parameter, applying its effect if needed
831 831
832 832 Parameters starting with a lower case letter are advisory and will be
833 833 ignored when unknown. Those starting with an upper case letter are
834 834 mandatory, and this function will raise a KeyError when they are unknown.
835 835
836 836 Note: no options are currently supported. Any input will be either
837 837 ignored or will fail.
838 838 """
839 839 if not name:
840 840 raise ValueError('empty parameter name')
841 841 if name[0:1] not in pycompat.bytestr(
842 842 string.ascii_letters # pytype: disable=wrong-arg-types
843 843 ):
844 844 raise ValueError('non letter first character: %s' % name)
845 845 try:
846 846 handler = b2streamparamsmap[name.lower()]
847 847 except KeyError:
848 848 if name[0:1].islower():
849 849 indebug(self.ui, b"ignoring unknown parameter %s" % name)
850 850 else:
851 851 raise error.BundleUnknownFeatureError(params=(name,))
852 852 else:
853 853 handler(self, name, value)
854 854
855 855 def _forwardchunks(self):
856 856 """utility to transfer a bundle2 as binary
857 857
858 858 This is made necessary by the fact that the 'getbundle' command over 'ssh'
859 859 has no way to know when the reply ends, relying on the bundle being
860 860 interpreted to find its end. This is terrible and we are sorry, but we
861 861 needed to move forward to get general delta enabled.
862 862 """
863 863 yield self._magicstring
864 864 assert 'params' not in vars(self)
865 865 paramssize = self._unpack(_fstreamparamsize)[0]
866 866 if paramssize < 0:
867 867 raise error.BundleValueError(
868 868 b'negative bundle param size: %i' % paramssize
869 869 )
870 870 if paramssize:
871 871 params = self._readexact(paramssize)
872 872 self._processallparams(params)
873 873 # The payload itself is decompressed below, so drop
874 874 # the compression parameter passed down to compensate.
875 875 outparams = []
876 876 for p in params.split(b' '):
877 877 k, v = p.split(b'=', 1)
878 878 if k.lower() != b'compression':
879 879 outparams.append(p)
880 880 outparams = b' '.join(outparams)
881 881 yield _pack(_fstreamparamsize, len(outparams))
882 882 yield outparams
883 883 else:
884 884 yield _pack(_fstreamparamsize, paramssize)
885 885 # From there, payload might need to be decompressed
886 886 self._fp = self._compengine.decompressorreader(self._fp)
887 887 emptycount = 0
888 888 while emptycount < 2:
889 889 # so we can brainlessly loop
890 890 assert _fpartheadersize == _fpayloadsize
891 891 size = self._unpack(_fpartheadersize)[0]
892 892 yield _pack(_fpartheadersize, size)
893 893 if size:
894 894 emptycount = 0
895 895 else:
896 896 emptycount += 1
897 897 continue
898 898 if size == flaginterrupt:
899 899 continue
900 900 elif size < 0:
901 901 raise error.BundleValueError(b'negative chunk size: %i' % size)
902 902 yield self._readexact(size)
903 903
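The forwarding loop above relies on part headers and payload chunks sharing the same int32 size prefix (the `assert _fpartheadersize == _fpayloadsize`), with two consecutive empty frames marking end of stream. A minimal standalone sketch of that framing, using only `struct` and `io` (no Mercurial imports; the negative interrupt flag handled elsewhere is ignored here):

```python
import io
import struct

_fsize = '>i'  # same big-endian int32 prefix used for headers and payloads

def read_frames(fp):
    """Yield frame payloads until two consecutive zero-size frames."""
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack(_fsize, fp.read(4))[0]
        if size > 0:
            emptycount = 0
            yield fp.read(size)
        else:
            emptycount += 1

# build a stream: one 3-byte frame, then the double terminator
raw = struct.pack(_fsize, 3) + b'abc' + struct.pack(_fsize, 0) * 2
frames = list(read_frames(io.BytesIO(raw)))
```

This is a sketch of the framing invariant only; the real loop also forwards the raw size prefixes unmodified.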
904 904 def iterparts(self, seekable=False):
905 905 """yield all parts contained in the stream"""
906 906 cls = seekableunbundlepart if seekable else unbundlepart
907 907 # make sure params have been loaded
908 908 self.params
909 909 # From there, the payload needs to be decompressed
910 910 self._fp = self._compengine.decompressorreader(self._fp)
911 911 indebug(self.ui, b'start extraction of bundle2 parts')
912 912 headerblock = self._readpartheader()
913 913 while headerblock is not None:
914 914 part = cls(self.ui, headerblock, self._fp)
915 915 yield part
916 916 # Ensure part is fully consumed so we can start reading the next
917 917 # part.
918 918 part.consume()
919 919
920 920 headerblock = self._readpartheader()
921 921 indebug(self.ui, b'end of bundle2 stream')
922 922
923 923 def _readpartheader(self):
924 924 """reads a part header size and returns the bytes blob
925 925
926 926 returns None if empty"""
927 927 headersize = self._unpack(_fpartheadersize)[0]
928 928 if headersize < 0:
929 929 raise error.BundleValueError(
930 930 b'negative part header size: %i' % headersize
931 931 )
932 932 indebug(self.ui, b'part header size: %i' % headersize)
933 933 if headersize:
934 934 return self._readexact(headersize)
935 935 return None
936 936
937 937 def compressed(self):
938 938 self.params # load params
939 939 return self._compressed
940 940
941 941 def close(self):
942 942 """close underlying file"""
943 943 if util.safehasattr(self._fp, 'close'):
944 944 return self._fp.close()
945 945
946 946
947 947 formatmap = {b'20': unbundle20}
948 948
949 949 b2streamparamsmap = {}
950 950
951 951
952 952 def b2streamparamhandler(name):
953 953 """register a handler for a stream level parameter"""
954 954
955 955 def decorator(func):
956 956 assert name not in b2streamparamsmap
957 957 b2streamparamsmap[name] = func
958 958 return func
959 959
960 960 return decorator
961 961
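The `b2streamparamhandler` decorator above is a plain registration decorator. A self-contained sketch of the same pattern, with a hypothetical `handlers` map standing in for `b2streamparamsmap`:

```python
handlers = {}

def streamparamhandler(name):
    """Register the decorated function as the handler for `name`."""
    def decorator(func):
        assert name not in handlers, 'duplicate handler: %s' % name
        handlers[name] = func
        return func
    return decorator

@streamparamhandler('compression')
def process_compression(value):
    # a real handler would install a decompressor; this just records the choice
    return 'decompress-with-%s' % value

result = handlers['compression']('zstd')
```

Dispatch then becomes a dict lookup on the lowercased parameter name, exactly as `_processallparams` does.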
962 962
963 963 @b2streamparamhandler(b'compression')
964 964 def processcompression(unbundler, param, value):
965 965 """read compression parameter and install payload decompression"""
966 966 if value not in util.compengines.supportedbundletypes:
967 967 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
968 968 unbundler._compengine = util.compengines.forbundletype(value)
969 969 if value is not None:
970 970 unbundler._compressed = True
971 971
972 972
973 973 class bundlepart(object):
974 974 """A bundle2 part contains application level payload
975 975
976 976 The part `type` is used to route the part to the application level
977 977 handler.
978 978
979 979 The part payload is contained in ``part.data``. It could be raw bytes or a
980 980 generator of byte chunks.
981 981
982 982 You can add parameters to the part using the ``addparam`` method.
983 983 Parameters can be either mandatory (default) or advisory. Remote side
984 984 should be able to safely ignore the advisory ones.
985 985
986 986 Both data and parameters cannot be modified after the generation has begun.
987 987 """
988 988
989 989 def __init__(
990 990 self,
991 991 parttype,
992 992 mandatoryparams=(),
993 993 advisoryparams=(),
994 994 data=b'',
995 995 mandatory=True,
996 996 ):
997 997 validateparttype(parttype)
998 998 self.id = None
999 999 self.type = parttype
1000 1000 self._data = data
1001 1001 self._mandatoryparams = list(mandatoryparams)
1002 1002 self._advisoryparams = list(advisoryparams)
1003 1003 # checking for duplicated entries
1004 1004 self._seenparams = set()
1005 1005 for pname, __ in self._mandatoryparams + self._advisoryparams:
1006 1006 if pname in self._seenparams:
1007 1007 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1008 1008 self._seenparams.add(pname)
1009 1009 # status of the part's generation:
1010 1010 # - None: not started,
1011 1011 # - False: currently generated,
1012 1012 # - True: generation done.
1013 1013 self._generated = None
1014 1014 self.mandatory = mandatory
1015 1015
1016 @encoding.strmethod
1017 1016 def __repr__(self):
1018 cls = b"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1019 return b'<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1017 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1018 return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1020 1019 cls,
1021 1020 id(self),
1022 1021 self.id,
1023 1022 self.type,
1024 1023 self.mandatory,
1025 1024 )
1026 1025
1027 1026 def copy(self):
1028 1027 """return a copy of the part
1029 1028
1030 1029 The new part has the very same content but no partid assigned yet.
1031 1030 Parts with generated data cannot be copied."""
1032 1031 assert not util.safehasattr(self.data, 'next')
1033 1032 return self.__class__(
1034 1033 self.type,
1035 1034 self._mandatoryparams,
1036 1035 self._advisoryparams,
1037 1036 self._data,
1038 1037 self.mandatory,
1039 1038 )
1040 1039
1041 1040 # methods used to define the part content
1042 1041 @property
1043 1042 def data(self):
1044 1043 return self._data
1045 1044
1046 1045 @data.setter
1047 1046 def data(self, data):
1048 1047 if self._generated is not None:
1049 1048 raise error.ReadOnlyPartError(b'part is being generated')
1050 1049 self._data = data
1051 1050
1052 1051 @property
1053 1052 def mandatoryparams(self):
1054 1053 # make it an immutable tuple to force people through ``addparam``
1055 1054 return tuple(self._mandatoryparams)
1056 1055
1057 1056 @property
1058 1057 def advisoryparams(self):
1059 1058 # make it an immutable tuple to force people through ``addparam``
1060 1059 return tuple(self._advisoryparams)
1061 1060
1062 1061 def addparam(self, name, value=b'', mandatory=True):
1063 1062 """add a parameter to the part
1064 1063
1065 1064 If 'mandatory' is set to True, the remote handler must claim support
1066 1065 for this parameter or the unbundling will be aborted.
1067 1066
1068 1067 The 'name' and 'value' cannot exceed 255 bytes each.
1069 1068 """
1070 1069 if self._generated is not None:
1071 1070 raise error.ReadOnlyPartError(b'part is being generated')
1072 1071 if name in self._seenparams:
1073 1072 raise ValueError(b'duplicated params: %s' % name)
1074 1073 self._seenparams.add(name)
1075 1074 params = self._advisoryparams
1076 1075 if mandatory:
1077 1076 params = self._mandatoryparams
1078 1077 params.append((name, value))
1079 1078
1080 1079 # methods used to generate the bundle2 stream
1081 1080 def getchunks(self, ui):
1082 1081 if self._generated is not None:
1083 1082 raise error.ProgrammingError(b'part can only be consumed once')
1084 1083 self._generated = False
1085 1084
1086 1085 if ui.debugflag:
1087 1086 msg = [b'bundle2-output-part: "%s"' % self.type]
1088 1087 if not self.mandatory:
1089 1088 msg.append(b' (advisory)')
1090 1089 nbmp = len(self.mandatoryparams)
1091 1090 nbap = len(self.advisoryparams)
1092 1091 if nbmp or nbap:
1093 1092 msg.append(b' (params:')
1094 1093 if nbmp:
1095 1094 msg.append(b' %i mandatory' % nbmp)
1096 1095 if nbap:
1097 1096 msg.append(b' %i advisory' % nbap)
1098 1097 msg.append(b')')
1099 1098 if not self.data:
1100 1099 msg.append(b' empty payload')
1101 1100 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1102 1101 self.data, b'__next__'
1103 1102 ):
1104 1103 msg.append(b' streamed payload')
1105 1104 else:
1106 1105 msg.append(b' %i bytes payload' % len(self.data))
1107 1106 msg.append(b'\n')
1108 1107 ui.debug(b''.join(msg))
1109 1108
1110 1109 #### header
1111 1110 if self.mandatory:
1112 1111 parttype = self.type.upper()
1113 1112 else:
1114 1113 parttype = self.type.lower()
1115 1114 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1116 1115 ## parttype
1117 1116 header = [
1118 1117 _pack(_fparttypesize, len(parttype)),
1119 1118 parttype,
1120 1119 _pack(_fpartid, self.id),
1121 1120 ]
1122 1121 ## parameters
1123 1122 # count
1124 1123 manpar = self.mandatoryparams
1125 1124 advpar = self.advisoryparams
1126 1125 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1127 1126 # size
1128 1127 parsizes = []
1129 1128 for key, value in manpar:
1130 1129 parsizes.append(len(key))
1131 1130 parsizes.append(len(value))
1132 1131 for key, value in advpar:
1133 1132 parsizes.append(len(key))
1134 1133 parsizes.append(len(value))
1135 1134 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1136 1135 header.append(paramsizes)
1137 1136 # key, value
1138 1137 for key, value in manpar:
1139 1138 header.append(key)
1140 1139 header.append(value)
1141 1140 for key, value in advpar:
1142 1141 header.append(key)
1143 1142 header.append(value)
1144 1143 ## finalize header
1145 1144 try:
1146 1145 headerchunk = b''.join(header)
1147 1146 except TypeError:
1148 1147 raise TypeError(
1149 1148 'Found a non-bytes trying to '
1150 1149 'build bundle part header: %r' % header
1151 1150 )
1152 1151 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1153 1152 yield _pack(_fpartheadersize, len(headerchunk))
1154 1153 yield headerchunk
1155 1154 ## payload
1156 1155 try:
1157 1156 for chunk in self._payloadchunks():
1158 1157 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1159 1158 yield _pack(_fpayloadsize, len(chunk))
1160 1159 yield chunk
1161 1160 except GeneratorExit:
1162 1161 # GeneratorExit means that nobody is listening for our
1163 1162 # results anyway, so just bail quickly rather than trying
1164 1163 # to produce an error part.
1165 1164 ui.debug(b'bundle2-generatorexit\n')
1166 1165 raise
1167 1166 except BaseException as exc:
1168 1167 bexc = stringutil.forcebytestr(exc)
1169 1168 # backup exception data for later
1170 1169 ui.debug(
1171 1170 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1172 1171 )
1173 1172 tb = sys.exc_info()[2]
1174 1173 msg = b'unexpected error: %s' % bexc
1175 1174 interpart = bundlepart(
1176 1175 b'error:abort', [(b'message', msg)], mandatory=False
1177 1176 )
1178 1177 interpart.id = 0
1179 1178 yield _pack(_fpayloadsize, -1)
1180 1179 for chunk in interpart.getchunks(ui=ui):
1181 1180 yield chunk
1182 1181 outdebug(ui, b'closing payload chunk')
1183 1182 # abort current part payload
1184 1183 yield _pack(_fpayloadsize, 0)
1185 1184 pycompat.raisewithtb(exc, tb)
1186 1185 # end of payload
1187 1186 outdebug(ui, b'closing payload chunk')
1188 1187 yield _pack(_fpayloadsize, 0)
1189 1188 self._generated = True
1190 1189
1191 1190 def _payloadchunks(self):
1192 1191 """yield chunks of the part payload
1193 1192
1194 1193 Exists to handle the different ways of providing data to a part."""
1195 1194 # we only support fixed size data now.
1196 1195 # This will be improved in the future.
1197 1196 if util.safehasattr(self.data, 'next') or util.safehasattr(
1198 1197 self.data, b'__next__'
1199 1198 ):
1200 1199 buff = util.chunkbuffer(self.data)
1201 1200 chunk = buff.read(preferedchunksize)
1202 1201 while chunk:
1203 1202 yield chunk
1204 1203 chunk = buff.read(preferedchunksize)
1205 1204 elif len(self.data):
1206 1205 yield self.data
1207 1206
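`_payloadchunks` re-slices generator-provided data into `preferedchunksize` reads through `util.chunkbuffer`. A rough standalone equivalent, using `io.BytesIO` as a stand-in (it buffers eagerly, unlike the real streaming chunkbuffer):

```python
import io

def rechunk(data_iter, prefered=8):
    """Re-slice an iterator of byte chunks into fixed-size reads."""
    buff = io.BytesIO(b''.join(data_iter))  # chunkbuffer streams lazily; this sketch does not
    chunk = buff.read(prefered)
    while chunk:
        yield chunk
        chunk = buff.read(prefered)

chunks = list(rechunk([b'0123456789', b'abcdef'], prefered=8))
```

Note how chunk boundaries from the input generator disappear: the output is resized purely by the preferred read size, with a short final chunk if the total is not a multiple of it.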
1208 1207
1209 1208 flaginterrupt = -1
1210 1209
1211 1210
1212 1211 class interrupthandler(unpackermixin):
1213 1212 """read one part and process it with restricted capability
1214 1213
1215 1214 This allows transmitting exceptions raised on the producer side during part
1216 1215 iteration while the consumer is reading a part.
1217 1216
1218 1217 Parts processed in this manner only have access to a ui object."""
1219 1218
1220 1219 def __init__(self, ui, fp):
1221 1220 super(interrupthandler, self).__init__(fp)
1222 1221 self.ui = ui
1223 1222
1224 1223 def _readpartheader(self):
1225 1224 """reads a part header size and returns the bytes blob
1226 1225
1227 1226 returns None if empty"""
1228 1227 headersize = self._unpack(_fpartheadersize)[0]
1229 1228 if headersize < 0:
1230 1229 raise error.BundleValueError(
1231 1230 b'negative part header size: %i' % headersize
1232 1231 )
1233 1232 indebug(self.ui, b'part header size: %i\n' % headersize)
1234 1233 if headersize:
1235 1234 return self._readexact(headersize)
1236 1235 return None
1237 1236
1238 1237 def __call__(self):
1239 1238
1240 1239 self.ui.debug(
1241 1240 b'bundle2-input-stream-interrupt: opening out of band context\n'
1242 1241 )
1243 1242 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1244 1243 headerblock = self._readpartheader()
1245 1244 if headerblock is None:
1246 1245 indebug(self.ui, b'no part found during interruption.')
1247 1246 return
1248 1247 part = unbundlepart(self.ui, headerblock, self._fp)
1249 1248 op = interruptoperation(self.ui)
1250 1249 hardabort = False
1251 1250 try:
1252 1251 _processpart(op, part)
1253 1252 except (SystemExit, KeyboardInterrupt):
1254 1253 hardabort = True
1255 1254 raise
1256 1255 finally:
1257 1256 if not hardabort:
1258 1257 part.consume()
1259 1258 self.ui.debug(
1260 1259 b'bundle2-input-stream-interrupt: closing out of band context\n'
1261 1260 )
1262 1261
1263 1262
1264 1263 class interruptoperation(object):
1265 1264 """A limited operation to be used by part handlers during interruption
1266 1265
1267 1266 It only have access to an ui object.
1268 1267 """
1269 1268
1270 1269 def __init__(self, ui):
1271 1270 self.ui = ui
1272 1271 self.reply = None
1273 1272 self.captureoutput = False
1274 1273
1275 1274 @property
1276 1275 def repo(self):
1277 1276 raise error.ProgrammingError(b'no repo access from stream interruption')
1278 1277
1279 1278 def gettransaction(self):
1280 1279 raise TransactionUnavailable(b'no repo access from stream interruption')
1281 1280
1282 1281
1283 1282 def decodepayloadchunks(ui, fh):
1284 1283 """Reads bundle2 part payload data into chunks.
1285 1284
1286 1285 Part payload data consists of framed chunks. This function takes
1287 1286 a file handle and emits those chunks.
1288 1287 """
1289 1288 dolog = ui.configbool(b'devel', b'bundle2.debug')
1290 1289 debug = ui.debug
1291 1290
1292 1291 headerstruct = struct.Struct(_fpayloadsize)
1293 1292 headersize = headerstruct.size
1294 1293 unpack = headerstruct.unpack
1295 1294
1296 1295 readexactly = changegroup.readexactly
1297 1296 read = fh.read
1298 1297
1299 1298 chunksize = unpack(readexactly(fh, headersize))[0]
1300 1299 indebug(ui, b'payload chunk size: %i' % chunksize)
1301 1300
1302 1301 # changegroup.readexactly() is inlined below for performance.
1303 1302 while chunksize:
1304 1303 if chunksize >= 0:
1305 1304 s = read(chunksize)
1306 1305 if len(s) < chunksize:
1307 1306 raise error.Abort(
1308 1307 _(
1309 1308 b'stream ended unexpectedly '
1310 1309 b'(got %d bytes, expected %d)'
1311 1310 )
1312 1311 % (len(s), chunksize)
1313 1312 )
1314 1313
1315 1314 yield s
1316 1315 elif chunksize == flaginterrupt:
1317 1316 # Interrupt "signal" detected. The regular stream is interrupted
1318 1317 # and a bundle2 part follows. Consume it.
1319 1318 interrupthandler(ui, fh)()
1320 1319 else:
1321 1320 raise error.BundleValueError(
1322 1321 b'negative payload chunk size: %s' % chunksize
1323 1322 )
1324 1323
1325 1324 s = read(headersize)
1326 1325 if len(s) < headersize:
1327 1326 raise error.Abort(
1328 1327 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1329 1328 % (len(s), headersize)
1330 1329 )
1331 1330
1332 1331 chunksize = unpack(s)[0]
1333 1332
1334 1333 # indebug() inlined for performance.
1335 1334 if dolog:
1336 1335 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1337 1336
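The decoding loop above can be sketched without the Mercurial machinery: frames carry an int32 size prefix, a size of 0 terminates the payload, and `flaginterrupt` (-1) signals an out-of-band part. In this simplified sketch the interrupt marker is merely skipped, whereas the real code hands off to `interrupthandler` to consume a whole nested part:

```python
import io
import struct

FLAG_INTERRUPT = -1

def decode_chunks(fp):
    """Yield payload chunks; tolerate interrupt markers; stop at size 0."""
    size = struct.unpack('>i', fp.read(4))[0]
    while size:
        if size > 0:
            yield fp.read(size)
        elif size != FLAG_INTERRUPT:
            raise ValueError('negative payload chunk size: %d' % size)
        # on FLAG_INTERRUPT a real reader consumes an out-of-band part here
        size = struct.unpack('>i', fp.read(4))[0]

raw = (struct.pack('>i', 2) + b'hi' + struct.pack('>i', FLAG_INTERRUPT)
       + struct.pack('>i', 3) + b'bye' + struct.pack('>i', 0))
chunks = list(decode_chunks(io.BytesIO(raw)))
```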
1338 1337
1339 1338 class unbundlepart(unpackermixin):
1340 1339 """a bundle part read from a bundle"""
1341 1340
1342 1341 def __init__(self, ui, header, fp):
1343 1342 super(unbundlepart, self).__init__(fp)
1344 1343 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1345 1344 fp, b'tell'
1346 1345 )
1347 1346 self.ui = ui
1348 1347 # unbundle state attr
1349 1348 self._headerdata = header
1350 1349 self._headeroffset = 0
1351 1350 self._initialized = False
1352 1351 self.consumed = False
1353 1352 # part data
1354 1353 self.id = None
1355 1354 self.type = None
1356 1355 self.mandatoryparams = None
1357 1356 self.advisoryparams = None
1358 1357 self.params = None
1359 1358 self.mandatorykeys = ()
1360 1359 self._readheader()
1361 1360 self._mandatory = None
1362 1361 self._pos = 0
1363 1362
1364 1363 def _fromheader(self, size):
1365 1364 """return the next <size> byte from the header"""
1366 1365 offset = self._headeroffset
1367 1366 data = self._headerdata[offset : (offset + size)]
1368 1367 self._headeroffset = offset + size
1369 1368 return data
1370 1369
1371 1370 def _unpackheader(self, format):
1372 1371 """read given format from header
1373 1372
1374 1373 This automatically computes the size of the format to read."""
1375 1374 data = self._fromheader(struct.calcsize(format))
1376 1375 return _unpack(format, data)
1377 1376
1378 1377 def _initparams(self, mandatoryparams, advisoryparams):
1379 1378 """internal function to setup all logic related parameters"""
1380 1379 # make it read only to prevent people touching it by mistake.
1381 1380 self.mandatoryparams = tuple(mandatoryparams)
1382 1381 self.advisoryparams = tuple(advisoryparams)
1383 1382 # user friendly UI
1384 1383 self.params = util.sortdict(self.mandatoryparams)
1385 1384 self.params.update(self.advisoryparams)
1386 1385 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1387 1386
1388 1387 def _readheader(self):
1389 1388 """read the header and setup the object"""
1390 1389 typesize = self._unpackheader(_fparttypesize)[0]
1391 1390 self.type = self._fromheader(typesize)
1392 1391 indebug(self.ui, b'part type: "%s"' % self.type)
1393 1392 self.id = self._unpackheader(_fpartid)[0]
1394 1393 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1395 1394 # extract mandatory bit from type
1396 1395 self.mandatory = self.type != self.type.lower()
1397 1396 self.type = self.type.lower()
1398 1397 ## reading parameters
1399 1398 # param count
1400 1399 mancount, advcount = self._unpackheader(_fpartparamcount)
1401 1400 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1402 1401 # param size
1403 1402 fparamsizes = _makefpartparamsizes(mancount + advcount)
1404 1403 paramsizes = self._unpackheader(fparamsizes)
1405 1404 # make it a list of couple again
1406 1405 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1407 1406 # split mandatory from advisory
1408 1407 mansizes = paramsizes[:mancount]
1409 1408 advsizes = paramsizes[mancount:]
1410 1409 # retrieve param value
1411 1410 manparams = []
1412 1411 for key, value in mansizes:
1413 1412 manparams.append((self._fromheader(key), self._fromheader(value)))
1414 1413 advparams = []
1415 1414 for key, value in advsizes:
1416 1415 advparams.append((self._fromheader(key), self._fromheader(value)))
1417 1416 self._initparams(manparams, advparams)
1418 1417 ## part payload
1419 1418 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1420 1419 # we read the data, tell it
1421 1420 self._initialized = True
1422 1421
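Reading `_readheader` together with the writer side in `bundlepart.getchunks`, the part header layout is: type size (`>B`), type, part id (`>I`), mandatory/advisory parameter counts (`>BB`), one `>BB` size pair per parameter, then the raw keys and values. A round-trip sketch of the fixed-size fields, independent of the Mercurial helpers:

```python
import struct

def encode_part_header(parttype, partid, manparams=(), advparams=()):
    """Serialize a bundle2-style part header (sketch of the documented layout)."""
    header = [struct.pack('>B', len(parttype)), parttype, struct.pack('>I', partid)]
    header.append(struct.pack('>BB', len(manparams), len(advparams)))
    allparams = list(manparams) + list(advparams)
    sizes = []
    for key, value in allparams:
        sizes.extend([len(key), len(value)])
    if sizes:  # one (key size, value size) byte pair per parameter
        header.append(struct.pack('>' + 'BB' * len(allparams), *sizes))
    for key, value in allparams:
        header.append(key)
        header.append(value)
    return b''.join(header)

blob = encode_part_header(b'CHANGEGROUP', 7, manparams=[(b'version', b'02')])
typesize = blob[0]
parttype = blob[1:1 + typesize]
partid = struct.unpack('>I', blob[1 + typesize:5 + typesize])[0]
```

The uppercase/lowercase mandatory convention and parameter payloads follow the fixed fields exactly as `_readheader` unpacks them.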
1423 1422 def _payloadchunks(self):
1424 1423 """Generator of decoded chunks in the payload."""
1425 1424 return decodepayloadchunks(self.ui, self._fp)
1426 1425
1427 1426 def consume(self):
1428 1427 """Read the part payload until completion.
1429 1428
1430 1429 By consuming the part data, the underlying stream read offset will
1431 1430 be advanced to the next part (or end of stream).
1432 1431 """
1433 1432 if self.consumed:
1434 1433 return
1435 1434
1436 1435 chunk = self.read(32768)
1437 1436 while chunk:
1438 1437 self._pos += len(chunk)
1439 1438 chunk = self.read(32768)
1440 1439
1441 1440 def read(self, size=None):
1442 1441 """read payload data"""
1443 1442 if not self._initialized:
1444 1443 self._readheader()
1445 1444 if size is None:
1446 1445 data = self._payloadstream.read()
1447 1446 else:
1448 1447 data = self._payloadstream.read(size)
1449 1448 self._pos += len(data)
1450 1449 if size is None or len(data) < size:
1451 1450 if not self.consumed and self._pos:
1452 1451 self.ui.debug(
1453 1452 b'bundle2-input-part: total payload size %i\n' % self._pos
1454 1453 )
1455 1454 self.consumed = True
1456 1455 return data
1457 1456
1458 1457
1459 1458 class seekableunbundlepart(unbundlepart):
1460 1459 """A bundle2 part in a bundle that is seekable.
1461 1460
1462 1461 Regular ``unbundlepart`` instances can only be read once. This class
1463 1462 extends ``unbundlepart`` to enable bi-directional seeking within the
1464 1463 part.
1465 1464
1466 1465 Bundle2 part data consists of framed chunks. Offsets when seeking
1467 1466 refer to the decoded data, not the offsets in the underlying bundle2
1468 1467 stream.
1469 1468
1470 1469 To facilitate quickly seeking within the decoded data, instances of this
1471 1470 class maintain a mapping between offsets in the underlying stream and
1472 1471 the decoded payload. This mapping will consume memory in proportion
1473 1472 to the number of chunks within the payload (which almost certainly
1474 1473 increases in proportion with the size of the part).
1475 1474 """
1476 1475
1477 1476 def __init__(self, ui, header, fp):
1478 1477 # (payload, file) offsets for chunk starts.
1479 1478 self._chunkindex = []
1480 1479
1481 1480 super(seekableunbundlepart, self).__init__(ui, header, fp)
1482 1481
1483 1482 def _payloadchunks(self, chunknum=0):
1484 1483 '''seek to specified chunk and start yielding data'''
1485 1484 if len(self._chunkindex) == 0:
1486 1485 assert chunknum == 0, b'Must start with chunk 0'
1487 1486 self._chunkindex.append((0, self._tellfp()))
1488 1487 else:
1489 1488 assert chunknum < len(self._chunkindex), (
1490 1489 b'Unknown chunk %d' % chunknum
1491 1490 )
1492 1491 self._seekfp(self._chunkindex[chunknum][1])
1493 1492
1494 1493 pos = self._chunkindex[chunknum][0]
1495 1494
1496 1495 for chunk in decodepayloadchunks(self.ui, self._fp):
1497 1496 chunknum += 1
1498 1497 pos += len(chunk)
1499 1498 if chunknum == len(self._chunkindex):
1500 1499 self._chunkindex.append((pos, self._tellfp()))
1501 1500
1502 1501 yield chunk
1503 1502
1504 1503 def _findchunk(self, pos):
1505 1504 '''for a given payload position, return a chunk number and offset'''
1506 1505 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1507 1506 if ppos == pos:
1508 1507 return chunk, 0
1509 1508 elif ppos > pos:
1510 1509 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1511 1510 raise ValueError(b'Unknown chunk')
1512 1511
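`_findchunk` above does a linear scan of the `(payload offset, file offset)` index; a standalone sketch with a hypothetical index makes the return convention concrete:

```python
def findchunk(chunkindex, pos):
    """Map a payload offset to (chunk number, offset within that chunk).

    chunkindex holds (payload offset, file offset) pairs for chunk starts.
    """
    for chunk, (ppos, _fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# hypothetical index: chunks start at payload offsets 0, 10 and 25
index = [(0, 100), (10, 120), (25, 150)]
```

An offset past the last recorded chunk start raises, which is why `seek` first reads the part to the target position to populate the index.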
1513 1512 def tell(self):
1514 1513 return self._pos
1515 1514
1516 1515 def seek(self, offset, whence=os.SEEK_SET):
1517 1516 if whence == os.SEEK_SET:
1518 1517 newpos = offset
1519 1518 elif whence == os.SEEK_CUR:
1520 1519 newpos = self._pos + offset
1521 1520 elif whence == os.SEEK_END:
1522 1521 if not self.consumed:
1523 1522 # Can't use self.consume() here because it advances self._pos.
1524 1523 chunk = self.read(32768)
1525 1524 while chunk:
1526 1525 chunk = self.read(32768)
1527 1526 newpos = self._chunkindex[-1][0] - offset
1528 1527 else:
1529 1528 raise ValueError(b'Unknown whence value: %r' % (whence,))
1530 1529
1531 1530 if newpos > self._chunkindex[-1][0] and not self.consumed:
1532 1531 # Can't use self.consume() here because it advances self._pos.
1533 1532 chunk = self.read(32768)
1534 1533 while chunk:
1535 1534 chunk = self.read(32768)
1536 1535
1537 1536 if not 0 <= newpos <= self._chunkindex[-1][0]:
1538 1537 raise ValueError(b'Offset out of range')
1539 1538
1540 1539 if self._pos != newpos:
1541 1540 chunk, internaloffset = self._findchunk(newpos)
1542 1541 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1543 1542 adjust = self.read(internaloffset)
1544 1543 if len(adjust) != internaloffset:
1545 1544 raise error.Abort(_(b'Seek failed\n'))
1546 1545 self._pos = newpos
1547 1546
1548 1547 def _seekfp(self, offset, whence=0):
1549 1548 """move the underlying file pointer
1550 1549
1551 1550 This method is meant for internal usage by the bundle2 protocol only.
1552 1551 It directly manipulates the low-level stream, including bundle2-level
1553 1552 instructions.
1554 1553
1555 1554 Do not use it to implement higher-level logic or methods."""
1556 1555 if self._seekable:
1557 1556 return self._fp.seek(offset, whence)
1558 1557 else:
1559 1558 raise NotImplementedError(_(b'File pointer is not seekable'))
1560 1559
1561 1560 def _tellfp(self):
1562 1561 """return the file offset, or None if file is not seekable
1563 1562
1564 1563 This method is meant for internal usage by the bundle2 protocol only.
1565 1564 It directly manipulates the low-level stream, including bundle2-level
1566 1565 instructions.
1567 1566
1568 1567 Do not use it to implement higher-level logic or methods."""
1569 1568 if self._seekable:
1570 1569 try:
1571 1570 return self._fp.tell()
1572 1571 except IOError as e:
1573 1572 if e.errno == errno.ESPIPE:
1574 1573 self._seekable = False
1575 1574 else:
1576 1575 raise
1577 1576 return None
1578 1577
1579 1578
1580 1579 # These are only the static capabilities.
1581 1580 # Check the 'getrepocaps' function for the rest.
1582 1581 capabilities = {
1583 1582 b'HG20': (),
1584 1583 b'bookmarks': (),
1585 1584 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1586 1585 b'listkeys': (),
1587 1586 b'pushkey': (),
1588 1587 b'digests': tuple(sorted(util.DIGESTS.keys())),
1589 1588 b'remote-changegroup': (b'http', b'https'),
1590 1589 b'hgtagsfnodes': (),
1591 1590 b'rev-branch-cache': (),
1592 1591 b'phases': (b'heads',),
1593 1592 b'stream': (b'v2',),
1594 1593 }
1595 1594
1596 1595
1597 1596 def getrepocaps(repo, allowpushback=False, role=None):
1598 1597 """return the bundle2 capabilities for a given repo
1599 1598
1600 1599 Exists to allow extensions (like evolution) to mutate the capabilities.
1601 1600
1602 1601 The returned value is used for servers advertising their capabilities as
1603 1602 well as clients advertising their capabilities to servers as part of
1604 1603 bundle2 requests. The ``role`` argument specifies which is which.
1605 1604 """
1606 1605 if role not in (b'client', b'server'):
1607 1606 raise error.ProgrammingError(b'role argument must be client or server')
1608 1607
1609 1608 caps = capabilities.copy()
1610 1609 caps[b'changegroup'] = tuple(
1611 1610 sorted(changegroup.supportedincomingversions(repo))
1612 1611 )
1613 1612 if obsolete.isenabled(repo, obsolete.exchangeopt):
1614 1613 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1615 1614 caps[b'obsmarkers'] = supportedformat
1616 1615 if allowpushback:
1617 1616 caps[b'pushback'] = ()
1618 1617 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1619 1618 if cpmode == b'check-related':
1620 1619 caps[b'checkheads'] = (b'related',)
1621 1620 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1622 1621 caps.pop(b'phases')
1623 1622
1624 1623 # Don't advertise stream clone support in server mode if not configured.
1625 1624 if role == b'server':
1626 1625 streamsupported = repo.ui.configbool(
1627 1626 b'server', b'uncompressed', untrusted=True
1628 1627 )
1629 1628 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1630 1629
1631 1630 if not streamsupported or not featuresupported:
1632 1631 caps.pop(b'stream')
1633 1632 # Else always advertise support on client, because payload support
1634 1633 # should always be advertised.
1635 1634
1636 1635 return caps
1637 1636
1638 1637
1639 1638 def bundle2caps(remote):
1640 1639 """return the bundle capabilities of a peer as dict"""
1641 1640 raw = remote.capable(b'bundle2')
1642 1641 if not raw and raw != b'':
1643 1642 return {}
1644 1643 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1645 1644 return decodecaps(capsblob)
1646 1645
1647 1646
1648 1647 def obsmarkersversion(caps):
1649 1648 """extract the list of supported obsmarkers versions from a bundle2caps dict
1650 1649 """
1651 1650 obscaps = caps.get(b'obsmarkers', ())
1652 1651 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1653 1652
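As a small illustration of the `obsmarkers` capability convention (`V0`, `V1`, ... entries, per the `supportedformat` tuple built in `getrepocaps`), a standalone version of the parsing above:

```python
def obsmarkers_versions(caps):
    """Extract supported obsmarker versions from a bundle2 caps dict (sketch)."""
    obscaps = caps.get(b'obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith(b'V')]

# entries not matching the V<n> convention are silently ignored
versions = obsmarkers_versions({b'obsmarkers': (b'V0', b'V1', b'bogus')})
```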
1654 1653
1655 1654 def writenewbundle(
1656 1655 ui,
1657 1656 repo,
1658 1657 source,
1659 1658 filename,
1660 1659 bundletype,
1661 1660 outgoing,
1662 1661 opts,
1663 1662 vfs=None,
1664 1663 compression=None,
1665 1664 compopts=None,
1666 1665 ):
1667 1666 if bundletype.startswith(b'HG10'):
1668 1667 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1669 1668 return writebundle(
1670 1669 ui,
1671 1670 cg,
1672 1671 filename,
1673 1672 bundletype,
1674 1673 vfs=vfs,
1675 1674 compression=compression,
1676 1675 compopts=compopts,
1677 1676 )
1678 1677 elif not bundletype.startswith(b'HG20'):
1679 1678 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1680 1679
1681 1680 caps = {}
1682 1681 if b'obsolescence' in opts:
1683 1682 caps[b'obsmarkers'] = (b'V1',)
1684 1683 bundle = bundle20(ui, caps)
1685 1684 bundle.setcompression(compression, compopts)
1686 1685 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1687 1686 chunkiter = bundle.getchunks()
1688 1687
1689 1688 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1690 1689
1691 1690
1692 1691 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1693 1692 # We should eventually reconcile this logic with the one behind
1694 1693 # 'exchange.getbundle2partsgenerator'.
1695 1694 #
1696 1695 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1697 1696 # different right now. So we keep them separated for now for the sake of
1698 1697 # simplicity.
1699 1698
1700 1699 # we might not always want a changegroup in such bundle, for example in
1701 1700 # stream bundles
1702 1701 if opts.get(b'changegroup', True):
1703 1702 cgversion = opts.get(b'cg.version')
1704 1703 if cgversion is None:
1705 1704 cgversion = changegroup.safeversion(repo)
1706 1705 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1707 1706 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1708 1707 part.addparam(b'version', cg.version)
1709 1708 if b'clcount' in cg.extras:
1710 1709 part.addparam(
1711 1710 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1712 1711 )
1713 1712 if opts.get(b'phases') and repo.revs(
1714 1713 b'%ln and secret()', outgoing.missingheads
1715 1714 ):
1716 1715 part.addparam(
1717 1716 b'targetphase', b'%d' % phases.secret, mandatory=False
1718 1717 )
1719 1718 if b'exp-sidedata-flag' in repo.requirements:
1720 1719 part.addparam(b'exp-sidedata', b'1')
1721 1720
1722 1721 if opts.get(b'streamv2', False):
1723 1722 addpartbundlestream2(bundler, repo, stream=True)
1724 1723
1725 1724 if opts.get(b'tagsfnodescache', True):
1726 1725 addparttagsfnodescache(repo, bundler, outgoing)
1727 1726
1728 1727 if opts.get(b'revbranchcache', True):
1729 1728 addpartrevbranchcache(repo, bundler, outgoing)
1730 1729
1731 1730 if opts.get(b'obsolescence', False):
1732 1731 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1733 1732 buildobsmarkerspart(bundler, obsmarkers)
1734 1733
1735 1734 if opts.get(b'phases', False):
1736 1735 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1737 1736 phasedata = phases.binaryencode(headsbyphase)
1738 1737 bundler.newpart(b'phase-heads', data=phasedata)
1739 1738
1740 1739
def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))


def addpartrevbranchcache(repo, bundler, outgoing):
    # we include the rev branch cache for the bundle changeset
    # (as an optional part)
    cache = repo.revbranchcache()
    cl = repo.unfiltered().changelog
    branchesdata = collections.defaultdict(lambda: (set(), set()))
    for node in outgoing.missing:
        branch, close = cache.branchinfo(cl.rev(node))
        branchesdata[branch][close].add(node)

    def generate():
        for branch, (nodes, closed) in sorted(branchesdata.items()):
            utf8branch = encoding.fromlocal(branch)
            yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
            yield utf8branch
            for n in sorted(nodes):
                yield n
            for n in sorted(closed):
                yield n

    bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)


def _formatrequirementsspec(requirements):
    requirements = [req for req in requirements if req != b"shared"]
    return urlreq.quote(b','.join(sorted(requirements)))


def _formatrequirementsparams(requirements):
    requirements = _formatrequirementsspec(requirements)
    params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
    return params


def addpartbundlestream2(bundler, repo, **kwargs):
    if not kwargs.get('stream', False):
        return

    if not streamclone.allowservergeneration(repo):
        raise error.Abort(
            _(
                b'stream data requested but server does not allow '
                b'this feature'
            ),
            hint=_(
                b'well-behaved clients should not be '
                b'requesting stream data from servers not '
                b'advertising it; the client may be buggy'
            ),
        )

    # Stream clones don't compress well. And compression undermines a
    # goal of stream clones, which is to be fast. Communicate the desire
    # to avoid compression to consumers of the bundle.
    bundler.prefercompressed = False

    # get the includes and excludes
    includepats = kwargs.get('includepats')
    excludepats = kwargs.get('excludepats')

    narrowstream = repo.ui.configbool(
        b'experimental', b'server.stream-narrow-clones'
    )

    if (includepats or excludepats) and not narrowstream:
        raise error.Abort(_(b'server does not support narrow stream clones'))

    includeobsmarkers = False
    if repo.obsstore:
        remoteversions = obsmarkersversion(bundler.capabilities)
        if not remoteversions:
            raise error.Abort(
                _(
                    b'server has obsolescence markers, but client '
                    b'cannot receive them via stream clone'
                )
            )
        elif repo.obsstore._version in remoteversions:
            includeobsmarkers = True

    filecount, bytecount, it = streamclone.generatev2(
        repo, includepats, excludepats, includeobsmarkers
    )
    requirements = _formatrequirementsspec(repo.requirements)
    part = bundler.newpart(b'stream2', data=it)
    part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
    part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
    part.addparam(b'requirements', requirements, mandatory=True)


def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError(b'bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart(b'obsmarkers', data=stream)


def writebundle(
    ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == b"HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart(b'changegroup', data=cg.getchunks())
        part.addparam(b'version', cg.version)
        if b'clcount' in cg.extras:
            part.addparam(
                b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
            )
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != b'01':
            raise error.Abort(
                _(b'old bundle types only support v1 changegroups')
            )
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_(b'unknown stream compression type: %s') % comp)
        compengine = util.compengines.forbundletype(comp)

        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk

        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)


def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result


@parthandler(
    b'changegroup',
    (
        b'version',
        b'nbchanges',
        b'exp-sidedata',
        b'treemanifest',
        b'targetphase',
    ),
)
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    from . import localrepo

    tr = op.gettransaction()
    unpackerversion = inpart.params.get(b'version', b'01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if b'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get(b'nbchanges'))
    if (
        b'treemanifest' in inpart.params
        and b'treemanifest' not in op.repo.requirements
    ):
        if len(op.repo.changelog) != 0:
            raise error.Abort(
                _(
                    b"bundle contains tree manifests, but local repo is "
                    b"non-empty and does not use tree manifests"
                )
            )
        op.repo.requirements.add(b'treemanifest')
        op.repo.svfs.options = localrepo.resolvestorevfsoptions(
            op.repo.ui, op.repo.requirements, op.repo.features
        )
        op.repo._writerequirements()

    bundlesidedata = bool(b'exp-sidedata' in inpart.params)
    reposidedata = bool(b'exp-sidedata-flag' in op.repo.requirements)
    if reposidedata and not bundlesidedata:
        msg = b"repository is using sidedata but the bundle source does not"
        hint = b'this is currently unsupported'
        raise error.Abort(msg, hint=hint)

    extrakwargs = {}
    targetphase = inpart.params.get(b'targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(
        op,
        cg,
        tr,
        b'bundle2',
        b'bundle2',
        expectedtotal=nbchangesets,
        **extrakwargs
    )
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart(b'reply:changegroup', mandatory=False)
        part.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        part.addparam(b'return', b'%i' % ret, mandatory=False)
    assert not inpart.read()


_remotechangegroupparams = tuple(
    [b'url', b'size', b'digests']
    + [b'digest:%s' % k for k in util.DIGESTS.keys()]
)


@parthandler(b'remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate what was retrieved by
      the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params[b'url']
    except KeyError:
        raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities[b'remote-changegroup']:
        raise error.Abort(
            _(b'remote-changegroup does not support %s urls')
            % parsed_url.scheme
        )

    try:
        size = int(inpart.params[b'size'])
    except ValueError:
        raise error.Abort(
            _(b'remote-changegroup: invalid value for param "%s"') % b'size'
        )
    except KeyError:
        raise error.Abort(
            _(b'remote-changegroup: missing "%s" param') % b'size'
        )

    digests = {}
    for typ in inpart.params.get(b'digests', b'').split():
        param = b'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(
                _(b'remote-changegroup: missing "%s" param') % param
            )
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange

    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(
            _(b'%s: not a bundle version 1.0') % util.hidepassword(raw_url)
        )
    ret = _processchangegroup(op, cg, tr, b'bundle2', b'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart(b'reply:changegroup')
        part.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        part.addparam(b'return', b'%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(
            _(b'bundle at %s is corrupted:\n%s')
            % (util.hidepassword(raw_url), bytes(e))
        )
    assert not inpart.read()


@parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params[b'return'])
    replyto = int(inpart.params[b'in-reply-to'])
    op.records.add(b'changegroup', {b'return': ret}, replyto)


@parthandler(b'check:bookmarks')
def handlecheckbookmarks(op, inpart):
    """check location of bookmarks

    This part is used to detect push races regarding bookmarks. It contains
    binary encoded (bookmark, node) tuples. If the local state does not match
    the one in the part, a PushRaced exception is raised.
    """
    bookdata = bookmarks.binarydecode(inpart)

    msgstandard = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" move from %s to %s)'
    )
    msgmissing = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" is missing, expected %s)'
    )
    msgexist = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" set on %s, expected missing)'
    )
    for book, node in bookdata:
        currentnode = op.repo._bookmarks.get(book)
        if currentnode != node:
            if node is None:
                finalmsg = msgexist % (book, nodemod.short(currentnode))
            elif currentnode is None:
                finalmsg = msgmissing % (book, nodemod.short(node))
            else:
                finalmsg = msgstandard % (
                    book,
                    nodemod.short(node),
                    nodemod.short(currentnode),
                )
            raise error.PushRaced(finalmsg)


@parthandler(b'check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced(
            b'remote repository changed while pushing - please try again'
        )


@parthandler(b'check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().iterheads():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced(
                b'remote repository changed while pushing - '
                b'please try again'
            )


@parthandler(b'check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = (
        b'remote repository changed while pushing - please try again '
        b'(%s is %s expected %s)'
    )
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (
                    nodemod.short(n),
                    phases.phasenames[actualphase],
                    phases.phasenames[expectedphase],
                )
                raise error.PushRaced(finalmsg)


@parthandler(b'output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_(b'remote: %s\n') % line)


@parthandler(b'replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)


class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""


@parthandler(b'error:abort', (b'message', b'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(
        inpart.params[b'message'], hint=inpart.params.get(b'hint')
    )


@parthandler(
    b'error:pushkey',
    (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
)
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in (b'namespace', b'key', b'new', b'old', b'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(
        inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
    )


@parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get(b'parttype')
    if parttype is not None:
        kwargs[b'parttype'] = parttype
    params = inpart.params.get(b'params')
    if params is not None:
        kwargs[b'params'] = params.split(b'\0')

    raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))


@parthandler(b'error:pushraced', (b'message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])


@parthandler(b'listkeys', (b'namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params[b'namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add(b'listkeys', (namespace, r))


@parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params[b'namespace'])
    key = dec(inpart.params[b'key'])
    old = dec(inpart.params[b'old'])
    new = dec(inpart.params[b'new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
    op.records.add(b'pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart(b'reply:pushkey')
        rpart.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        rpart.addparam(b'return', b'%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in (b'namespace', b'key', b'new', b'old', b'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(
            partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
        )


@parthandler(b'bookmarks')
def handlebookmark(op, inpart):
    """transmit bookmark information

    The part contains binary encoded bookmark information.

    The exact behavior of this part can be controlled by the 'bookmarks' mode
    on the bundle operation.

    When mode is 'apply' (the default) the bookmark information is applied as
    is to the unbundling repository. Make sure a 'check:bookmarks' part is
    issued earlier to check for push races in such an update. This behavior is
    suitable for pushing.

    When mode is 'records', the information is recorded into the 'bookmarks'
    records of the bundle operation. This behavior is suitable for pulling.
    """
    changes = bookmarks.binarydecode(inpart)

    pushkeycompat = op.repo.ui.configbool(
        b'server', b'bookmarks-pushkey-compat'
    )
    bookmarksmode = op.modes.get(b'bookmarks', b'apply')

    if bookmarksmode == b'apply':
        tr = op.gettransaction()
        bookstore = op.repo._bookmarks
        if pushkeycompat:
            allhooks = []
            for book, node in changes:
                hookargs = tr.hookargs.copy()
                hookargs[b'pushkeycompat'] = b'1'
                hookargs[b'namespace'] = b'bookmarks'
                hookargs[b'key'] = book
                hookargs[b'old'] = nodemod.hex(bookstore.get(book, b''))
                hookargs[b'new'] = nodemod.hex(
                    node if node is not None else b''
                )
                allhooks.append(hookargs)

            for hookargs in allhooks:
                op.repo.hook(
                    b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
                )

        bookstore.applychanges(op.repo, op.gettransaction(), changes)

        if pushkeycompat:

            def runhook(unused_success):
                for hookargs in allhooks:
                    op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))

            op.repo._afterlock(runhook)

    elif bookmarksmode == b'records':
        for book, node in changes:
            record = {b'bookmark': book, b'node': node}
            op.records.add(b'bookmarks', record)
    else:
        raise error.ProgrammingError(
            b'unknown bookmark mode: %s' % bookmarksmode
        )


@parthandler(b'phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)


@parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params[b'return'])
    partid = int(inpart.params[b'in-reply-to'])
    op.records.add(b'pushkey', {b'return': ret}, partid)


@parthandler(b'obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
        op.ui.writenoi18n(
            b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
        )
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug(
            b'ignoring obsolescence markers, feature not enabled\n'
        )
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    op.records.add(b'obsmarkers', {b'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart(b'reply:obsmarkers')
        rpart.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        rpart.addparam(b'new', b'%i' % new, mandatory=False)


@parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params[b'new'])
    partid = int(inpart.params[b'in-reply-to'])
    op.records.add(b'obsmarkers', {b'new': ret}, partid)


@parthandler(b'hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)


rbcstruct = struct.Struct(b'>III')


@parthandler(b'cache:rev-branch-cache')
def handlerbc(op, inpart):
    """receive a rev-branch-cache payload and update the local cache

    The payload is a series of data related to each branch

    1) branch name length
    2) number of open heads
    3) number of closed heads
    4) open heads nodes
    5) closed heads nodes
    """
    total = 0
    rawheader = inpart.read(rbcstruct.size)
    cache = op.repo.revbranchcache()
    cl = op.repo.unfiltered().changelog
    while rawheader:
        header = rbcstruct.unpack(rawheader)
        total += header[1] + header[2]
        utf8branch = inpart.read(header[0])
        branch = encoding.tolocal(utf8branch)
        for x in pycompat.xrange(header[1]):
            node = inpart.read(20)
            rev = cl.rev(node)
            cache.setdata(branch, rev, node, False)
        for x in pycompat.xrange(header[2]):
            node = inpart.read(20)
            rev = cl.rev(node)
            cache.setdata(branch, rev, node, True)
        rawheader = inpart.read(rbcstruct.size)
    cache.write()


@parthandler(b'pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool(b'push', b'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = b"USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)


@parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
def handlestreamv2bundle(op, part):

    requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
    filecount = int(part.params[b'filecount'])
    bytecount = int(part.params[b'bytecount'])

    repo = op.repo
    if len(repo):
        msg = _(b'cannot apply stream clone to non empty repository')
        raise error.Abort(msg)

    repo.ui.debug(b'applying stream bundle\n')
    streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)


def widen_bundle(
    bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
):
    """generates bundle2 for widening a narrow clone

    bundler is the bundle to which data should be added
    repo is the localrepository instance
    oldmatcher matches what the client already has
    newmatcher matches what the client needs (including what it already has)
    common is set of common heads between server and client
    known is a set of revs known on the client side (used in ellipses)
    cgversion is the changegroup version to send
    ellipses is boolean value telling whether to send ellipses data or not

    returns bundle2 of the data required for extending
    """
    commonnodes = set()
    cl = repo.changelog
    for r in repo.revs(b"::%ln", common):
        commonnodes.add(cl.node(r))
    if commonnodes:
        # XXX: we should only send the filelogs (and treemanifest). user
        # already has the changelog and manifest
        packer = changegroup.getbundler(
            cgversion,
            repo,
            oldmatcher=oldmatcher,
            matcher=newmatcher,
            fullnodes=commonnodes,
        )
        cgdata = packer.generate(
            {nodemod.nullid},
            list(commonnodes),
            False,
            b'narrow_widen',
            changelog=False,
        )

        part = bundler.newpart(b'changegroup', data=cgdata)
        part.addparam(b'version', cgversion)
        if b'treemanifest' in repo.requirements:
            part.addparam(b'treemanifest', b'1')
        if b'exp-sidedata-flag' in repo.requirements:
            part.addparam(b'exp-sidedata', b'1')

    return bundler