bundle2: avoid unbound read when seeking...
Gregory Szorc -
r35117:699b2a75 default
@@ -1,2023 +1,2030
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37 
38 38 :params value: arbitrary number of bytes
39 39 
40 40 A blob of `params size` bytes containing the serialized version of all stream
41 41 level parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45 
46 46 Empty names are forbidden.
47 47 
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is upper case, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76 
77 77 The part type is used to route the part to an application-level handler
78 78 that can interpret the payload.
79 79 
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32; `chunkdata` is plain bytes (exactly as many as
123 123 `chunksize` says). The payload part is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. However, none of the parts read after an abort are processed. In
144 144 the future, dropping the stream may become an option for channels we do not
145 145 care to preserve.
146 146 """
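To make the stream-level framing described above concrete, here is a minimal sketch that packs a header by hand: magic string, int32 parameter-blob size, then the space-separated urlquoted parameters. The parameter names here are illustrative only; this is not the module's real writer (that is `bundle20.getchunks`).

```python
import struct
from urllib.parse import quote

def encode_stream_header(params):
    """Sketch of the bundle2 stream-level framing: magic string,
    int32 params size, then the space-separated urlquoted blob.

    params is a list of (name, value-or-None) pairs."""
    blocks = []
    for name, value in params:
        token = quote(name)
        if value is not None:
            token = '%s=%s' % (token, quote(value))
        blocks.append(token)
    blob = ' '.join(blocks).encode('ascii')
    return b'HG20' + struct.pack('>i', len(blob)) + blob

# 'Compression' is a real stream parameter; 'hint' is made up
header = encode_stream_header([('Compression', 'BZ'), ('hint', None)])
```

Note how a parameter without a value ('hint') is emitted as a bare urlquoted name, matching the `<name>` / `<name>=<value>` forms above.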
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import os
152 152 import re
153 153 import string
154 154 import struct
155 155 import sys
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 node as nodemod,
162 162 obsolete,
163 163 phases,
164 164 pushkey,
165 165 pycompat,
166 166 tags,
167 167 url,
168 168 util,
169 169 )
170 170
171 171 urlerr = util.urlerr
172 172 urlreq = util.urlreq
173 173
174 174 _pack = struct.pack
175 175 _unpack = struct.unpack
176 176
177 177 _fstreamparamsize = '>i'
178 178 _fpartheadersize = '>i'
179 179 _fparttypesize = '>B'
180 180 _fpartid = '>I'
181 181 _fpayloadsize = '>i'
182 182 _fpartparamcount = '>BB'
183 183
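The `>`-prefixed struct formats above drive all of the fixed-size framing. As a quick sketch (with a fabricated part id), the fixed fields of a part header can be round-tripped like this:

```python
import struct

# Round-trip the fixed-size fields of a part header using the same
# big-endian formats the module defines: one byte for the type size,
# the type name itself, then a 32-bit part id (values are illustrative).
parttype = b'changegroup'
header = struct.pack('>B', len(parttype)) + parttype + struct.pack('>I', 0)

typesize = struct.unpack_from('>B', header, 0)[0]
name = header[1:1 + typesize]
partid = struct.unpack_from('>I', header, 1 + typesize)[0]
```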
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid characters"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
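A short illustration of the dynamically built format, with fabricated parameter sizes: N (key-size, value-size) byte pairs are read in one `struct.unpack` call.

```python
import struct

def makefpartparamsizes(nbparams):
    # same construction as _makefpartparamsizes above
    return '>' + ('BB' * nbparams)

# two parameters: sizes for ('k1', 'v1') and ('key2', 'value2')
sizeblob = struct.pack(makefpartparamsizes(2), 2, 2, 4, 6)
sizes = struct.unpack(makefpartparamsizes(2), sizeblob)
# regroup the flat tuple into (key-size, value-size) pairs
pairs = list(zip(sizes[::2], sizes[1::2]))
```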
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that registers a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during an unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`, where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iteration happens in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
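The recording behaviour documented in the `unbundlerecords` docstring can be exercised with a trimmed-down sketch of the class (reply tracking omitted; the record contents are made up):

```python
class records(object):
    # trimmed-down sketch of unbundlerecords: per-category lists plus
    # a chronological sequence; inreplyto/getreplies omitted
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

r = records()
r.add('changegroup', {'return': 1})
r.add('output', 'text')
```

Indexing by category returns only that category's entries, while iterating the object replays every record in insertion order.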
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 353 self.op = op
354 354 self.unbundler = unbundler
355 355 self.iterator = None
356 356 self.count = 0
357 357 self.current = None
358 358
359 359 def __enter__(self):
360 360 def func():
361 361 itr = enumerate(self.unbundler.iterparts())
362 362 for count, p in itr:
363 363 self.count = count
364 364 self.current = p
365 365 yield p
366 366 p.consume()
367 367 self.current = None
368 368 self.iterator = func()
369 369 return self.iterator
370 370
371 371 def __exit__(self, type, exc, tb):
372 372 if not self.iterator:
373 373 return
374 374
375 375 # Only gracefully abort in a normal exception situation. User aborts
376 376 # like Ctrl+C raise a KeyboardInterrupt, which is not a subclass of
377 377 # Exception, and should not be gracefully cleaned up.
378 378 if isinstance(exc, Exception):
379 379 # Any exceptions seeking to the end of the bundle at this point are
380 380 # almost certainly related to the underlying stream being bad.
381 381 # And, chances are that the exception we're handling is related to
382 382 # getting in that bad state. So, we swallow the seeking error and
383 383 # re-raise the original error.
384 384 seekerror = False
385 385 try:
386 386 if self.current:
387 387 # consume the part content to not corrupt the stream.
388 388 self.current.consume()
389 389
390 390 for part in self.iterator:
391 391 # consume the bundle content
392 392 part.consume()
393 393 except Exception:
394 394 seekerror = True
395 395
396 396 # Small hack to let caller code distinguish exceptions from bundle2
397 397 # processing from processing the old format. This is mostly needed
398 398 # to handle different return codes to unbundle according to the type
399 399 # of bundle. We should probably clean up or drop this return code
400 400 # craziness in a future version.
401 401 exc.duringunbundle2 = True
402 402 salvaged = []
403 403 replycaps = None
404 404 if self.op.reply is not None:
405 405 salvaged = self.op.reply.salvageoutput()
406 406 replycaps = self.op.reply.capabilities
407 407 exc._replycaps = replycaps
408 408 exc._bundle2salvagedoutput = salvaged
409 409
410 410 # Re-raising from a variable loses the original stack. So only use
411 411 # that form if we need to.
412 412 if seekerror:
413 413 raise exc
414 414
415 415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 416 self.count)
417 417
418 418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 419 """This function processes a bundle, applying its effects to/from a repo
420 420
421 421 It iterates over each part then searches for and uses the proper handling
422 422 code to process the part. Parts are processed in order.
423 423
424 424 An unknown mandatory part will abort the process.
425 425
426 426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 427 function. This is used to ensure output is properly propagated in case of
428 428 an error during the unbundling. This output capturing part will likely be
429 429 reworked and this ability will probably go away in the process.
430 430 """
431 431 if op is None:
432 432 if transactiongetter is None:
433 433 transactiongetter = _notransaction
434 434 op = bundleoperation(repo, transactiongetter)
435 435 # todo:
436 436 # - replace this with an init function soon.
437 437 # - exception catching
438 438 unbundler.params
439 439 if repo.ui.debugflag:
440 440 msg = ['bundle2-input-bundle:']
441 441 if unbundler.params:
442 442 msg.append(' %i params' % len(unbundler.params))
443 443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 444 msg.append(' no-transaction')
445 445 else:
446 446 msg.append(' with-transaction')
447 447 msg.append('\n')
448 448 repo.ui.debug(''.join(msg))
449 449
450 450 processparts(repo, op, unbundler)
451 451
452 452 return op
453 453
454 454 def processparts(repo, op, unbundler):
455 455 with partiterator(repo, op, unbundler) as parts:
456 456 for part in parts:
457 457 _processpart(op, part)
458 458
459 459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
460 460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
461 461 op.records.add('changegroup', {
462 462 'return': ret,
463 463 })
464 464 return ret
465 465
466 466 def _gethandler(op, part):
467 467 status = 'unknown' # used by debug output
468 468 try:
469 469 handler = parthandlermapping.get(part.type)
470 470 if handler is None:
471 471 status = 'unsupported-type'
472 472 raise error.BundleUnknownFeatureError(parttype=part.type)
473 473 indebug(op.ui, 'found a handler for part %s' % part.type)
474 474 unknownparams = part.mandatorykeys - handler.params
475 475 if unknownparams:
476 476 unknownparams = list(unknownparams)
477 477 unknownparams.sort()
478 478 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
479 479 raise error.BundleUnknownFeatureError(parttype=part.type,
480 480 params=unknownparams)
481 481 status = 'supported'
482 482 except error.BundleUnknownFeatureError as exc:
483 483 if part.mandatory: # mandatory parts
484 484 raise
485 485 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
486 486 return # skip to part processing
487 487 finally:
488 488 if op.ui.debugflag:
489 489 msg = ['bundle2-input-part: "%s"' % part.type]
490 490 if not part.mandatory:
491 491 msg.append(' (advisory)')
492 492 nbmp = len(part.mandatorykeys)
493 493 nbap = len(part.params) - nbmp
494 494 if nbmp or nbap:
495 495 msg.append(' (params:')
496 496 if nbmp:
497 497 msg.append(' %i mandatory' % nbmp)
498 498 if nbap:
499 499 msg.append(' %i advisory' % nbap)
500 500 msg.append(')')
501 501 msg.append(' %s\n' % status)
502 502 op.ui.debug(''.join(msg))
503 503
504 504 return handler
505 505
506 506 def _processpart(op, part):
507 507 """process a single part from a bundle
508 508
509 509 The part is guaranteed to have been fully consumed when the function exits
510 510 (even if an exception is raised)."""
511 511 handler = _gethandler(op, part)
512 512 if handler is None:
513 513 return
514 514
515 515 # handler is called outside the above try block so that we don't
516 516 # risk catching KeyErrors from anything other than the
517 517 # parthandlermapping lookup (any KeyError raised by handler()
518 518 # itself represents a defect of a different variety).
519 519 output = None
520 520 if op.captureoutput and op.reply is not None:
521 521 op.ui.pushbuffer(error=True, subproc=True)
522 522 output = ''
523 523 try:
524 524 handler(op, part)
525 525 finally:
526 526 if output is not None:
527 527 output = op.ui.popbuffer()
528 528 if output:
529 529 outpart = op.reply.newpart('output', data=output,
530 530 mandatory=False)
531 531 outpart.addparam(
532 532 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
533 533
534 534 def decodecaps(blob):
535 535 """decode a bundle2 caps bytes blob into a dictionary
536 536
537 537 The blob is a list of capabilities (one per line)
538 538 Capabilities may have values using a line of the form::
539 539
540 540 capability=value1,value2,value3
541 541
542 542 The values are always a list."""
543 543 caps = {}
544 544 for line in blob.splitlines():
545 545 if not line:
546 546 continue
547 547 if '=' not in line:
548 548 key, vals = line, ()
549 549 else:
550 550 key, vals = line.split('=', 1)
551 551 vals = vals.split(',')
552 552 key = urlreq.unquote(key)
553 553 vals = [urlreq.unquote(v) for v in vals]
554 554 caps[key] = vals
555 555 return caps
556 556
557 557 def encodecaps(caps):
558 558 """encode a bundle2 caps dictionary into a bytes blob"""
559 559 chunks = []
560 560 for ca in sorted(caps):
561 561 vals = caps[ca]
562 562 ca = urlreq.quote(ca)
563 563 vals = [urlreq.quote(v) for v in vals]
564 564 if vals:
565 565 ca = "%s=%s" % (ca, ','.join(vals))
566 566 chunks.append(ca)
567 567 return '\n'.join(chunks)
568 568
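The caps blob format can be exercised with standalone helpers mirroring `encodecaps`/`decodecaps` (Python 3's `urllib.parse` stands in for `urlreq`; the capability names are invented):

```python
from urllib.parse import quote, unquote

# standalone sketch of the caps blob format: one capability per line,
# optional comma-separated values, both sides urlquoted
def encode(caps):
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decode(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

blob = encode({'HG20': [], 'changegroup': ['01', '02']})
```

A capability without values encodes as a bare name, and the decoder recovers the original mapping from the blob.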
569 569 bundletypes = {
570 570 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
571 571 # since the unification ssh accepts a header but there
572 572 # is no capability signaling it.
573 573 "HG20": (), # special-cased below
574 574 "HG10UN": ("HG10UN", 'UN'),
575 575 "HG10BZ": ("HG10", 'BZ'),
576 576 "HG10GZ": ("HG10GZ", 'GZ'),
577 577 }
578 578
579 579 # hgweb uses this list to communicate its preferred type
580 580 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
581 581
582 582 class bundle20(object):
583 583 """represent an outgoing bundle2 container
584 584
585 585 Use the `addparam` method to add stream level parameters and `newpart` to
586 586 populate it. Then call `getchunks` to retrieve all the binary chunks of
587 587 data that compose the bundle2 container."""
588 588
589 589 _magicstring = 'HG20'
590 590
591 591 def __init__(self, ui, capabilities=()):
592 592 self.ui = ui
593 593 self._params = []
594 594 self._parts = []
595 595 self.capabilities = dict(capabilities)
596 596 self._compengine = util.compengines.forbundletype('UN')
597 597 self._compopts = None
598 598
599 599 def setcompression(self, alg, compopts=None):
600 600 """setup core part compression to <alg>"""
601 601 if alg in (None, 'UN'):
602 602 return
603 603 assert not any(n.lower() == 'compression' for n, v in self._params)
604 604 self.addparam('Compression', alg)
605 605 self._compengine = util.compengines.forbundletype(alg)
606 606 self._compopts = compopts
607 607
608 608 @property
609 609 def nbparts(self):
610 610 """total number of parts added to the bundler"""
611 611 return len(self._parts)
612 612
613 613 # methods used to define the bundle2 content
614 614 def addparam(self, name, value=None):
615 615 """add a stream level parameter"""
616 616 if not name:
617 617 raise ValueError(r'empty parameter name')
618 618 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
619 619 raise ValueError(r'non letter first character: %s' % name)
620 620 self._params.append((name, value))
621 621
622 622 def addpart(self, part):
623 623 """add a new part to the bundle2 container
624 624
625 625 Parts contain the actual application-level payload."""
626 626 assert part.id is None
627 627 part.id = len(self._parts) # very cheap counter
628 628 self._parts.append(part)
629 629
630 630 def newpart(self, typeid, *args, **kwargs):
631 631 """create a new part and add it to the container
632 632 
633 633 The part is directly added to the container. For now, this means that
634 634 any failure to properly initialize the part after calling ``newpart``
635 635 should result in a failure of the whole bundling process.
636 636 
637 637 You can still fall back to manually creating and adding the part if you
638 638 need better control."""
639 639 part = bundlepart(typeid, *args, **kwargs)
640 640 self.addpart(part)
641 641 return part
642 642
643 643 # methods used to generate the bundle2 stream
644 644 def getchunks(self):
645 645 if self.ui.debugflag:
646 646 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
647 647 if self._params:
648 648 msg.append(' (%i params)' % len(self._params))
649 649 msg.append(' %i parts total\n' % len(self._parts))
650 650 self.ui.debug(''.join(msg))
651 651 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
652 652 yield self._magicstring
653 653 param = self._paramchunk()
654 654 outdebug(self.ui, 'bundle parameter: %s' % param)
655 655 yield _pack(_fstreamparamsize, len(param))
656 656 if param:
657 657 yield param
658 658 for chunk in self._compengine.compressstream(self._getcorechunk(),
659 659 self._compopts):
660 660 yield chunk
661 661
662 662 def _paramchunk(self):
663 663 """return an encoded version of all stream parameters"""
664 664 blocks = []
665 665 for par, value in self._params:
666 666 par = urlreq.quote(par)
667 667 if value is not None:
668 668 value = urlreq.quote(value)
669 669 par = '%s=%s' % (par, value)
670 670 blocks.append(par)
671 671 return ' '.join(blocks)
672 672
673 673 def _getcorechunk(self):
674 674 """yield chunks for the core part of the bundle
675 675
676 676 (all but headers and parameters)"""
677 677 outdebug(self.ui, 'start of parts')
678 678 for part in self._parts:
679 679 outdebug(self.ui, 'bundle part: "%s"' % part.type)
680 680 for chunk in part.getchunks(ui=self.ui):
681 681 yield chunk
682 682 outdebug(self.ui, 'end of bundle')
683 683 yield _pack(_fpartheadersize, 0)
684 684
685 685
686 686 def salvageoutput(self):
687 687 """return a list with a copy of all output parts in the bundle
688 688
689 689 This is meant to be used during error handling to make sure we preserve
690 690 server output"""
691 691 salvaged = []
692 692 for part in self._parts:
693 693 if part.type.startswith('output'):
694 694 salvaged.append(part.copy())
695 695 return salvaged
696 696
697 697
698 698 class unpackermixin(object):
699 699 """A mixin to extract bytes and struct data from a stream"""
700 700
701 701 def __init__(self, fp):
702 702 self._fp = fp
703 703
704 704 def _unpack(self, format):
705 705 """unpack this struct format from the stream
706 706
707 707 This method is meant for internal usage by the bundle2 protocol only.
708 708 It directly manipulates the low-level stream, including bundle2-level
709 709 instructions.
710 710
711 711 Do not use it to implement higher-level logic or methods."""
712 712 data = self._readexact(struct.calcsize(format))
713 713 return _unpack(format, data)
714 714
715 715 def _readexact(self, size):
716 716 """read exactly <size> bytes from the stream
717 717
718 718 This method is meant for internal usage by the bundle2 protocol only.
719 719 It directly manipulates the low-level stream, including bundle2-level
720 720 instructions.
721 721
722 722 Do not use it to implement higher-level logic or methods."""
723 723 return changegroup.readexactly(self._fp, size)
724 724
725 725 def getunbundler(ui, fp, magicstring=None):
726 726 """return a valid unbundler object for a given magicstring"""
727 727 if magicstring is None:
728 728 magicstring = changegroup.readexactly(fp, 4)
729 729 magic, version = magicstring[0:2], magicstring[2:4]
730 730 if magic != 'HG':
731 731 ui.debug(
732 732 "error: invalid magic: %r (version %r), should be 'HG'\n"
733 733 % (magic, version))
734 734 raise error.Abort(_('not a Mercurial bundle'))
735 735 unbundlerclass = formatmap.get(version)
736 736 if unbundlerclass is None:
737 737 raise error.Abort(_('unknown bundle version %s') % version)
738 738 unbundler = unbundlerclass(ui, fp)
739 739 indebug(ui, 'start processing of %s stream' % magicstring)
740 740 return unbundler
741 741
742 742 class unbundle20(unpackermixin):
743 743 """interpret a bundle2 stream
744 744
745 745 This class is fed with a binary stream and yields parts through its
746 746 `iterparts` methods."""
747 747
748 748 _magicstring = 'HG20'
749 749
750 750 def __init__(self, ui, fp):
751 751 """If header is specified, we do not read it out of the stream."""
752 752 self.ui = ui
753 753 self._compengine = util.compengines.forbundletype('UN')
754 754 self._compressed = None
755 755 super(unbundle20, self).__init__(fp)
756 756
757 757 @util.propertycache
758 758 def params(self):
759 759 """dictionary of stream level parameters"""
760 760 indebug(self.ui, 'reading bundle2 stream parameters')
761 761 params = {}
762 762 paramssize = self._unpack(_fstreamparamsize)[0]
763 763 if paramssize < 0:
764 764 raise error.BundleValueError('negative bundle param size: %i'
765 765 % paramssize)
766 766 if paramssize:
767 767 params = self._readexact(paramssize)
768 768 params = self._processallparams(params)
769 769 return params
770 770
771 771 def _processallparams(self, paramsblock):
772 772 """apply and record all stream level parameters from a params blob"""
773 773 params = util.sortdict()
774 774 for p in paramsblock.split(' '):
775 775 p = p.split('=', 1)
776 776 p = [urlreq.unquote(i) for i in p]
777 777 if len(p) < 2:
778 778 p.append(None)
779 779 self._processparam(*p)
780 780 params[p[0]] = p[1]
781 781 return params
782 782
783 783
784 784 def _processparam(self, name, value):
785 785 """process a parameter, applying its effect if needed
786 786
787 787 Parameters starting with a lower case letter are advisory and will be
788 788 ignored when unknown. For those starting with an upper case letter,
789 789 this function will raise an error when the parameter is unknown.
790 790 
791 791 Note: no options are currently supported. Any input will either be
792 792 ignored or raise an error.
793 793 """
794 794 if not name:
795 795 raise ValueError(r'empty parameter name')
796 796 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
797 797 raise ValueError(r'non letter first character: %s' % name)
798 798 try:
799 799 handler = b2streamparamsmap[name.lower()]
800 800 except KeyError:
801 801 if name[0:1].islower():
802 802 indebug(self.ui, "ignoring unknown parameter %s" % name)
803 803 else:
804 804 raise error.BundleUnknownFeatureError(params=(name,))
805 805 else:
806 806 handler(self, name, value)
807 807
808 808 def _forwardchunks(self):
809 809 """utility to transfer a bundle2 as binary
810 810
811 811 This is made necessary by the fact that the 'getbundle' command over
812 812 'ssh' has no way to know when the reply ends, relying on the bundle
813 813 being interpreted to find its end. This is terrible and we are sorry,
814 814 but we needed to move forward to get general delta enabled.
815 815 """
816 816 yield self._magicstring
817 817 assert 'params' not in vars(self)
818 818 paramssize = self._unpack(_fstreamparamsize)[0]
819 819 if paramssize < 0:
820 820 raise error.BundleValueError('negative bundle param size: %i'
821 821 % paramssize)
822 822 yield _pack(_fstreamparamsize, paramssize)
823 823 if paramssize:
824 824 params = self._readexact(paramssize)
825 825 self._processallparams(params)
826 826 yield params
827 827 assert self._compengine.bundletype == 'UN'
828 828 # From there, payload might need to be decompressed
829 829 self._fp = self._compengine.decompressorreader(self._fp)
830 830 emptycount = 0
831 831 while emptycount < 2:
832 832 # so we can brainlessly loop
833 833 assert _fpartheadersize == _fpayloadsize
834 834 size = self._unpack(_fpartheadersize)[0]
835 835 yield _pack(_fpartheadersize, size)
836 836 if size:
837 837 emptycount = 0
838 838 else:
839 839 emptycount += 1
840 840 continue
841 841 if size == flaginterrupt:
842 842 continue
843 843 elif size < 0:
844 844 raise error.BundleValueError('negative chunk size: %i' % size)
845 845 yield self._readexact(size)
846 846
847 847
848 848 def iterparts(self, seekable=False):
849 849 """yield all parts contained in the stream"""
850 850 cls = seekableunbundlepart if seekable else unbundlepart
851 851 # make sure params have been loaded
852 852 self.params
853 853 # From there, the payload needs to be decompressed
854 854 self._fp = self._compengine.decompressorreader(self._fp)
855 855 indebug(self.ui, 'start extraction of bundle2 parts')
856 856 headerblock = self._readpartheader()
857 857 while headerblock is not None:
858 858 part = cls(self.ui, headerblock, self._fp)
859 859 yield part
860 860 # Ensure part is fully consumed so we can start reading the next
861 861 # part.
862 862 part.consume()
863 863
864 864 headerblock = self._readpartheader()
865 865 indebug(self.ui, 'end of bundle2 stream')
866 866
867 867 def _readpartheader(self):
868 868 """read a part header size and return the bytes blob
869 869
870 870 returns None if empty"""
871 871 headersize = self._unpack(_fpartheadersize)[0]
872 872 if headersize < 0:
873 873 raise error.BundleValueError('negative part header size: %i'
874 874 % headersize)
875 875 indebug(self.ui, 'part header size: %i' % headersize)
876 876 if headersize:
877 877 return self._readexact(headersize)
878 878 return None
879 879
880 880 def compressed(self):
881 881 self.params # load params
882 882 return self._compressed
883 883
884 884 def close(self):
885 885 """close underlying file"""
886 886 if util.safehasattr(self._fp, 'close'):
887 887 return self._fp.close()
888 888
889 889 formatmap = {'20': unbundle20}
890 890
891 891 b2streamparamsmap = {}
892 892
893 893 def b2streamparamhandler(name):
894 894 """register a handler for a stream level parameter"""
895 895 def decorator(func):
896 896 assert name not in b2streamparamsmap
897 897 b2streamparamsmap[name] = func
898 898 return func
899 899 return decorator
900 900
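`b2streamparamhandler` is a classic registry decorator: it maps a stream-parameter name to its handler function at definition time. A self-contained sketch of the pattern (names hypothetical):

```python
handlers = {}

def register(name):
    """Decorator that records func in the handlers registry under name."""
    def decorator(func):
        assert name not in handlers, 'duplicate handler: %s' % name
        handlers[name] = func
        return func
    return decorator

@register('compression')
def oncompression(value):
    # a real handler would install a decompressor on the unbundler
    return 'using %s' % value
```

Lookup then becomes a plain dict access, which is how `_processallparams` dispatches on parameter names.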
901 901 @b2streamparamhandler('compression')
902 902 def processcompression(unbundler, param, value):
903 903 """read compression parameter and install payload decompression"""
904 904 if value not in util.compengines.supportedbundletypes:
905 905 raise error.BundleUnknownFeatureError(params=(param,),
906 906 values=(value,))
907 907 unbundler._compengine = util.compengines.forbundletype(value)
908 908 if value is not None:
909 909 unbundler._compressed = True
910 910
911 911 class bundlepart(object):
912 912 """A bundle2 part contains application level payload
913 913
914 914 The part `type` is used to route the part to the application level
915 915 handler.
916 916
917 917 The part payload is contained in ``part.data``. It could be raw bytes or a
918 918 generator of byte chunks.
919 919
920 920 You can add parameters to the part using the ``addparam`` method.
921 921 Parameters can be either mandatory (default) or advisory. Remote side
922 922 should be able to safely ignore the advisory ones.
923 923
924 924 Neither data nor parameters can be modified after generation has begun.
925 925 """
926 926
927 927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
928 928 data='', mandatory=True):
929 929 validateparttype(parttype)
930 930 self.id = None
931 931 self.type = parttype
932 932 self._data = data
933 933 self._mandatoryparams = list(mandatoryparams)
934 934 self._advisoryparams = list(advisoryparams)
935 935 # checking for duplicated entries
936 936 self._seenparams = set()
937 937 for pname, __ in self._mandatoryparams + self._advisoryparams:
938 938 if pname in self._seenparams:
939 939 raise error.ProgrammingError('duplicated params: %s' % pname)
940 940 self._seenparams.add(pname)
941 941 # status of the part's generation:
942 942 # - None: not started,
943 943 # - False: currently generated,
944 944 # - True: generation done.
945 945 self._generated = None
946 946 self.mandatory = mandatory
947 947
948 948 def __repr__(self):
949 949 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
950 950 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
951 951 % (cls, id(self), self.id, self.type, self.mandatory))
952 952
953 953 def copy(self):
954 954 """return a copy of the part
955 955
956 956 The new part has the very same content but no part id assigned yet.
957 957 Parts with generated data cannot be copied."""
958 958 assert not util.safehasattr(self.data, 'next')
959 959 return self.__class__(self.type, self._mandatoryparams,
960 960 self._advisoryparams, self._data, self.mandatory)
961 961
962 962 # methods used to define the part content
963 963 @property
964 964 def data(self):
965 965 return self._data
966 966
967 967 @data.setter
968 968 def data(self, data):
969 969 if self._generated is not None:
970 970 raise error.ReadOnlyPartError('part is being generated')
971 971 self._data = data
972 972
973 973 @property
974 974 def mandatoryparams(self):
975 975 # make it an immutable tuple to force people through ``addparam``
976 976 return tuple(self._mandatoryparams)
977 977
978 978 @property
979 979 def advisoryparams(self):
980 980 # make it an immutable tuple to force people through ``addparam``
981 981 return tuple(self._advisoryparams)
982 982
983 983 def addparam(self, name, value='', mandatory=True):
984 984 """add a parameter to the part
985 985
986 986 If 'mandatory' is set to True, the remote handler must claim support
987 987 for this parameter or the unbundling will be aborted.
988 988
989 989 The 'name' and 'value' cannot exceed 255 bytes each.
990 990 """
991 991 if self._generated is not None:
992 992 raise error.ReadOnlyPartError('part is being generated')
993 993 if name in self._seenparams:
994 994 raise ValueError('duplicated params: %s' % name)
995 995 self._seenparams.add(name)
996 996 params = self._advisoryparams
997 997 if mandatory:
998 998 params = self._mandatoryparams
999 999 params.append((name, value))
1000 1000
1001 1001 # methods used to generate the bundle2 stream
1002 1002 def getchunks(self, ui):
1003 1003 if self._generated is not None:
1004 1004 raise error.ProgrammingError('part can only be consumed once')
1005 1005 self._generated = False
1006 1006
1007 1007 if ui.debugflag:
1008 1008 msg = ['bundle2-output-part: "%s"' % self.type]
1009 1009 if not self.mandatory:
1010 1010 msg.append(' (advisory)')
1011 1011 nbmp = len(self.mandatoryparams)
1012 1012 nbap = len(self.advisoryparams)
1013 1013 if nbmp or nbap:
1014 1014 msg.append(' (params:')
1015 1015 if nbmp:
1016 1016 msg.append(' %i mandatory' % nbmp)
1017 1017 if nbap:
1018 1018 msg.append(' %i advisory' % nbap)
1019 1019 msg.append(')')
1020 1020 if not self.data:
1021 1021 msg.append(' empty payload')
1022 1022 elif (util.safehasattr(self.data, 'next')
1023 1023 or util.safehasattr(self.data, '__next__')):
1024 1024 msg.append(' streamed payload')
1025 1025 else:
1026 1026 msg.append(' %i bytes payload' % len(self.data))
1027 1027 msg.append('\n')
1028 1028 ui.debug(''.join(msg))
1029 1029
1030 1030 #### header
1031 1031 if self.mandatory:
1032 1032 parttype = self.type.upper()
1033 1033 else:
1034 1034 parttype = self.type.lower()
1035 1035 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1036 1036 ## parttype
1037 1037 header = [_pack(_fparttypesize, len(parttype)),
1038 1038 parttype, _pack(_fpartid, self.id),
1039 1039 ]
1040 1040 ## parameters
1041 1041 # count
1042 1042 manpar = self.mandatoryparams
1043 1043 advpar = self.advisoryparams
1044 1044 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1045 1045 # size
1046 1046 parsizes = []
1047 1047 for key, value in manpar:
1048 1048 parsizes.append(len(key))
1049 1049 parsizes.append(len(value))
1050 1050 for key, value in advpar:
1051 1051 parsizes.append(len(key))
1052 1052 parsizes.append(len(value))
1053 1053 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1054 1054 header.append(paramsizes)
1055 1055 # key, value
1056 1056 for key, value in manpar:
1057 1057 header.append(key)
1058 1058 header.append(value)
1059 1059 for key, value in advpar:
1060 1060 header.append(key)
1061 1061 header.append(value)
1062 1062 ## finalize header
1063 1063 try:
1064 1064 headerchunk = ''.join(header)
1065 1065 except TypeError:
1066 1066 raise TypeError(r'Found a non-bytes trying to '
1067 1067 r'build bundle part header: %r' % header)
1068 1068 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1069 1069 yield _pack(_fpartheadersize, len(headerchunk))
1070 1070 yield headerchunk
1071 1071 ## payload
1072 1072 try:
1073 1073 for chunk in self._payloadchunks():
1074 1074 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1075 1075 yield _pack(_fpayloadsize, len(chunk))
1076 1076 yield chunk
1077 1077 except GeneratorExit:
1078 1078 # GeneratorExit means that nobody is listening for our
1079 1079 # results anyway, so just bail quickly rather than trying
1080 1080 # to produce an error part.
1081 1081 ui.debug('bundle2-generatorexit\n')
1082 1082 raise
1083 1083 except BaseException as exc:
1084 1084 bexc = util.forcebytestr(exc)
1085 1085 # backup exception data for later
1086 1086 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1087 1087 % bexc)
1088 1088 tb = sys.exc_info()[2]
1089 1089 msg = 'unexpected error: %s' % bexc
1090 1090 interpart = bundlepart('error:abort', [('message', msg)],
1091 1091 mandatory=False)
1092 1092 interpart.id = 0
1093 1093 yield _pack(_fpayloadsize, -1)
1094 1094 for chunk in interpart.getchunks(ui=ui):
1095 1095 yield chunk
1096 1096 outdebug(ui, 'closing payload chunk')
1097 1097 # abort current part payload
1098 1098 yield _pack(_fpayloadsize, 0)
1099 1099 pycompat.raisewithtb(exc, tb)
1100 1100 # end of payload
1101 1101 outdebug(ui, 'closing payload chunk')
1102 1102 yield _pack(_fpayloadsize, 0)
1103 1103 self._generated = True
1104 1104
1105 1105 def _payloadchunks(self):
1106 1106 """yield chunks of the part payload
1107 1107
1108 1108 Exists to handle the different methods to provide data to a part."""
1109 1109 # we only support fixed size data now.
1110 1110 # This will be improved in the future.
1111 1111 if (util.safehasattr(self.data, 'next')
1112 1112 or util.safehasattr(self.data, '__next__')):
1113 1113 buff = util.chunkbuffer(self.data)
1114 1114 chunk = buff.read(preferedchunksize)
1115 1115 while chunk:
1116 1116 yield chunk
1117 1117 chunk = buff.read(preferedchunksize)
1118 1118 elif len(self.data):
1119 1119 yield self.data
1120 1120
1121 1121
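`_payloadchunks` leans on `util.chunkbuffer` to turn an arbitrary generator of byte strings into fixed-size reads of `preferedchunksize`. A rough standalone equivalent of that re-chunking (simplified; not the real `chunkbuffer`):

```python
def rechunk(source, chunksize=4096):
    """Buffer an iterator of byte strings and re-emit fixed-size chunks.

    The last chunk may be shorter, mirroring the final short read from
    util.chunkbuffer in the method above."""
    buf = b''
    for piece in source:
        buf += piece
        while len(buf) >= chunksize:
            yield buf[:chunksize]
            buf = buf[chunksize:]
    if buf:
        yield buf
```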
1122 1122 flaginterrupt = -1
1123 1123
1124 1124 class interrupthandler(unpackermixin):
1125 1125 """read one part and process it with restricted capability
1126 1126
1127 1127 This allows transmitting exceptions raised on the producer side during part
1128 1128 iteration while the consumer is reading a part.
1129 1129
1130 1130 Parts processed in this manner only have access to a ui object."""
1131 1131
1132 1132 def __init__(self, ui, fp):
1133 1133 super(interrupthandler, self).__init__(fp)
1134 1134 self.ui = ui
1135 1135
1136 1136 def _readpartheader(self):
1137 1137 """reads a part header size and returns the bytes blob
1138 1138
1139 1139 returns None if empty"""
1140 1140 headersize = self._unpack(_fpartheadersize)[0]
1141 1141 if headersize < 0:
1142 1142 raise error.BundleValueError('negative part header size: %i'
1143 1143 % headersize)
1144 1144 indebug(self.ui, 'part header size: %i\n' % headersize)
1145 1145 if headersize:
1146 1146 return self._readexact(headersize)
1147 1147 return None
1148 1148
1149 1149 def __call__(self):
1150 1150
1151 1151 self.ui.debug('bundle2-input-stream-interrupt:'
1152 1152 ' opening out of band context\n')
1153 1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1154 1154 headerblock = self._readpartheader()
1155 1155 if headerblock is None:
1156 1156 indebug(self.ui, 'no part found during interruption.')
1157 1157 return
1158 1158 part = unbundlepart(self.ui, headerblock, self._fp)
1159 1159 op = interruptoperation(self.ui)
1160 1160 hardabort = False
1161 1161 try:
1162 1162 _processpart(op, part)
1163 1163 except (SystemExit, KeyboardInterrupt):
1164 1164 hardabort = True
1165 1165 raise
1166 1166 finally:
1167 1167 if not hardabort:
1168 1168 part.consume()
1169 1169 self.ui.debug('bundle2-input-stream-interrupt:'
1170 1170 ' closing out of band context\n')
1171 1171
1172 1172 class interruptoperation(object):
1173 1173 """A limited operation to be used by part handlers during interruption
1174 1174
1175 1175 It only has access to a ui object.
1176 1176 """
1177 1177
1178 1178 def __init__(self, ui):
1179 1179 self.ui = ui
1180 1180 self.reply = None
1181 1181 self.captureoutput = False
1182 1182
1183 1183 @property
1184 1184 def repo(self):
1185 1185 raise error.ProgrammingError('no repo access from stream interruption')
1186 1186
1187 1187 def gettransaction(self):
1188 1188 raise TransactionUnavailable('no repo access from stream interruption')
1189 1189
1190 1190 def decodepayloadchunks(ui, fh):
1191 1191 """Reads bundle2 part payload data into chunks.
1192 1192
1193 1193 Part payload data consists of framed chunks. This function takes
1194 1194 a file handle and emits those chunks.
1195 1195 """
1196 1196 dolog = ui.configbool('devel', 'bundle2.debug')
1197 1197 debug = ui.debug
1198 1198
1199 1199 headerstruct = struct.Struct(_fpayloadsize)
1200 1200 headersize = headerstruct.size
1201 1201 unpack = headerstruct.unpack
1202 1202
1203 1203 readexactly = changegroup.readexactly
1204 1204 read = fh.read
1205 1205
1206 1206 chunksize = unpack(readexactly(fh, headersize))[0]
1207 1207 indebug(ui, 'payload chunk size: %i' % chunksize)
1208 1208
1209 1209 # changegroup.readexactly() is inlined below for performance.
1210 1210 while chunksize:
1211 1211 if chunksize >= 0:
1212 1212 s = read(chunksize)
1213 1213 if len(s) < chunksize:
1214 1214 raise error.Abort(_('stream ended unexpectedly '
1215 1215 ' (got %d bytes, expected %d)') %
1216 1216 (len(s), chunksize))
1217 1217
1218 1218 yield s
1219 1219 elif chunksize == flaginterrupt:
1220 1220 # Interrupt "signal" detected. The regular stream is interrupted
1221 1221 # and a bundle2 part follows. Consume it.
1222 1222 interrupthandler(ui, fh)()
1223 1223 else:
1224 1224 raise error.BundleValueError(
1225 1225 'negative payload chunk size: %s' % chunksize)
1226 1226
1227 1227 s = read(headersize)
1228 1228 if len(s) < headersize:
1229 1229 raise error.Abort(_('stream ended unexpectedly '
1230 1230 ' (got %d bytes, expected %d)') %
1231 1231 (len(s), headersize))
1232 1232
1233 1233 chunksize = unpack(s)[0]
1234 1234
1235 1235 # indebug() inlined for performance.
1236 1236 if dolog:
1237 1237 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1238 1238
1239 1239 class unbundlepart(unpackermixin):
1240 1240 """a bundle part read from a bundle"""
1241 1241
1242 1242 def __init__(self, ui, header, fp):
1243 1243 super(unbundlepart, self).__init__(fp)
1244 1244 self._seekable = (util.safehasattr(fp, 'seek') and
1245 1245 util.safehasattr(fp, 'tell'))
1246 1246 self.ui = ui
1247 1247 # unbundle state attr
1248 1248 self._headerdata = header
1249 1249 self._headeroffset = 0
1250 1250 self._initialized = False
1251 1251 self.consumed = False
1252 1252 # part data
1253 1253 self.id = None
1254 1254 self.type = None
1255 1255 self.mandatoryparams = None
1256 1256 self.advisoryparams = None
1257 1257 self.params = None
1258 1258 self.mandatorykeys = ()
1259 1259 self._readheader()
1260 1260 self._mandatory = None
1261 1261 self._pos = 0
1262 1262
1263 1263 def _fromheader(self, size):
1264 1264 """return the next <size> byte from the header"""
1265 1265 offset = self._headeroffset
1266 1266 data = self._headerdata[offset:(offset + size)]
1267 1267 self._headeroffset = offset + size
1268 1268 return data
1269 1269
1270 1270 def _unpackheader(self, format):
1271 1271 """read given format from header
1272 1272
1273 1273 This automatically computes the size of the format to read."""
1274 1274 data = self._fromheader(struct.calcsize(format))
1275 1275 return _unpack(format, data)
1276 1276
1277 1277 def _initparams(self, mandatoryparams, advisoryparams):
1278 1278 """internal function to set up all logic-related parameters"""
1279 1279 # make it read only to prevent people touching it by mistake.
1280 1280 self.mandatoryparams = tuple(mandatoryparams)
1281 1281 self.advisoryparams = tuple(advisoryparams)
1282 1282 # user friendly UI
1283 1283 self.params = util.sortdict(self.mandatoryparams)
1284 1284 self.params.update(self.advisoryparams)
1285 1285 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1286 1286
1287 1287 def _readheader(self):
1288 1288 """read the header and setup the object"""
1289 1289 typesize = self._unpackheader(_fparttypesize)[0]
1290 1290 self.type = self._fromheader(typesize)
1291 1291 indebug(self.ui, 'part type: "%s"' % self.type)
1292 1292 self.id = self._unpackheader(_fpartid)[0]
1293 1293 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1294 1294 # extract mandatory bit from type
1295 1295 self.mandatory = (self.type != self.type.lower())
1296 1296 self.type = self.type.lower()
1297 1297 ## reading parameters
1298 1298 # param count
1299 1299 mancount, advcount = self._unpackheader(_fpartparamcount)
1300 1300 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1301 1301 # param size
1302 1302 fparamsizes = _makefpartparamsizes(mancount + advcount)
1303 1303 paramsizes = self._unpackheader(fparamsizes)
1304 1304 # make it a list of couple again
1305 1305 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1306 1306 # split mandatory from advisory
1307 1307 mansizes = paramsizes[:mancount]
1308 1308 advsizes = paramsizes[mancount:]
1309 1309 # retrieve param value
1310 1310 manparams = []
1311 1311 for key, value in mansizes:
1312 1312 manparams.append((self._fromheader(key), self._fromheader(value)))
1313 1313 advparams = []
1314 1314 for key, value in advsizes:
1315 1315 advparams.append((self._fromheader(key), self._fromheader(value)))
1316 1316 self._initparams(manparams, advparams)
1317 1317 ## part payload
1318 1318 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1319 1319 # we read the data, tell it
1320 1320 self._initialized = True
1321 1321
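`_readheader` above walks the header blob field by field: type length, type bytes, part id, the two parameter counts, the run of size pairs, then the key/value bytes. A standalone sketch of a simplified decoder, assuming the usual format strings (`>B` type size, `>I` part id, `>BB` param counts; the real `_fpartid` format may differ):

```python
import struct

def parseheader(blob):
    """Decode a simplified bundle2 part header from a bytes blob."""
    pos = 0
    def take(n):
        nonlocal pos
        data = blob[pos:pos + n]
        pos += n
        return data
    typesize = struct.unpack('>B', take(1))[0]
    parttype = take(typesize)
    partid = struct.unpack('>I', take(4))[0]
    mancount, advcount = struct.unpack('>BB', take(2))
    nsizes = 2 * (mancount + advcount)
    sizes = struct.unpack('>%dB' % nsizes, take(nsizes))
    # rebuild (keysize, valuesize) couples, then read the bytes
    pairs = list(zip(sizes[::2], sizes[1::2]))
    params = [(take(k), take(v)) for k, v in pairs]
    return parttype, partid, params[:mancount], params[mancount:]
```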
1322 1322 def _payloadchunks(self):
1323 1323 """Generator of decoded chunks in the payload."""
1324 1324 return decodepayloadchunks(self.ui, self._fp)
1325 1325
1326 1326 def consume(self):
1327 1327 """Read the part payload until completion.
1328 1328
1329 1329 By consuming the part data, the underlying stream read offset will
1330 1330 be advanced to the next part (or end of stream).
1331 1331 """
1332 1332 if self.consumed:
1333 1333 return
1334 1334
1335 1335 chunk = self.read(32768)
1336 1336 while chunk:
1337 1337 self._pos += len(chunk)
1338 1338 chunk = self.read(32768)
1339 1339
1340 1340 def read(self, size=None):
1341 1341 """read payload data"""
1342 1342 if not self._initialized:
1343 1343 self._readheader()
1344 1344 if size is None:
1345 1345 data = self._payloadstream.read()
1346 1346 else:
1347 1347 data = self._payloadstream.read(size)
1348 1348 self._pos += len(data)
1349 1349 if size is None or len(data) < size:
1350 1350 if not self.consumed and self._pos:
1351 1351 self.ui.debug('bundle2-input-part: total payload size %i\n'
1352 1352 % self._pos)
1353 1353 self.consumed = True
1354 1354 return data
1355 1355
1356 1356 class seekableunbundlepart(unbundlepart):
1357 1357 """A bundle2 part in a bundle that is seekable.
1358 1358
1359 1359 Regular ``unbundlepart`` instances can only be read once. This class
1360 1360 extends ``unbundlepart`` to enable bi-directional seeking within the
1361 1361 part.
1362 1362
1363 1363 Bundle2 part data consists of framed chunks. Offsets when seeking
1364 1364 refer to the decoded data, not the offsets in the underlying bundle2
1365 1365 stream.
1366 1366
1367 1367 To facilitate quickly seeking within the decoded data, instances of this
1368 1368 class maintain a mapping between offsets in the underlying stream and
1369 1369 the decoded payload. This mapping will consume memory in proportion
1370 1370 to the number of chunks within the payload (which almost certainly
1371 1371 increases in proportion with the size of the part).
1372 1372 """
1373 1373 def __init__(self, ui, header, fp):
1374 1374 # (payload, file) offsets for chunk starts.
1375 1375 self._chunkindex = []
1376 1376
1377 1377 super(seekableunbundlepart, self).__init__(ui, header, fp)
1378 1378
1379 1379 def _payloadchunks(self, chunknum=0):
1380 1380 '''seek to specified chunk and start yielding data'''
1381 1381 if len(self._chunkindex) == 0:
1382 1382 assert chunknum == 0, 'Must start with chunk 0'
1383 1383 self._chunkindex.append((0, self._tellfp()))
1384 1384 else:
1385 1385 assert chunknum < len(self._chunkindex), \
1386 1386 'Unknown chunk %d' % chunknum
1387 1387 self._seekfp(self._chunkindex[chunknum][1])
1388 1388
1389 1389 pos = self._chunkindex[chunknum][0]
1390 1390
1391 1391 for chunk in decodepayloadchunks(self.ui, self._fp):
1392 1392 chunknum += 1
1393 1393 pos += len(chunk)
1394 1394 if chunknum == len(self._chunkindex):
1395 1395 self._chunkindex.append((pos, self._tellfp()))
1396 1396
1397 1397 yield chunk
1398 1398
1399 1399 def _findchunk(self, pos):
1400 1400 '''for a given payload position, return a chunk number and offset'''
1401 1401 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1402 1402 if ppos == pos:
1403 1403 return chunk, 0
1404 1404 elif ppos > pos:
1405 1405 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1406 1406 raise ValueError('Unknown chunk')
1407 1407
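`_findchunk` translates a decoded-payload offset into a chunk number plus an intra-chunk offset by scanning the `(payload offset, file offset)` index. The same lookup as a standalone function:

```python
def findchunk(chunkindex, pos):
    """Map a payload offset to (chunk number, offset within chunk).

    chunkindex is a sorted list of (payload offset, file offset) pairs,
    one per chunk start, as maintained by seekableunbundlepart."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # target lies inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```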
1408 1408 def tell(self):
1409 1409 return self._pos
1410 1410
1411 1411 def seek(self, offset, whence=os.SEEK_SET):
1412 1412 if whence == os.SEEK_SET:
1413 1413 newpos = offset
1414 1414 elif whence == os.SEEK_CUR:
1415 1415 newpos = self._pos + offset
1416 1416 elif whence == os.SEEK_END:
1417 1417 if not self.consumed:
1418 self.read()
1418 # Can't use self.consume() here because it advances self._pos.
1419 chunk = self.read(32768)
1420 while chunk:
1421 chunk = self.read(32768)
1419 1422 newpos = self._chunkindex[-1][0] - offset
1420 1423 else:
1421 1424 raise ValueError('Unknown whence value: %r' % (whence,))
1422 1425
1423 1426 if newpos > self._chunkindex[-1][0] and not self.consumed:
1424 self.read()
1427 # Can't use self.consume() here because it advances self._pos.
1428 chunk = self.read(32768)
1429 while chunk:
1430 chunk = self.read(32768)
1431
1425 1432 if not 0 <= newpos <= self._chunkindex[-1][0]:
1426 1433 raise ValueError('Offset out of range')
1427 1434
1428 1435 if self._pos != newpos:
1429 1436 chunk, internaloffset = self._findchunk(newpos)
1430 1437 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1431 1438 adjust = self.read(internaloffset)
1432 1439 if len(adjust) != internaloffset:
1433 1440 raise error.Abort(_('Seek failed\n'))
1434 1441 self._pos = newpos
1435 1442
1436 1443 def _seekfp(self, offset, whence=0):
1437 1444 """move the underlying file pointer
1438 1445
1439 1446 This method is meant for internal usage by the bundle2 protocol only.
1440 1447 It directly manipulates the low-level stream, including bundle2-level
1441 1448 instructions.
1442 1449
1443 1450 Do not use it to implement higher-level logic or methods."""
1444 1451 if self._seekable:
1445 1452 return self._fp.seek(offset, whence)
1446 1453 else:
1447 1454 raise NotImplementedError(_('File pointer is not seekable'))
1448 1455
1449 1456 def _tellfp(self):
1450 1457 """return the file offset, or None if file is not seekable
1451 1458
1452 1459 This method is meant for internal usage by the bundle2 protocol only.
1453 1460 It directly manipulates the low-level stream, including bundle2-level
1454 1461 instructions.
1455 1462
1456 1463 Do not use it to implement higher-level logic or methods."""
1457 1464 if self._seekable:
1458 1465 try:
1459 1466 return self._fp.tell()
1460 1467 except IOError as e:
1461 1468 if e.errno == errno.ESPIPE:
1462 1469 self._seekable = False
1463 1470 else:
1464 1471 raise
1465 1472 return None
1466 1473
1467 1474 # These are only the static capabilities.
1468 1475 # Check the 'getrepocaps' function for the rest.
1469 1476 capabilities = {'HG20': (),
1470 1477 'error': ('abort', 'unsupportedcontent', 'pushraced',
1471 1478 'pushkey'),
1472 1479 'listkeys': (),
1473 1480 'pushkey': (),
1474 1481 'digests': tuple(sorted(util.DIGESTS.keys())),
1475 1482 'remote-changegroup': ('http', 'https'),
1476 1483 'hgtagsfnodes': (),
1477 1484 'phases': ('heads',),
1478 1485 }
1479 1486
1480 1487 def getrepocaps(repo, allowpushback=False):
1481 1488 """return the bundle2 capabilities for a given repo
1482 1489
1483 1490 Exists to allow extensions (like evolution) to mutate the capabilities.
1484 1491 """
1485 1492 caps = capabilities.copy()
1486 1493 caps['changegroup'] = tuple(sorted(
1487 1494 changegroup.supportedincomingversions(repo)))
1488 1495 if obsolete.isenabled(repo, obsolete.exchangeopt):
1489 1496 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1490 1497 caps['obsmarkers'] = supportedformat
1491 1498 if allowpushback:
1492 1499 caps['pushback'] = ()
1493 1500 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1494 1501 if cpmode == 'check-related':
1495 1502 caps['checkheads'] = ('related',)
1496 1503 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1497 1504 caps.pop('phases')
1498 1505 return caps
1499 1506
1500 1507 def bundle2caps(remote):
1501 1508 """return the bundle capabilities of a peer as dict"""
1502 1509 raw = remote.capable('bundle2')
1503 1510 if not raw and raw != '':
1504 1511 return {}
1505 1512 capsblob = urlreq.unquote(remote.capable('bundle2'))
1506 1513 return decodecaps(capsblob)
1507 1514
1508 1515 def obsmarkersversion(caps):
1509 1516 """extract the list of supported obsmarkers versions from a bundle2caps dict
1510 1517 """
1511 1518 obscaps = caps.get('obsmarkers', ())
1512 1519 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1513 1520
1514 1521 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1515 1522 vfs=None, compression=None, compopts=None):
1516 1523 if bundletype.startswith('HG10'):
1517 1524 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1518 1525 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1519 1526 compression=compression, compopts=compopts)
1520 1527 elif not bundletype.startswith('HG20'):
1521 1528 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1522 1529
1523 1530 caps = {}
1524 1531 if 'obsolescence' in opts:
1525 1532 caps['obsmarkers'] = ('V1',)
1526 1533 bundle = bundle20(ui, caps)
1527 1534 bundle.setcompression(compression, compopts)
1528 1535 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1529 1536 chunkiter = bundle.getchunks()
1530 1537
1531 1538 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1532 1539
1533 1540 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1534 1541 # We should eventually reconcile this logic with the one behind
1535 1542 # 'exchange.getbundle2partsgenerator'.
1536 1543 #
1537 1544 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1538 1545 # different right now. So we keep them separated for now for the sake of
1539 1546 # simplicity.
1540 1547
1541 1548 # we always want a changegroup in such bundle
1542 1549 cgversion = opts.get('cg.version')
1543 1550 if cgversion is None:
1544 1551 cgversion = changegroup.safeversion(repo)
1545 1552 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1546 1553 part = bundler.newpart('changegroup', data=cg.getchunks())
1547 1554 part.addparam('version', cg.version)
1548 1555 if 'clcount' in cg.extras:
1549 1556 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1550 1557 mandatory=False)
1551 1558 if opts.get('phases') and repo.revs('%ln and secret()',
1552 1559 outgoing.missingheads):
1553 1560 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1554 1561
1555 1562 addparttagsfnodescache(repo, bundler, outgoing)
1556 1563
1557 1564 if opts.get('obsolescence', False):
1558 1565 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1559 1566 buildobsmarkerspart(bundler, obsmarkers)
1560 1567
1561 1568 if opts.get('phases', False):
1562 1569 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1563 1570 phasedata = phases.binaryencode(headsbyphase)
1564 1571 bundler.newpart('phase-heads', data=phasedata)
1565 1572
1566 1573 def addparttagsfnodescache(repo, bundler, outgoing):
1567 1574 # we include the tags fnode cache for the bundle changeset
1568 1575 # (as an optional parts)
1569 1576 cache = tags.hgtagsfnodescache(repo.unfiltered())
1570 1577 chunks = []
1571 1578
1572 1579 # .hgtags fnodes are only relevant for head changesets. While we could
1573 1580 # transfer values for all known nodes, there will likely be little to
1574 1581 # no benefit.
1575 1582 #
1576 1583 # We don't bother using a generator to produce output data because
1577 1584 # a) we only have 40 bytes per head and even esoteric numbers of heads
1578 1585 # consume little memory (1M heads is 40MB) b) we don't want to send the
1579 1586 # part if we don't have entries and knowing if we have entries requires
1580 1587 # cache lookups.
1581 1588 for node in outgoing.missingheads:
1582 1589 # Don't compute missing, as this may slow down serving.
1583 1590 fnode = cache.getfnode(node, computemissing=False)
1584 1591 if fnode is not None:
1585 1592 chunks.extend([node, fnode])
1586 1593
1587 1594 if chunks:
1588 1595 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1589 1596
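`addparttagsfnodescache` flattens `(node, fnode)` pairs into a single payload of 40-byte records (20-byte node followed by 20-byte fnode). A sketch of that packing and its inverse (helper names hypothetical):

```python
def packfnodes(pairs):
    """Pack (node, fnode) 20-byte pairs into the flat payload layout
    used by the hgtagsfnodes part (40 bytes per head)."""
    chunks = []
    for node, fnode in pairs:
        assert len(node) == 20 and len(fnode) == 20
        chunks.extend([node, fnode])
    return b''.join(chunks)

def unpackfnodes(data):
    """Inverse: split the flat payload back into (node, fnode) pairs."""
    assert len(data) % 40 == 0
    return [(data[i:i + 20], data[i + 20:i + 40])
            for i in range(0, len(data), 40)]
```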
1590 1597 def buildobsmarkerspart(bundler, markers):
1591 1598 """add an obsmarker part to the bundler with <markers>
1592 1599
1593 1600 No part is created if markers is empty.
1594 1601 Raises ValueError if the bundler doesn't support any known obsmarker format.
1595 1602 """
1596 1603 if not markers:
1597 1604 return None
1598 1605
1599 1606 remoteversions = obsmarkersversion(bundler.capabilities)
1600 1607 version = obsolete.commonversion(remoteversions)
1601 1608 if version is None:
1602 1609 raise ValueError('bundler does not support common obsmarker format')
1603 1610 stream = obsolete.encodemarkers(markers, True, version=version)
1604 1611 return bundler.newpart('obsmarkers', data=stream)
1605 1612
1606 1613 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1607 1614 compopts=None):
1608 1615 """Write a bundle file and return its filename.
1609 1616
1610 1617 Existing files will not be overwritten.
1611 1618 If no filename is specified, a temporary file is created.
1612 1619 bz2 compression can be turned off.
1613 1620 The bundle file will be deleted in case of errors.
1614 1621 """
1615 1622
1616 1623 if bundletype == "HG20":
1617 1624 bundle = bundle20(ui)
1618 1625 bundle.setcompression(compression, compopts)
1619 1626 part = bundle.newpart('changegroup', data=cg.getchunks())
1620 1627 part.addparam('version', cg.version)
1621 1628 if 'clcount' in cg.extras:
1622 1629 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1623 1630 mandatory=False)
1624 1631 chunkiter = bundle.getchunks()
1625 1632 else:
1626 1633 # compression argument is only for the bundle2 case
1627 1634 assert compression is None
1628 1635 if cg.version != '01':
1629 1636 raise error.Abort(_('old bundle types only support v1 '
1630 1637 'changegroups'))
1631 1638 header, comp = bundletypes[bundletype]
1632 1639 if comp not in util.compengines.supportedbundletypes:
1633 1640 raise error.Abort(_('unknown stream compression type: %s')
1634 1641 % comp)
1635 1642 compengine = util.compengines.forbundletype(comp)
1636 1643 def chunkiter():
1637 1644 yield header
1638 1645 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1639 1646 yield chunk
1640 1647 chunkiter = chunkiter()
1641 1648
1642 1649 # parse the changegroup data, otherwise we will block
1643 1650 # in case of sshrepo because we don't know the end of the stream
1644 1651 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1645 1652
1646 1653 def combinechangegroupresults(op):
1647 1654 """logic to combine 0 or more addchangegroup results into one"""
1648 1655 results = [r.get('return', 0)
1649 1656 for r in op.records['changegroup']]
1650 1657 changedheads = 0
1651 1658 result = 1
1652 1659 for ret in results:
1653 1660 # If any changegroup result is 0, return 0
1654 1661 if ret == 0:
1655 1662 result = 0
1656 1663 break
1657 1664 if ret < -1:
1658 1665 changedheads += ret + 1
1659 1666 elif ret > 1:
1660 1667 changedheads += ret - 1
1661 1668 if changedheads > 0:
1662 1669 result = 1 + changedheads
1663 1670 elif changedheads < 0:
1664 1671 result = -1 + changedheads
1665 1672 return result
1666 1673
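`combinechangegroupresults` folds the addchangegroup return-code encoding: 0 is failure, 1 is success without head changes, `1 + n` means n heads added, and `-1 - n` means n heads removed. The same arithmetic as a standalone sketch:

```python
def combine(results):
    """Combine addchangegroup-style return codes into one."""
    changedheads = 0
    for ret in results:
        if ret == 0:          # any failure wins
            return 0
        if ret < -1:          # -1 - n: n heads removed
            changedheads += ret + 1
        elif ret > 1:         # 1 + n: n heads added
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    elif changedheads < 0:
        return -1 + changedheads
    return 1
```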
1667 1674 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1668 1675 'targetphase'))
1669 1676 def handlechangegroup(op, inpart):
1670 1677 """apply a changegroup part on the repo
1671 1678
1672 1679 This is a very early implementation that will be massively reworked before
1673 1680 being inflicted on any end-user.
1674 1681 """
1675 1682 tr = op.gettransaction()
1676 1683 unpackerversion = inpart.params.get('version', '01')
1677 1684 # We should raise an appropriate exception here
1678 1685 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1679 1686 # the source and url passed here are overwritten by the one contained in
1680 1687 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1681 1688 nbchangesets = None
1682 1689 if 'nbchanges' in inpart.params:
1683 1690 nbchangesets = int(inpart.params.get('nbchanges'))
1684 1691 if ('treemanifest' in inpart.params and
1685 1692 'treemanifest' not in op.repo.requirements):
1686 1693 if len(op.repo.changelog) != 0:
1687 1694 raise error.Abort(_(
1688 1695 "bundle contains tree manifests, but local repo is "
1689 1696 "non-empty and does not use tree manifests"))
1690 1697 op.repo.requirements.add('treemanifest')
1691 1698 op.repo._applyopenerreqs()
1692 1699 op.repo._writerequirements()
1693 1700 extrakwargs = {}
1694 1701 targetphase = inpart.params.get('targetphase')
1695 1702 if targetphase is not None:
1696 1703 extrakwargs['targetphase'] = int(targetphase)
1697 1704 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1698 1705 expectedtotal=nbchangesets, **extrakwargs)
1699 1706 if op.reply is not None:
1700 1707 # This is definitely not the final form of this
1701 1708 # return. But one need to start somewhere.
1702 1709 part = op.reply.newpart('reply:changegroup', mandatory=False)
1703 1710 part.addparam(
1704 1711 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1705 1712 part.addparam('return', '%i' % ret, mandatory=False)
1706 1713 assert not inpart.read()
1707 1714
1708 1715 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1709 1716 ['digest:%s' % k for k in util.DIGESTS.keys()])
1710 1717 @parthandler('remote-changegroup', _remotechangegroupparams)
1711 1718 def handleremotechangegroup(op, inpart):
1712 1719 """apply a bundle10 on the repo, given an url and validation information
1713 1720
1714 1721 All the information about the remote bundle to import are given as
1715 1722 parameters. The parameters include:
1716 1723 - url: the url to the bundle10.
1717 1724 - size: the bundle10 file size. It is used to validate what was
1718 1725 retrieved by the client matches the server knowledge about the bundle.
1719 1726 - digests: a space separated list of the digest types provided as
1720 1727 parameters.
1721 1728 - digest:<digest-type>: the hexadecimal representation of the digest with
1722 1729 that name. Like the size, it is used to validate what was retrieved by
1723 1730 the client matches what the server knows about the bundle.
1724 1731
1725 1732 When multiple digest types are given, all of them are checked.
1726 1733 """
1727 1734 try:
1728 1735 raw_url = inpart.params['url']
1729 1736 except KeyError:
1730 1737 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1731 1738 parsed_url = util.url(raw_url)
1732 1739 if parsed_url.scheme not in capabilities['remote-changegroup']:
1733 1740 raise error.Abort(_('remote-changegroup does not support %s urls') %
1734 1741 parsed_url.scheme)
1735 1742
1736 1743 try:
1737 1744 size = int(inpart.params['size'])
1738 1745 except ValueError:
1739 1746 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1740 1747 % 'size')
1741 1748 except KeyError:
1742 1749 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1743 1750
1744 1751 digests = {}
1745 1752 for typ in inpart.params.get('digests', '').split():
1746 1753 param = 'digest:%s' % typ
1747 1754 try:
1748 1755 value = inpart.params[param]
1749 1756 except KeyError:
1750 1757 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1751 1758 param)
1752 1759 digests[typ] = value
1753 1760
1754 1761 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1755 1762
1756 1763 tr = op.gettransaction()
1757 1764 from . import exchange
1758 1765 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1759 1766 if not isinstance(cg, changegroup.cg1unpacker):
1760 1767 raise error.Abort(_('%s: not a bundle version 1.0') %
1761 1768 util.hidepassword(raw_url))
1762 1769 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1763 1770 if op.reply is not None:
1764 1771 # This is definitely not the final form of this
1765 1772 # return. But one need to start somewhere.
1766 1773 part = op.reply.newpart('reply:changegroup')
1767 1774 part.addparam(
1768 1775 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1769 1776 part.addparam('return', '%i' % ret, mandatory=False)
1770 1777 try:
1771 1778 real_part.validate()
1772 1779 except error.Abort as e:
1773 1780 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1774 1781 (util.hidepassword(raw_url), str(e)))
1775 1782 assert not inpart.read()
1776 1783
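The digest-parameter convention above (a space-separated `digests` list, with each listed type carrying a matching `digest:<type>` param) can be sketched as a standalone parser. This is a hypothetical illustration, not the code used by `handleremotechangegroup`, and the `KeyError` it raises stands in for the `error.Abort` used in the real handler:

```python
def parsedigests(params):
    """Extract {digest-type: hex-value} pairs from a params dict
    following the remote-changegroup convention."""
    digests = {}
    for typ in params.get('digests', '').split():
        param = 'digest:%s' % typ
        if param not in params:
            # the real handler raises error.Abort here
            raise KeyError('remote-changegroup: missing "%s" param' % param)
        digests[typ] = params[param]
    return digests
```

A part advertising `digests="sha1 md5"` must therefore also carry both `digest:sha1` and `digest:md5` parameters.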
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

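The payload of `check:heads` is a flat run of 20-byte binary nodes with no count prefix; the reader simply pulls 20 bytes at a time until the part is exhausted. A minimal standalone sketch of that framing, using an in-memory stream in place of the bundle2 part:

```python
import io

def readheads(stream):
    """Read consecutive 20-byte binary nodes until the stream ends.

    A trailing short read (0 < len < 20) would indicate a truncated
    payload, which the real handler rejects with an assert."""
    heads = []
    h = stream.read(20)
    while len(h) == 20:
        heads.append(h)
        h = stream.read(20)
    assert not h  # anything left over means a malformed part
    return heads
```

Note that the comparison in the handler is order-insensitive (`sorted(heads) != sorted(op.repo.heads())`): only the set of heads matters, not the order the client sent them in.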
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activity happens on unrelated heads,
    it is ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
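The key-mangling done by the pushvars handler (uppercase the advisory param name, then prepend `USERVAR_` so hooks can tell pushed variables apart from built-in hook arguments) is easy to factor out. A hypothetical standalone version, not part of `bundle2.py`:

```python
def buildpushvarhookargs(advisoryparams):
    """Turn (key, value) advisory params into hook arguments,
    mirroring the USERVAR_ prefixing used by the pushvars handler."""
    hookargs = {}
    for key, value in advisoryparams:
        hookargs["USERVAR_" + key.upper()] = value
    return hookargs
```

So a client running `hg push --pushvars DEBUG=1` would surface on the server side as a `USERVAR_DEBUG` hook argument, provided the server has enabled `push.pushvars.server`.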