bundle2: implement consume() API on unbundlepart...
Gregory Szorc -
r35111:db503852 default
@@ -1,1986 +1,2001 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that are handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37 
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is upper case, the parameter is mandatory and the bundling process
51 51 MUST stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
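The stream-level parameter encoding described above can be sketched in plain Python. This is an illustrative helper, not Mercurial's own code (Mercurial uses its `urlreq` wrappers internally), and the name `encodestreamparams` is made up for this example:

```python
import struct
import urllib.parse

def encodestreamparams(params):
    # Encode a list of (name, value) pairs the way the format above
    # describes: space-separated urlquoted tokens, `name` or `name=value`,
    # prefixed with an int32 byte count (big-endian, per the spec).
    tokens = []
    for name, value in params:
        tok = urllib.parse.quote(name)
        if value is not None:
            tok = '%s=%s' % (tok, urllib.parse.quote(value))
        tokens.append(tok)
    blob = ' '.join(tokens).encode('ascii')
    return struct.pack('>i', len(blob)) + blob
```

For instance, a mandatory `Compression` parameter and a valueless advisory one serialize to a 24-byte blob preceded by its size.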
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76 
77 77 The part type is used to route to an application-level handler that can
78 78 interpret the payload.
79 79 
80 80 Part parameters are passed to the application-level handler. They are
81 81 meant to convey information that will help the handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32 and `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
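The mandatory/advisory decision described above hinges only on the case of the part type, while handler lookup uses the lowercased type. A minimal sketch (the function name `ismandatory` is hypothetical; bundle2 exposes this as the `mandatory` attribute on parts):

```python
def ismandatory(parttype):
    # A part is mandatory if its type contains any uppercase character;
    # handler registration and lookup are done on the lowercased type.
    return parttype != parttype.lower()
```

So an unknown `output` part is silently skipped, while an unknown `B2X:reply`-style part aborts processing.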
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import os
152 152 import re
153 153 import string
154 154 import struct
155 155 import sys
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 node as nodemod,
162 162 obsolete,
163 163 phases,
164 164 pushkey,
165 165 pycompat,
166 166 tags,
167 167 url,
168 168 util,
169 169 )
170 170
171 171 urlerr = util.urlerr
172 172 urlreq = util.urlreq
173 173
174 174 _pack = struct.pack
175 175 _unpack = struct.unpack
176 176
177 177 _fstreamparamsize = '>i'
178 178 _fpartheadersize = '>i'
179 179 _fparttypesize = '>B'
180 180 _fpartid = '>I'
181 181 _fpayloadsize = '>i'
182 182 _fpartparamcount = '>BB'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable, so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 decorator that registers a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 353 self.op = op
354 354 self.unbundler = unbundler
355 355 self.iterator = None
356 356 self.count = 0
357 357 self.current = None
358 358
359 359 def __enter__(self):
360 360 def func():
361 361 itr = enumerate(self.unbundler.iterparts())
362 362 for count, p in itr:
363 363 self.count = count
364 364 self.current = p
365 365 yield p
366 p.seek(0, os.SEEK_END)
366 p.consume()
367 367 self.current = None
368 368 self.iterator = func()
369 369 return self.iterator
370 370
371 371 def __exit__(self, type, exc, tb):
372 372 if not self.iterator:
373 373 return
374 374
375 375 # Only gracefully abort in a normal exception situation. User aborts
376 376 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
377 377 # and should not gracefully cleanup.
378 378 if isinstance(exc, Exception):
379 379 # Any exceptions seeking to the end of the bundle at this point are
380 380 # almost certainly related to the underlying stream being bad.
381 381 # And, chances are that the exception we're handling is related to
382 382 # getting in that bad state. So, we swallow the seeking error and
383 383 # re-raise the original error.
384 384 seekerror = False
385 385 try:
386 386 if self.current:
387 387 # consume the part content to not corrupt the stream.
388 self.current.seek(0, os.SEEK_END)
388 self.current.consume()
389 389
390 390 for part in self.iterator:
391 391 # consume the bundle content
392 part.seek(0, os.SEEK_END)
392 part.consume()
393 393 except Exception:
394 394 seekerror = True
395 395
396 396 # Small hack to let caller code distinguish exceptions from bundle2
397 397 # processing from processing the old format. This is mostly needed
398 398 # to handle different return codes to unbundle according to the type
399 399 # of bundle. We should probably clean up or drop this return code
400 400 # craziness in a future version.
401 401 exc.duringunbundle2 = True
402 402 salvaged = []
403 403 replycaps = None
404 404 if self.op.reply is not None:
405 405 salvaged = self.op.reply.salvageoutput()
406 406 replycaps = self.op.reply.capabilities
407 407 exc._replycaps = replycaps
408 408 exc._bundle2salvagedoutput = salvaged
409 409
410 410 # Re-raising from a variable loses the original stack. So only use
411 411 # that form if we need to.
412 412 if seekerror:
413 413 raise exc
414 414
415 415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 416 self.count)
417 417
418 418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 419 """This function process a bundle, apply effect to/from a repo
420 420
421 421 It iterates over each part then searches for and uses the proper handling
422 422 code to process the part. Parts are processed in order.
423 423
424 424 An unknown Mandatory part will abort the process.
425 425
426 426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 427 function. This is used to ensure output is properly propagated in case of
428 428 an error during the unbundling. This output capturing part will likely be
429 429 reworked and this ability will probably go away in the process.
430 430 """
431 431 if op is None:
432 432 if transactiongetter is None:
433 433 transactiongetter = _notransaction
434 434 op = bundleoperation(repo, transactiongetter)
435 435 # todo:
436 436 # - replace this with an init function soon.
437 437 # - exception catching
438 438 unbundler.params
439 439 if repo.ui.debugflag:
440 440 msg = ['bundle2-input-bundle:']
441 441 if unbundler.params:
442 442 msg.append(' %i params' % len(unbundler.params))
443 443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 444 msg.append(' no-transaction')
445 445 else:
446 446 msg.append(' with-transaction')
447 447 msg.append('\n')
448 448 repo.ui.debug(''.join(msg))
449 449
450 450 processparts(repo, op, unbundler)
451 451
452 452 return op
453 453
454 454 def processparts(repo, op, unbundler):
455 455 with partiterator(repo, op, unbundler) as parts:
456 456 for part in parts:
457 457 _processpart(op, part)
458 458
459 459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
460 460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
461 461 op.records.add('changegroup', {
462 462 'return': ret,
463 463 })
464 464 return ret
465 465
466 466 def _gethandler(op, part):
467 467 status = 'unknown' # used by debug output
468 468 try:
469 469 handler = parthandlermapping.get(part.type)
470 470 if handler is None:
471 471 status = 'unsupported-type'
472 472 raise error.BundleUnknownFeatureError(parttype=part.type)
473 473 indebug(op.ui, 'found a handler for part %s' % part.type)
474 474 unknownparams = part.mandatorykeys - handler.params
475 475 if unknownparams:
476 476 unknownparams = list(unknownparams)
477 477 unknownparams.sort()
478 478 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
479 479 raise error.BundleUnknownFeatureError(parttype=part.type,
480 480 params=unknownparams)
481 481 status = 'supported'
482 482 except error.BundleUnknownFeatureError as exc:
483 483 if part.mandatory: # mandatory parts
484 484 raise
485 485 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
486 486 return # skip to part processing
487 487 finally:
488 488 if op.ui.debugflag:
489 489 msg = ['bundle2-input-part: "%s"' % part.type]
490 490 if not part.mandatory:
491 491 msg.append(' (advisory)')
492 492 nbmp = len(part.mandatorykeys)
493 493 nbap = len(part.params) - nbmp
494 494 if nbmp or nbap:
495 495 msg.append(' (params:')
496 496 if nbmp:
497 497 msg.append(' %i mandatory' % nbmp)
498 498 if nbap:
499 499 msg.append(' %i advisory' % nbap)
500 500 msg.append(')')
501 501 msg.append(' %s\n' % status)
502 502 op.ui.debug(''.join(msg))
503 503
504 504 return handler
505 505
506 506 def _processpart(op, part):
507 507 """process a single part from a bundle
508 508
509 509 The part is guaranteed to have been fully consumed when the function exits
510 510 (even if an exception is raised)."""
511 511 handler = _gethandler(op, part)
512 512 if handler is None:
513 513 return
514 514
515 515 # handler is called outside the above try block so that we don't
516 516 # risk catching KeyErrors from anything other than the
517 517 # parthandlermapping lookup (any KeyError raised by handler()
518 518 # itself represents a defect of a different variety).
519 519 output = None
520 520 if op.captureoutput and op.reply is not None:
521 521 op.ui.pushbuffer(error=True, subproc=True)
522 522 output = ''
523 523 try:
524 524 handler(op, part)
525 525 finally:
526 526 if output is not None:
527 527 output = op.ui.popbuffer()
528 528 if output:
529 529 outpart = op.reply.newpart('output', data=output,
530 530 mandatory=False)
531 531 outpart.addparam(
532 532 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
533 533
534 534 def decodecaps(blob):
535 535 """decode a bundle2 caps bytes blob into a dictionary
536 536
537 537 The blob is a list of capabilities (one per line).
538 538 Capabilities may have values using a line of the form::
539 539
540 540 capability=value1,value2,value3
541 541
542 542 The values are always a list."""
543 543 caps = {}
544 544 for line in blob.splitlines():
545 545 if not line:
546 546 continue
547 547 if '=' not in line:
548 548 key, vals = line, ()
549 549 else:
550 550 key, vals = line.split('=', 1)
551 551 vals = vals.split(',')
552 552 key = urlreq.unquote(key)
553 553 vals = [urlreq.unquote(v) for v in vals]
554 554 caps[key] = vals
555 555 return caps
556 556
557 557 def encodecaps(caps):
558 558 """encode a bundle2 caps dictionary into a bytes blob"""
559 559 chunks = []
560 560 for ca in sorted(caps):
561 561 vals = caps[ca]
562 562 ca = urlreq.quote(ca)
563 563 vals = [urlreq.quote(v) for v in vals]
564 564 if vals:
565 565 ca = "%s=%s" % (ca, ','.join(vals))
566 566 chunks.append(ca)
567 567 return '\n'.join(chunks)
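As a usage sketch, `encodecaps` and `decodecaps` round-trip a capabilities dictionary. This standalone version mirrors the functions above but uses `urllib.parse` in place of Mercurial's `urlreq` wrappers and operates on str rather than bytes:

```python
import urllib.parse

def encodecaps(caps):
    # one urlquoted capability per line; values joined with commas after '='
    chunks = []
    for ca in sorted(caps):
        vals = [urllib.parse.quote(v) for v in caps[ca]]
        ca = urllib.parse.quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    # inverse of the above; valueless capabilities map to an empty list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, raw = line.split('=', 1)
            vals = [urllib.parse.unquote(v) for v in raw.split(',')]
        caps[urllib.parse.unquote(key)] = vals
    return caps
```

Encoding `{'HG20': [], 'changegroup': ['01', '02']}` yields the two-line blob `HG20\nchangegroup=01,02`, and decoding it recovers the dictionary.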
568 568
569 569 bundletypes = {
570 570 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
571 571 # since the unification ssh accepts a header but there
572 572 # is no capability signaling it.
573 573 "HG20": (), # special-cased below
574 574 "HG10UN": ("HG10UN", 'UN'),
575 575 "HG10BZ": ("HG10", 'BZ'),
576 576 "HG10GZ": ("HG10GZ", 'GZ'),
577 577 }
578 578
579 579 # hgweb uses this list to communicate its preferred type
580 580 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
581 581
582 582 class bundle20(object):
583 583 """represent an outgoing bundle2 container
584 584
585 585 Use the `addparam` method to add a stream level parameter, and `newpart` to
586 586 populate it. Then call `getchunks` to retrieve all the binary chunks of
587 587 data that compose the bundle2 container."""
588 588
589 589 _magicstring = 'HG20'
590 590
591 591 def __init__(self, ui, capabilities=()):
592 592 self.ui = ui
593 593 self._params = []
594 594 self._parts = []
595 595 self.capabilities = dict(capabilities)
596 596 self._compengine = util.compengines.forbundletype('UN')
597 597 self._compopts = None
598 598
599 599 def setcompression(self, alg, compopts=None):
600 600 """setup core part compression to <alg>"""
601 601 if alg in (None, 'UN'):
602 602 return
603 603 assert not any(n.lower() == 'compression' for n, v in self._params)
604 604 self.addparam('Compression', alg)
605 605 self._compengine = util.compengines.forbundletype(alg)
606 606 self._compopts = compopts
607 607
608 608 @property
609 609 def nbparts(self):
610 610 """total number of parts added to the bundler"""
611 611 return len(self._parts)
612 612
613 613 # methods used to define the bundle2 content
614 614 def addparam(self, name, value=None):
615 615 """add a stream level parameter"""
616 616 if not name:
617 617 raise ValueError(r'empty parameter name')
618 618 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
619 619 raise ValueError(r'non letter first character: %s' % name)
620 620 self._params.append((name, value))
621 621
622 622 def addpart(self, part):
623 623 """add a new part to the bundle2 container
624 624
625 625 Parts contains the actual applicative payload."""
626 626 assert part.id is None
627 627 part.id = len(self._parts) # very cheap counter
628 628 self._parts.append(part)
629 629
630 630 def newpart(self, typeid, *args, **kwargs):
631 631 """create a new part and add it to the containers
632 632
633 633 As the part is directly added to the containers. For now, this means
634 634 that any failure to properly initialize the part after calling
635 635 ``newpart`` should result in a failure of the whole bundling process.
636 636
637 637 You can still fall back to manually create and add if you need better
638 638 control."""
639 639 part = bundlepart(typeid, *args, **kwargs)
640 640 self.addpart(part)
641 641 return part
642 642
643 643 # methods used to generate the bundle2 stream
644 644 def getchunks(self):
645 645 if self.ui.debugflag:
646 646 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
647 647 if self._params:
648 648 msg.append(' (%i params)' % len(self._params))
649 649 msg.append(' %i parts total\n' % len(self._parts))
650 650 self.ui.debug(''.join(msg))
651 651 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
652 652 yield self._magicstring
653 653 param = self._paramchunk()
654 654 outdebug(self.ui, 'bundle parameter: %s' % param)
655 655 yield _pack(_fstreamparamsize, len(param))
656 656 if param:
657 657 yield param
658 658 for chunk in self._compengine.compressstream(self._getcorechunk(),
659 659 self._compopts):
660 660 yield chunk
661 661
662 662 def _paramchunk(self):
663 663 """return a encoded version of all stream parameters"""
664 664 blocks = []
665 665 for par, value in self._params:
666 666 par = urlreq.quote(par)
667 667 if value is not None:
668 668 value = urlreq.quote(value)
669 669 par = '%s=%s' % (par, value)
670 670 blocks.append(par)
671 671 return ' '.join(blocks)
672 672
673 673 def _getcorechunk(self):
674 674 """yield chunk for the core part of the bundle
675 675
676 676 (all but headers and parameters)"""
677 677 outdebug(self.ui, 'start of parts')
678 678 for part in self._parts:
679 679 outdebug(self.ui, 'bundle part: "%s"' % part.type)
680 680 for chunk in part.getchunks(ui=self.ui):
681 681 yield chunk
682 682 outdebug(self.ui, 'end of bundle')
683 683 yield _pack(_fpartheadersize, 0)
684 684
685 685
686 686 def salvageoutput(self):
687 687 """return a list with a copy of all output parts in the bundle
688 688
689 689 This is meant to be used during error handling to make sure we preserve
690 690 server output"""
691 691 salvaged = []
692 692 for part in self._parts:
693 693 if part.type.startswith('output'):
694 694 salvaged.append(part.copy())
695 695 return salvaged
696 696
697 697
698 698 class unpackermixin(object):
699 699 """A mixin to extract bytes and struct data from a stream"""
700 700
701 701 def __init__(self, fp):
702 702 self._fp = fp
703 703
704 704 def _unpack(self, format):
705 705 """unpack this struct format from the stream
706 706
707 707 This method is meant for internal usage by the bundle2 protocol only.
708 708 It directly manipulates the low level stream, including bundle2 level
709 709 instructions.
710 710
711 711 Do not use it to implement higher-level logic or methods."""
712 712 data = self._readexact(struct.calcsize(format))
713 713 return _unpack(format, data)
714 714
715 715 def _readexact(self, size):
716 716 """read exactly <size> bytes from the stream
717 717
718 718 This method is meant for internal usage by the bundle2 protocol only.
719 719 It directly manipulates the low level stream, including bundle2 level
720 720 instructions.
721 721
722 722 Do not use it to implement higher-level logic or methods."""
723 723 return changegroup.readexactly(self._fp, size)
724 724
725 725 def getunbundler(ui, fp, magicstring=None):
726 726 """return a valid unbundler object for a given magicstring"""
727 727 if magicstring is None:
728 728 magicstring = changegroup.readexactly(fp, 4)
729 729 magic, version = magicstring[0:2], magicstring[2:4]
730 730 if magic != 'HG':
731 731 ui.debug(
732 732 "error: invalid magic: %r (version %r), should be 'HG'\n"
733 733 % (magic, version))
734 734 raise error.Abort(_('not a Mercurial bundle'))
735 735 unbundlerclass = formatmap.get(version)
736 736 if unbundlerclass is None:
737 737 raise error.Abort(_('unknown bundle version %s') % version)
738 738 unbundler = unbundlerclass(ui, fp)
739 739 indebug(ui, 'start processing of %s stream' % magicstring)
740 740 return unbundler
741 741
742 742 class unbundle20(unpackermixin):
743 743 """interpret a bundle2 stream
744 744
745 745 This class is fed with a binary stream and yields parts through its
746 746 `iterparts` method."""
747 747
748 748 _magicstring = 'HG20'
749 749
750 750 def __init__(self, ui, fp):
751 751 """If header is specified, we do not read it out of the stream."""
752 752 self.ui = ui
753 753 self._compengine = util.compengines.forbundletype('UN')
754 754 self._compressed = None
755 755 super(unbundle20, self).__init__(fp)
756 756
757 757 @util.propertycache
758 758 def params(self):
759 759 """dictionary of stream level parameters"""
760 760 indebug(self.ui, 'reading bundle2 stream parameters')
761 761 params = {}
762 762 paramssize = self._unpack(_fstreamparamsize)[0]
763 763 if paramssize < 0:
764 764 raise error.BundleValueError('negative bundle param size: %i'
765 765 % paramssize)
766 766 if paramssize:
767 767 params = self._readexact(paramssize)
768 768 params = self._processallparams(params)
769 769 return params
770 770
771 771 def _processallparams(self, paramsblock):
772 772 """"""
773 773 params = util.sortdict()
774 774 for p in paramsblock.split(' '):
775 775 p = p.split('=', 1)
776 776 p = [urlreq.unquote(i) for i in p]
777 777 if len(p) < 2:
778 778 p.append(None)
779 779 self._processparam(*p)
780 780 params[p[0]] = p[1]
781 781 return params
782 782
783 783
784 784 def _processparam(self, name, value):
785 785 """process a parameter, applying its effect if needed
786 786
787 787 Parameters starting with a lower case letter are advisory and will be
788 788 ignored when unknown. Those starting with an upper case letter are
789 789 mandatory, and this function will raise a KeyError when unknown.
790 790 
791 791 Note: no options are currently supported. Any input will either be
792 792 ignored or fail.
793 793 """
794 794 if not name:
795 795 raise ValueError(r'empty parameter name')
796 796 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
797 797 raise ValueError(r'non letter first character: %s' % name)
798 798 try:
799 799 handler = b2streamparamsmap[name.lower()]
800 800 except KeyError:
801 801 if name[0:1].islower():
802 802 indebug(self.ui, "ignoring unknown parameter %s" % name)
803 803 else:
804 804 raise error.BundleUnknownFeatureError(params=(name,))
805 805 else:
806 806 handler(self, name, value)
807 807
808 808 def _forwardchunks(self):
809 809 """utility to transfer a bundle2 as binary
810 810
811 811 This is made necessary by the fact that the 'getbundle' command over 'ssh'
812 812 has no way to know when the reply ends, relying on the bundle being
813 813 interpreted to know its end. This is terrible and we are sorry, but we
814 814 needed to move forward to get general delta enabled.
815 815 """
816 816 yield self._magicstring
817 817 assert 'params' not in vars(self)
818 818 paramssize = self._unpack(_fstreamparamsize)[0]
819 819 if paramssize < 0:
820 820 raise error.BundleValueError('negative bundle param size: %i'
821 821 % paramssize)
822 822 yield _pack(_fstreamparamsize, paramssize)
823 823 if paramssize:
824 824 params = self._readexact(paramssize)
825 825 self._processallparams(params)
826 826 yield params
827 827 assert self._compengine.bundletype == 'UN'
828 828 # From there, payload might need to be decompressed
829 829 self._fp = self._compengine.decompressorreader(self._fp)
830 830 emptycount = 0
831 831 while emptycount < 2:
832 832 # so we can brainlessly loop
833 833 assert _fpartheadersize == _fpayloadsize
834 834 size = self._unpack(_fpartheadersize)[0]
835 835 yield _pack(_fpartheadersize, size)
836 836 if size:
837 837 emptycount = 0
838 838 else:
839 839 emptycount += 1
840 840 continue
841 841 if size == flaginterrupt:
842 842 continue
843 843 elif size < 0:
844 844 raise error.BundleValueError('negative chunk size: %i' % size)
845 845 yield self._readexact(size)
846 846
847 847
848 848 def iterparts(self):
849 849 """yield all parts contained in the stream"""
850 850 # make sure params have been loaded
851 851 self.params
852 852 # From there, payload needs to be decompressed
853 853 self._fp = self._compengine.decompressorreader(self._fp)
854 854 indebug(self.ui, 'start extraction of bundle2 parts')
855 855 headerblock = self._readpartheader()
856 856 while headerblock is not None:
857 857 part = seekableunbundlepart(self.ui, headerblock, self._fp)
858 858 yield part
859 # Seek to the end of the part to force it's consumption so the next
860 # part can be read. But then seek back to the beginning so the
861 # code consuming this generator has a part that starts at 0.
862 part.seek(0, os.SEEK_END)
859 # Ensure part is fully consumed so we can start reading the next
860 # part.
861 part.consume()
862 # But then seek back to the beginning so the code consuming this
863 # generator has a part that starts at 0.
863 864 part.seek(0, os.SEEK_SET)
864 865 headerblock = self._readpartheader()
865 866 indebug(self.ui, 'end of bundle2 stream')
866 867
867 868 def _readpartheader(self):
868 869 """reads a part header size and return the bytes blob
869 870
870 871 returns None if empty"""
871 872 headersize = self._unpack(_fpartheadersize)[0]
872 873 if headersize < 0:
873 874 raise error.BundleValueError('negative part header size: %i'
874 875 % headersize)
875 876 indebug(self.ui, 'part header size: %i' % headersize)
876 877 if headersize:
877 878 return self._readexact(headersize)
878 879 return None
879 880
880 881 def compressed(self):
881 882 self.params # load params
882 883 return self._compressed
883 884
884 885 def close(self):
885 886 """close underlying file"""
886 887 if util.safehasattr(self._fp, 'close'):
887 888 return self._fp.close()
888 889
889 890 formatmap = {'20': unbundle20}
890 891
891 892 b2streamparamsmap = {}
892 893
893 894 def b2streamparamhandler(name):
894 895 """register a handler for a stream level parameter"""
895 896 def decorator(func):
896 897 assert name not in formatmap
897 898 b2streamparamsmap[name] = func
898 899 return func
899 900 return decorator
900 901
901 902 @b2streamparamhandler('compression')
902 903 def processcompression(unbundler, param, value):
903 904 """read compression parameter and install payload decompression"""
904 905 if value not in util.compengines.supportedbundletypes:
905 906 raise error.BundleUnknownFeatureError(params=(param,),
906 907 values=(value,))
907 908 unbundler._compengine = util.compengines.forbundletype(value)
908 909 if value is not None:
909 910 unbundler._compressed = True
910 911
911 912 class bundlepart(object):
912 913 """A bundle2 part contains application level payload
913 914
914 915 The part `type` is used to route the part to the application level
915 916 handler.
916 917
917 918 The part payload is contained in ``part.data``. It could be raw bytes or a
918 919 generator of byte chunks.
919 920
920 921 You can add parameters to the part using the ``addparam`` method.
921 922 Parameters can be either mandatory (default) or advisory. Remote side
922 923 should be able to safely ignore the advisory ones.
923 924
924 925 Neither data nor parameters can be modified after generation has begun.
925 926 """
926 927
927 928 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
928 929 data='', mandatory=True):
929 930 validateparttype(parttype)
930 931 self.id = None
931 932 self.type = parttype
932 933 self._data = data
933 934 self._mandatoryparams = list(mandatoryparams)
934 935 self._advisoryparams = list(advisoryparams)
935 936 # checking for duplicated entries
936 937 self._seenparams = set()
937 938 for pname, __ in self._mandatoryparams + self._advisoryparams:
938 939 if pname in self._seenparams:
939 940 raise error.ProgrammingError('duplicated params: %s' % pname)
940 941 self._seenparams.add(pname)
941 942 # status of the part's generation:
942 943 # - None: not started,
943 944 # - False: currently generated,
944 945 # - True: generation done.
945 946 self._generated = None
946 947 self.mandatory = mandatory
947 948
948 949 def __repr__(self):
949 950 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
950 951 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
951 952 % (cls, id(self), self.id, self.type, self.mandatory))
952 953
953 954 def copy(self):
954 955 """return a copy of the part
955 956
956 957 The new part has the very same content but no partid assigned yet.
957 958 Parts with generated data cannot be copied."""
958 959 assert not util.safehasattr(self.data, 'next')
959 960 return self.__class__(self.type, self._mandatoryparams,
960 961 self._advisoryparams, self._data, self.mandatory)
961 962
962 963 # methods used to define the part content
963 964 @property
964 965 def data(self):
965 966 return self._data
966 967
967 968 @data.setter
968 969 def data(self, data):
969 970 if self._generated is not None:
970 971 raise error.ReadOnlyPartError('part is being generated')
971 972 self._data = data
972 973
973 974 @property
974 975 def mandatoryparams(self):
975 976 # make it an immutable tuple to force people through ``addparam``
976 977 return tuple(self._mandatoryparams)
977 978
978 979 @property
979 980 def advisoryparams(self):
980 981 # make it an immutable tuple to force people through ``addparam``
981 982 return tuple(self._advisoryparams)
982 983
983 984 def addparam(self, name, value='', mandatory=True):
984 985 """add a parameter to the part
985 986
986 987 If 'mandatory' is set to True, the remote handler must claim support
987 988 for this parameter or the unbundling will be aborted.
988 989
989 990 The 'name' and 'value' cannot exceed 255 bytes each.
990 991 """
991 992 if self._generated is not None:
992 993 raise error.ReadOnlyPartError('part is being generated')
993 994 if name in self._seenparams:
994 995 raise ValueError('duplicated params: %s' % name)
995 996 self._seenparams.add(name)
996 997 params = self._advisoryparams
997 998 if mandatory:
998 999 params = self._mandatoryparams
999 1000 params.append((name, value))
1000 1001
1001 1002 # methods used to generate the bundle2 stream
1002 1003 def getchunks(self, ui):
1003 1004 if self._generated is not None:
1004 1005 raise error.ProgrammingError('part can only be consumed once')
1005 1006 self._generated = False
1006 1007
1007 1008 if ui.debugflag:
1008 1009 msg = ['bundle2-output-part: "%s"' % self.type]
1009 1010 if not self.mandatory:
1010 1011 msg.append(' (advisory)')
1011 1012 nbmp = len(self.mandatoryparams)
1012 1013 nbap = len(self.advisoryparams)
1013 1014 if nbmp or nbap:
1014 1015 msg.append(' (params:')
1015 1016 if nbmp:
1016 1017 msg.append(' %i mandatory' % nbmp)
1017 1018 if nbap:
1018 1019 msg.append(' %i advisory' % nbap)
1019 1020 msg.append(')')
1020 1021 if not self.data:
1021 1022 msg.append(' empty payload')
1022 1023 elif (util.safehasattr(self.data, 'next')
1023 1024 or util.safehasattr(self.data, '__next__')):
1024 1025 msg.append(' streamed payload')
1025 1026 else:
1026 1027 msg.append(' %i bytes payload' % len(self.data))
1027 1028 msg.append('\n')
1028 1029 ui.debug(''.join(msg))
1029 1030
1030 1031 #### header
1031 1032 if self.mandatory:
1032 1033 parttype = self.type.upper()
1033 1034 else:
1034 1035 parttype = self.type.lower()
1035 1036 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1036 1037 ## parttype
1037 1038 header = [_pack(_fparttypesize, len(parttype)),
1038 1039 parttype, _pack(_fpartid, self.id),
1039 1040 ]
1040 1041 ## parameters
1041 1042 # count
1042 1043 manpar = self.mandatoryparams
1043 1044 advpar = self.advisoryparams
1044 1045 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1045 1046 # size
1046 1047 parsizes = []
1047 1048 for key, value in manpar:
1048 1049 parsizes.append(len(key))
1049 1050 parsizes.append(len(value))
1050 1051 for key, value in advpar:
1051 1052 parsizes.append(len(key))
1052 1053 parsizes.append(len(value))
1053 1054 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1054 1055 header.append(paramsizes)
1055 1056 # key, value
1056 1057 for key, value in manpar:
1057 1058 header.append(key)
1058 1059 header.append(value)
1059 1060 for key, value in advpar:
1060 1061 header.append(key)
1061 1062 header.append(value)
1062 1063 ## finalize header
1063 1064 try:
1064 1065 headerchunk = ''.join(header)
1065 1066 except TypeError:
1066 1067 raise TypeError(r'Found a non-bytes trying to '
1067 1068 r'build bundle part header: %r' % header)
1068 1069 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1069 1070 yield _pack(_fpartheadersize, len(headerchunk))
1070 1071 yield headerchunk
1071 1072 ## payload
1072 1073 try:
1073 1074 for chunk in self._payloadchunks():
1074 1075 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1075 1076 yield _pack(_fpayloadsize, len(chunk))
1076 1077 yield chunk
1077 1078 except GeneratorExit:
1078 1079 # GeneratorExit means that nobody is listening for our
1079 1080 # results anyway, so just bail quickly rather than trying
1080 1081 # to produce an error part.
1081 1082 ui.debug('bundle2-generatorexit\n')
1082 1083 raise
1083 1084 except BaseException as exc:
1084 1085 bexc = util.forcebytestr(exc)
1085 1086 # backup exception data for later
1086 1087 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1087 1088 % bexc)
1088 1089 tb = sys.exc_info()[2]
1089 1090 msg = 'unexpected error: %s' % bexc
1090 1091 interpart = bundlepart('error:abort', [('message', msg)],
1091 1092 mandatory=False)
1092 1093 interpart.id = 0
1093 1094 yield _pack(_fpayloadsize, -1)
1094 1095 for chunk in interpart.getchunks(ui=ui):
1095 1096 yield chunk
1096 1097 outdebug(ui, 'closing payload chunk')
1097 1098 # abort current part payload
1098 1099 yield _pack(_fpayloadsize, 0)
1099 1100 pycompat.raisewithtb(exc, tb)
1100 1101 # end of payload
1101 1102 outdebug(ui, 'closing payload chunk')
1102 1103 yield _pack(_fpayloadsize, 0)
1103 1104 self._generated = True
1104 1105
1105 1106 def _payloadchunks(self):
1106 1107 """yield chunks of the part payload
1107 1108
1108 1109 Exists to handle the different methods to provide data to a part."""
1109 1110 # we only support fixed size data now.
1110 1111 # This will be improved in the future.
1111 1112 if (util.safehasattr(self.data, 'next')
1112 1113 or util.safehasattr(self.data, '__next__')):
1113 1114 buff = util.chunkbuffer(self.data)
1114 1115 chunk = buff.read(preferedchunksize)
1115 1116 while chunk:
1116 1117 yield chunk
1117 1118 chunk = buff.read(preferedchunksize)
1118 1119 elif len(self.data):
1119 1120 yield self.data
1120 1121
1121 1122
1122 1123 flaginterrupt = -1
1123 1124
1124 1125 class interrupthandler(unpackermixin):
1125 1126 """read one part and process it with restricted capability
1126 1127
1127 1128 This allows transmitting exceptions raised on the producer side during
1128 1129 part iteration while the consumer is reading a part.
1129 1130
1130 1131 Parts processed in this manner only have access to a ui object."""
1131 1132
1132 1133 def __init__(self, ui, fp):
1133 1134 super(interrupthandler, self).__init__(fp)
1134 1135 self.ui = ui
1135 1136
1136 1137 def _readpartheader(self):
1137 1138 """reads a part header size and return the bytes blob
1138 1139
1139 1140 returns None if empty"""
1140 1141 headersize = self._unpack(_fpartheadersize)[0]
1141 1142 if headersize < 0:
1142 1143 raise error.BundleValueError('negative part header size: %i'
1143 1144 % headersize)
1144 1145 indebug(self.ui, 'part header size: %i\n' % headersize)
1145 1146 if headersize:
1146 1147 return self._readexact(headersize)
1147 1148 return None
1148 1149
1149 1150 def __call__(self):
1150 1151
1151 1152 self.ui.debug('bundle2-input-stream-interrupt:'
1152 1153 ' opening out of band context\n')
1153 1154 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1154 1155 headerblock = self._readpartheader()
1155 1156 if headerblock is None:
1156 1157 indebug(self.ui, 'no part found during interruption.')
1157 1158 return
1158 1159 part = seekableunbundlepart(self.ui, headerblock, self._fp)
1159 1160 op = interruptoperation(self.ui)
1160 1161 hardabort = False
1161 1162 try:
1162 1163 _processpart(op, part)
1163 1164 except (SystemExit, KeyboardInterrupt):
1164 1165 hardabort = True
1165 1166 raise
1166 1167 finally:
1167 1168 if not hardabort:
1168 part.seek(0, os.SEEK_END)
1169 part.consume()
1169 1170 self.ui.debug('bundle2-input-stream-interrupt:'
1170 1171 ' closing out of band context\n')
1171 1172
1172 1173 class interruptoperation(object):
1173 1174 """A limited operation to be used by part handlers during interruption
1174 1175
1175 1176 It only has access to a ui object.
1176 1177 """
1177 1178
1178 1179 def __init__(self, ui):
1179 1180 self.ui = ui
1180 1181 self.reply = None
1181 1182 self.captureoutput = False
1182 1183
1183 1184 @property
1184 1185 def repo(self):
1185 1186 raise error.ProgrammingError('no repo access from stream interruption')
1186 1187
1187 1188 def gettransaction(self):
1188 1189 raise TransactionUnavailable('no repo access from stream interruption')
1189 1190
1190 1191 def decodepayloadchunks(ui, fh):
1191 1192 """Reads bundle2 part payload data into chunks.
1192 1193
1193 1194 Part payload data consists of framed chunks. This function takes
1194 1195 a file handle and emits those chunks.
1195 1196 """
1196 1197 headersize = struct.calcsize(_fpayloadsize)
1197 1198 readexactly = changegroup.readexactly
1198 1199
1199 1200 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
1200 1201 indebug(ui, 'payload chunk size: %i' % chunksize)
1201 1202
1202 1203 while chunksize:
1203 1204 if chunksize >= 0:
1204 1205 yield readexactly(fh, chunksize)
1205 1206 elif chunksize == flaginterrupt:
1206 1207 # Interrupt "signal" detected. The regular stream is interrupted
1207 1208 # and a bundle2 part follows. Consume it.
1208 1209 interrupthandler(ui, fh)()
1209 1210 else:
1210 1211 raise error.BundleValueError(
1211 1212 'negative payload chunk size: %s' % chunksize)
1212 1213
1213 1214 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
1214 1215 indebug(ui, 'payload chunk size: %i' % chunksize)
1215 1216
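The frame format that `decodepayloadchunks` reads can be sketched with plain `struct` calls. This is a standalone illustration rather than Mercurial's actual helpers: the `'>i'` size format is an assumption consistent with the signed values this module uses (the -1 interrupt flag and the 0 terminator), and both function names are hypothetical.

```python
import io
import struct

_fpayloadsize = '>i'  # assumed: big-endian signed 32-bit chunk size

def encodepayloadchunks(chunks):
    """Frame byte chunks: each chunk is prefixed with its size and a
    zero-size frame terminates the payload."""
    out = []
    for chunk in chunks:
        out.append(struct.pack(_fpayloadsize, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_fpayloadsize, 0))  # end-of-payload marker
    return b''.join(out)

def decodechunks(fh):
    """Inverse of the above: yield size-prefixed chunks until the
    zero-size terminator (interrupt frames are not handled here)."""
    headersize = struct.calcsize(_fpayloadsize)
    while True:
        size = struct.unpack(_fpayloadsize, fh.read(headersize))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('negative payload chunk size: %d' % size)
        yield fh.read(size)

framed = encodepayloadchunks([b'abc', b'defgh'])
assert list(decodechunks(io.BytesIO(framed))) == [b'abc', b'defgh']
```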
1216 1217 class unbundlepart(unpackermixin):
1217 1218 """a bundle part read from a bundle"""
1218 1219
1219 1220 def __init__(self, ui, header, fp):
1220 1221 super(unbundlepart, self).__init__(fp)
1221 1222 self._seekable = (util.safehasattr(fp, 'seek') and
1222 1223 util.safehasattr(fp, 'tell'))
1223 1224 self.ui = ui
1224 1225 # unbundle state attr
1225 1226 self._headerdata = header
1226 1227 self._headeroffset = 0
1227 1228 self._initialized = False
1228 1229 self.consumed = False
1229 1230 # part data
1230 1231 self.id = None
1231 1232 self.type = None
1232 1233 self.mandatoryparams = None
1233 1234 self.advisoryparams = None
1234 1235 self.params = None
1235 1236 self.mandatorykeys = ()
1236 1237 self._readheader()
1237 1238 self._mandatory = None
1238 1239 self._pos = 0
1239 1240
1240 1241 def _fromheader(self, size):
1241 1242 """return the next <size> byte from the header"""
1242 1243 offset = self._headeroffset
1243 1244 data = self._headerdata[offset:(offset + size)]
1244 1245 self._headeroffset = offset + size
1245 1246 return data
1246 1247
1247 1248 def _unpackheader(self, format):
1248 1249 """read given format from header
1249 1250
1250 1251 This automatically computes the size of the format to read."""
1251 1252 data = self._fromheader(struct.calcsize(format))
1252 1253 return _unpack(format, data)
1253 1254
1254 1255 def _initparams(self, mandatoryparams, advisoryparams):
1255 1256 """internal function to set up all logic-related parameters"""
1256 1257 # make it read only to prevent people touching it by mistake.
1257 1258 self.mandatoryparams = tuple(mandatoryparams)
1258 1259 self.advisoryparams = tuple(advisoryparams)
1259 1260 # user friendly UI
1260 1261 self.params = util.sortdict(self.mandatoryparams)
1261 1262 self.params.update(self.advisoryparams)
1262 1263 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1263 1264
1264 1265 def _readheader(self):
1265 1266 """read the header and setup the object"""
1266 1267 typesize = self._unpackheader(_fparttypesize)[0]
1267 1268 self.type = self._fromheader(typesize)
1268 1269 indebug(self.ui, 'part type: "%s"' % self.type)
1269 1270 self.id = self._unpackheader(_fpartid)[0]
1270 1271 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1271 1272 # extract mandatory bit from type
1272 1273 self.mandatory = (self.type != self.type.lower())
1273 1274 self.type = self.type.lower()
1274 1275 ## reading parameters
1275 1276 # param count
1276 1277 mancount, advcount = self._unpackheader(_fpartparamcount)
1277 1278 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1278 1279 # param size
1279 1280 fparamsizes = _makefpartparamsizes(mancount + advcount)
1280 1281 paramsizes = self._unpackheader(fparamsizes)
1281 1282 # make it a list of couple again
1282 1283 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1283 1284 # split mandatory from advisory
1284 1285 mansizes = paramsizes[:mancount]
1285 1286 advsizes = paramsizes[mancount:]
1286 1287 # retrieve param value
1287 1288 manparams = []
1288 1289 for key, value in mansizes:
1289 1290 manparams.append((self._fromheader(key), self._fromheader(value)))
1290 1291 advparams = []
1291 1292 for key, value in advsizes:
1292 1293 advparams.append((self._fromheader(key), self._fromheader(value)))
1293 1294 self._initparams(manparams, advparams)
1294 1295 ## part payload
1295 1296 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1296 1297 # the header has been fully read; mark the part as initialized
1297 1298 self._initialized = True
1298 1299
1299 1300 def _payloadchunks(self):
1300 1301 """Generator of decoded chunks in the payload."""
1301 1302 return decodepayloadchunks(self.ui, self._fp)
1302 1303
1304 def consume(self):
1305 """Read the part payload until completion.
1306
1307 By consuming the part data, the underlying stream read offset will
1308 be advanced to the next part (or end of stream).
1309 """
1310 if self.consumed:
1311 return
1312
1313 chunk = self.read(32768)
1314 while chunk:
1315 self._pos += len(chunk)
1316 chunk = self.read(32768)
1317
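`consume()` above is the standard drain-a-stream idiom: read fixed-size chunks until the stream is exhausted, advancing the underlying offset as a side effect. A minimal standalone sketch (the `drain` helper is hypothetical, not part of bundle2):

```python
import io

def drain(fh, chunksize=32768):
    """Read fh to exhaustion in fixed-size chunks, returning the number
    of bytes consumed (mirrors how consume() advances the read offset)."""
    total = 0
    chunk = fh.read(chunksize)
    while chunk:
        total += len(chunk)
        chunk = fh.read(chunksize)
    return total

assert drain(io.BytesIO(b'x' * 70000)) == 70000
```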
1303 1318 def read(self, size=None):
1304 1319 """read payload data"""
1305 1320 if not self._initialized:
1306 1321 self._readheader()
1307 1322 if size is None:
1308 1323 data = self._payloadstream.read()
1309 1324 else:
1310 1325 data = self._payloadstream.read(size)
1311 1326 self._pos += len(data)
1312 1327 if size is None or len(data) < size:
1313 1328 if not self.consumed and self._pos:
1314 1329 self.ui.debug('bundle2-input-part: total payload size %i\n'
1315 1330 % self._pos)
1316 1331 self.consumed = True
1317 1332 return data
1318 1333
1319 1334 class seekableunbundlepart(unbundlepart):
1320 1335 """A bundle2 part in a bundle that is seekable.
1321 1336
1322 1337 Regular ``unbundlepart`` instances can only be read once. This class
1323 1338 extends ``unbundlepart`` to enable bi-directional seeking within the
1324 1339 part.
1325 1340
1326 1341 Bundle2 part data consists of framed chunks. Offsets when seeking
1327 1342 refer to the decoded data, not the offsets in the underlying bundle2
1328 1343 stream.
1329 1344
1330 1345 To facilitate quickly seeking within the decoded data, instances of this
1331 1346 class maintain a mapping between offsets in the underlying stream and
1332 1347 the decoded payload. This mapping will consume memory in proportion
1333 1348 to the number of chunks within the payload (which almost certainly
1334 1349 increases in proportion with the size of the part).
1335 1350 """
1336 1351 def __init__(self, ui, header, fp):
1337 1352 # (payload, file) offsets for chunk starts.
1338 1353 self._chunkindex = []
1339 1354
1340 1355 super(seekableunbundlepart, self).__init__(ui, header, fp)
1341 1356
1342 1357 def _payloadchunks(self, chunknum=0):
1343 1358 '''seek to specified chunk and start yielding data'''
1344 1359 if len(self._chunkindex) == 0:
1345 1360 assert chunknum == 0, 'Must start with chunk 0'
1346 1361 self._chunkindex.append((0, self._tellfp()))
1347 1362 else:
1348 1363 assert chunknum < len(self._chunkindex), \
1349 1364 'Unknown chunk %d' % chunknum
1350 1365 self._seekfp(self._chunkindex[chunknum][1])
1351 1366
1352 1367 pos = self._chunkindex[chunknum][0]
1353 1368
1354 1369 for chunk in decodepayloadchunks(self.ui, self._fp):
1355 1370 chunknum += 1
1356 1371 pos += len(chunk)
1357 1372 if chunknum == len(self._chunkindex):
1358 1373 self._chunkindex.append((pos, self._tellfp()))
1359 1374
1360 1375 yield chunk
1361 1376
1362 1377 def _findchunk(self, pos):
1363 1378 '''for a given payload position, return a chunk number and offset'''
1364 1379 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1365 1380 if ppos == pos:
1366 1381 return chunk, 0
1367 1382 elif ppos > pos:
1368 1383 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1369 1384 raise ValueError('Unknown chunk')
1370 1385
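The (payload offset, file offset) index that `_findchunk` scans can also be queried with a binary search. A sketch with hypothetical index values, using `bisect` instead of the linear scan above (and not reproducing its out-of-range error handling exactly):

```python
import bisect

# Hypothetical stand-in for seekableunbundlepart._chunkindex:
# (decoded payload offset, underlying file offset) per chunk start.
chunkindex = [(0, 100), (10, 118), (25, 141)]

def findchunk(index, pos):
    """Map a decoded payload position to (chunk number, offset within
    that chunk), like _findchunk but via binary search."""
    payloadoffsets = [ppos for ppos, _fpos in index]
    i = bisect.bisect_right(payloadoffsets, pos) - 1
    if i < 0:
        raise ValueError('Unknown chunk')
    return i, pos - index[i][0]

assert findchunk(chunkindex, 12) == (1, 2)
```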
1371 1386 def tell(self):
1372 1387 return self._pos
1373 1388
1374 1389 def seek(self, offset, whence=os.SEEK_SET):
1375 1390 if whence == os.SEEK_SET:
1376 1391 newpos = offset
1377 1392 elif whence == os.SEEK_CUR:
1378 1393 newpos = self._pos + offset
1379 1394 elif whence == os.SEEK_END:
1380 1395 if not self.consumed:
1381 1396 self.read()
1382 1397 newpos = self._chunkindex[-1][0] - offset
1383 1398 else:
1384 1399 raise ValueError('Unknown whence value: %r' % (whence,))
1385 1400
1386 1401 if newpos > self._chunkindex[-1][0] and not self.consumed:
1387 1402 self.read()
1388 1403 if not 0 <= newpos <= self._chunkindex[-1][0]:
1389 1404 raise ValueError('Offset out of range')
1390 1405
1391 1406 if self._pos != newpos:
1392 1407 chunk, internaloffset = self._findchunk(newpos)
1393 1408 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1394 1409 adjust = self.read(internaloffset)
1395 1410 if len(adjust) != internaloffset:
1396 1411 raise error.Abort(_('seek failed'))
1397 1412 self._pos = newpos
1398 1413
1399 1414 def _seekfp(self, offset, whence=0):
1400 1415 """move the underlying file pointer
1401 1416
1402 1417 This method is meant for internal usage by the bundle2 protocol only.
1403 1418 They directly manipulate the low level stream including bundle2 level
1404 1419 instruction.
1405 1420
1406 1421 Do not use it to implement higher-level logic or methods."""
1407 1422 if self._seekable:
1408 1423 return self._fp.seek(offset, whence)
1409 1424 else:
1410 1425 raise NotImplementedError(_('File pointer is not seekable'))
1411 1426
1412 1427 def _tellfp(self):
1413 1428 """return the file offset, or None if file is not seekable
1414 1429
1415 1430 This method is meant for internal usage by the bundle2 protocol only.
1416 1431 It directly manipulates the low-level stream, including bundle2-level
1417 1432 instructions.
1418 1433
1419 1434 Do not use it to implement higher-level logic or methods."""
1420 1435 if self._seekable:
1421 1436 try:
1422 1437 return self._fp.tell()
1423 1438 except IOError as e:
1424 1439 if e.errno == errno.ESPIPE:
1425 1440 self._seekable = False
1426 1441 else:
1427 1442 raise
1428 1443 return None
1429 1444
1430 1445 # These are only the static capabilities.
1431 1446 # Check the 'getrepocaps' function for the rest.
1432 1447 capabilities = {'HG20': (),
1433 1448 'error': ('abort', 'unsupportedcontent', 'pushraced',
1434 1449 'pushkey'),
1435 1450 'listkeys': (),
1436 1451 'pushkey': (),
1437 1452 'digests': tuple(sorted(util.DIGESTS.keys())),
1438 1453 'remote-changegroup': ('http', 'https'),
1439 1454 'hgtagsfnodes': (),
1440 1455 'phases': ('heads',),
1441 1456 }
1442 1457
1443 1458 def getrepocaps(repo, allowpushback=False):
1444 1459 """return the bundle2 capabilities for a given repo
1445 1460
1446 1461 Exists to allow extensions (like evolution) to mutate the capabilities.
1447 1462 """
1448 1463 caps = capabilities.copy()
1449 1464 caps['changegroup'] = tuple(sorted(
1450 1465 changegroup.supportedincomingversions(repo)))
1451 1466 if obsolete.isenabled(repo, obsolete.exchangeopt):
1452 1467 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1453 1468 caps['obsmarkers'] = supportedformat
1454 1469 if allowpushback:
1455 1470 caps['pushback'] = ()
1456 1471 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1457 1472 if cpmode == 'check-related':
1458 1473 caps['checkheads'] = ('related',)
1459 1474 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1460 1475 caps.pop('phases')
1461 1476 return caps
1462 1477
1463 1478 def bundle2caps(remote):
1464 1479 """return the bundle capabilities of a peer as dict"""
1465 1480 raw = remote.capable('bundle2')
1466 1481 if not raw and raw != '':
1467 1482 return {}
1468 1483 capsblob = urlreq.unquote(remote.capable('bundle2'))
1469 1484 return decodecaps(capsblob)
1470 1485
1471 1486 def obsmarkersversion(caps):
1472 1487 """extract the list of supported obsmarkers versions from a bundle2caps dict
1473 1488 """
1474 1489 obscaps = caps.get('obsmarkers', ())
1475 1490 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1476 1491
1477 1492 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1478 1493 vfs=None, compression=None, compopts=None):
1479 1494 if bundletype.startswith('HG10'):
1480 1495 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1481 1496 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1482 1497 compression=compression, compopts=compopts)
1483 1498 elif not bundletype.startswith('HG20'):
1484 1499 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1485 1500
1486 1501 caps = {}
1487 1502 if 'obsolescence' in opts:
1488 1503 caps['obsmarkers'] = ('V1',)
1489 1504 bundle = bundle20(ui, caps)
1490 1505 bundle.setcompression(compression, compopts)
1491 1506 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1492 1507 chunkiter = bundle.getchunks()
1493 1508
1494 1509 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1495 1510
1496 1511 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1497 1512 # We should eventually reconcile this logic with the one behind
1498 1513 # 'exchange.getbundle2partsgenerator'.
1499 1514 #
1500 1515 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1501 1516 # different right now. So we keep them separated for now for the sake of
1502 1517 # simplicity.
1503 1518
1504 1519 # we always want a changegroup in such bundle
1505 1520 cgversion = opts.get('cg.version')
1506 1521 if cgversion is None:
1507 1522 cgversion = changegroup.safeversion(repo)
1508 1523 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1509 1524 part = bundler.newpart('changegroup', data=cg.getchunks())
1510 1525 part.addparam('version', cg.version)
1511 1526 if 'clcount' in cg.extras:
1512 1527 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1513 1528 mandatory=False)
1514 1529 if opts.get('phases') and repo.revs('%ln and secret()',
1515 1530 outgoing.missingheads):
1516 1531 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1517 1532
1518 1533 addparttagsfnodescache(repo, bundler, outgoing)
1519 1534
1520 1535 if opts.get('obsolescence', False):
1521 1536 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1522 1537 buildobsmarkerspart(bundler, obsmarkers)
1523 1538
1524 1539 if opts.get('phases', False):
1525 1540 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1526 1541 phasedata = phases.binaryencode(headsbyphase)
1527 1542 bundler.newpart('phase-heads', data=phasedata)
1528 1543
1529 1544 def addparttagsfnodescache(repo, bundler, outgoing):
1530 1545 # we include the tags fnode cache for the bundle changeset
1531 1546 # (as an optional parts)
1532 1547 cache = tags.hgtagsfnodescache(repo.unfiltered())
1533 1548 chunks = []
1534 1549
1535 1550 # .hgtags fnodes are only relevant for head changesets. While we could
1536 1551 # transfer values for all known nodes, there will likely be little to
1537 1552 # no benefit.
1538 1553 #
1539 1554 # We don't bother using a generator to produce output data because
1540 1555 # a) we only have 40 bytes per head and even esoteric numbers of heads
1541 1556 # consume little memory (1M heads is 40MB) b) we don't want to send the
1542 1557 # part if we don't have entries and knowing if we have entries requires
1543 1558 # cache lookups.
1544 1559 for node in outgoing.missingheads:
1545 1560 # Don't compute missing, as this may slow down serving.
1546 1561 fnode = cache.getfnode(node, computemissing=False)
1547 1562 if fnode is not None:
1548 1563 chunks.extend([node, fnode])
1549 1564
1550 1565 if chunks:
1551 1566 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1552 1567
1553 1568 def buildobsmarkerspart(bundler, markers):
1554 1569 """add an obsmarker part to the bundler with <markers>
1555 1570
1556 1571 No part is created if markers is empty.
1557 1572 Raises ValueError if the bundler doesn't support any known obsmarker format.
1558 1573 """
1559 1574 if not markers:
1560 1575 return None
1561 1576
1562 1577 remoteversions = obsmarkersversion(bundler.capabilities)
1563 1578 version = obsolete.commonversion(remoteversions)
1564 1579 if version is None:
1565 1580 raise ValueError('bundler does not support common obsmarker format')
1566 1581 stream = obsolete.encodemarkers(markers, True, version=version)
1567 1582 return bundler.newpart('obsmarkers', data=stream)
1568 1583
1569 1584 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1570 1585 compopts=None):
1571 1586 """Write a bundle file and return its filename.
1572 1587
1573 1588 Existing files will not be overwritten.
1574 1589 If no filename is specified, a temporary file is created.
1575 1590 bz2 compression can be turned off.
1576 1591 The bundle file will be deleted in case of errors.
1577 1592 """
1578 1593
1579 1594 if bundletype == "HG20":
1580 1595 bundle = bundle20(ui)
1581 1596 bundle.setcompression(compression, compopts)
1582 1597 part = bundle.newpart('changegroup', data=cg.getchunks())
1583 1598 part.addparam('version', cg.version)
1584 1599 if 'clcount' in cg.extras:
1585 1600 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1586 1601 mandatory=False)
1587 1602 chunkiter = bundle.getchunks()
1588 1603 else:
1589 1604 # compression argument is only for the bundle2 case
1590 1605 assert compression is None
1591 1606 if cg.version != '01':
1592 1607 raise error.Abort(_('old bundle types only supports v1 '
1593 1608 'changegroups'))
1594 1609 header, comp = bundletypes[bundletype]
1595 1610 if comp not in util.compengines.supportedbundletypes:
1596 1611 raise error.Abort(_('unknown stream compression type: %s')
1597 1612 % comp)
1598 1613 compengine = util.compengines.forbundletype(comp)
1599 1614 def chunkiter():
1600 1615 yield header
1601 1616 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1602 1617 yield chunk
1603 1618 chunkiter = chunkiter()
1604 1619
1605 1620 # parse the changegroup data, otherwise we will block
1606 1621 # in case of sshrepo because we don't know the end of the stream
1607 1622 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1608 1623
1609 1624 def combinechangegroupresults(op):
1610 1625 """logic to combine 0 or more addchangegroup results into one"""
1611 1626 results = [r.get('return', 0)
1612 1627 for r in op.records['changegroup']]
1613 1628 changedheads = 0
1614 1629 result = 1
1615 1630 for ret in results:
1616 1631 # If any changegroup result is 0, return 0
1617 1632 if ret == 0:
1618 1633 result = 0
1619 1634 break
1620 1635 if ret < -1:
1621 1636 changedheads += ret + 1
1622 1637 elif ret > 1:
1623 1638 changedheads += ret - 1
1624 1639 if changedheads > 0:
1625 1640 result = 1 + changedheads
1626 1641 elif changedheads < 0:
1627 1642 result = -1 + changedheads
1628 1643 return result
1629 1644
1630 1645 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1631 1646 'targetphase'))
1632 1647 def handlechangegroup(op, inpart):
1633 1648 """apply a changegroup part on the repo
1634 1649
1635 1650 This is a very early implementation that will see massive rework before
1636 1651 being inflicted on any end-user.
1637 1652 """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate what was retrieved by
      the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
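The size and digest validation that `util.digestchecker` performs on the streamed bundle can be illustrated with a simplified, fully-buffered stand-in (`checkpayload` is a hypothetical helper, an assumption for illustration; the real checker wraps the stream and validates as data is read):

```python
import hashlib

def checkpayload(data, size, digests):
    """Verify fetched bundle bytes against the advertised size and digests.

    `digests` maps a hashlib algorithm name (e.g. 'sha1') to the expected
    hex digest, mirroring the 'digest:<type>' part parameters.
    """
    if len(data) != size:
        raise ValueError('size mismatch: got %d, expected %d'
                         % (len(data), size))
    for typ, expected in digests.items():
        actual = hashlib.new(typ, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch: expected %s, got %s'
                             % (typ, expected, actual))

payload = b'bundle10 payload'
checkpayload(payload, len(payload),
             {'sha1': hashlib.sha1(payload).hexdigest()})
print('digests ok')
```

As in the handler, all advertised digest types are checked, and any single mismatch fails the whole import.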

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
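The payload framing read above is a bare concatenation of 20-byte binary node ids with no count prefix; the end of the payload is signalled by a short (empty) read. A minimal sketch against an in-memory stream (`readheads` is a hypothetical helper, illustration only):

```python
import io

def readheads(fh):
    """Read 20-byte node ids until the stream is exhausted."""
    heads = []
    h = fh.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fh.read(20)
    # a non-empty tail means the payload was not a multiple of 20 bytes
    assert not h
    return heads

payload = b'\xaa' * 20 + b'\xbb' * 20
print(len(readheads(io.BytesIO(payload))))  # 2
```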

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for a race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
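The 'hgtagsfnodes' payload framing differs slightly from 'check:heads': records are 40-byte (changeset node, filenode) pairs, and a truncated trailing record is tolerated and skipped rather than asserted against. A sketch (`readfnodepairs` is a hypothetical helper, an assumption for illustration):

```python
import io

def readfnodepairs(fh):
    """Read (changeset node, filenode) 20-byte pairs; ignore a short tail."""
    pairs = []
    while True:
        node = fh.read(20)
        fnode = fh.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break  # incomplete record: treat as end of payload
        pairs.append((node, fnode))
    return pairs

# one complete pair followed by a truncated record that gets dropped
payload = b'n' * 20 + b'f' * 20 + b'x' * 5
print(len(readfnodepairs(io.BytesIO(payload))))  # 1
```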

@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
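The name mangling above is the whole contract between `--pushvar` on the client and hooks on the server: every advisory parameter name is upper-cased and prefixed with `USERVAR_`. Sketched in isolation (`mangledhookargs` is a hypothetical helper, not part of the module):

```python
def mangledhookargs(advisoryparams):
    """Turn pushvars advisory params into hook argument names."""
    hookargs = {}
    for key, value in advisoryparams:
        # USERVAR_ marks values that came from the client's --pushvar flag
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

print(mangledhookargs([('debug', '1'), ('Reason', 'hotfix')]))
```

A hook can then distinguish user-supplied variables from builtin hook arguments purely by the prefix.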