bundle2: immediate exit for ctrl+c (issue5692)...
Durham Goode -
r34638:5f79f5f8 default
@@ -1,1923 +1,1923
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32; `chunkdata` is plain bytes (exactly as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
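As a rough illustration of the stream-level layout described above (magic string, int32 params size, urlquoted space-separated params blob), here is a standalone Python 3 sketch. The helper name `parse_stream_header` is hypothetical; the real parsing lives in the `unbundle20` class below, which works on a stream rather than a byte string.

```python
import struct
from urllib.parse import unquote

def parse_stream_header(data):
    """Parse the magic string and stream level parameters of a bundle2 blob.

    Returns (version, params, offset) where offset points at the first
    part header. Sketch only: mandatory/advisory handling is omitted.
    """
    magic, version = data[0:2], data[2:4]
    assert magic == b'HG'
    # int32, big-endian: total size of the parameter blob
    (paramssize,) = struct.unpack('>i', data[4:8])
    blob = data[8:8 + paramssize].decode('ascii')
    params = {}
    if blob:
        for item in blob.split(' '):
            if '=' in item:
                key, value = item.split('=', 1)
                params[unquote(key)] = unquote(value)
            else:
                params[unquote(item)] = None
    return version.decode('ascii'), params, 8 + paramssize
```

For example, a header carrying a single `Compression=BZ` parameter decodes to version `'20'` with that one entry.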
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 preferedchunksize = 4096
183 183
184 184 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
185 185
186 186 def outdebug(ui, message):
187 187 """debug regarding output stream (bundling)"""
188 188 if ui.configbool('devel', 'bundle2.debug'):
189 189 ui.debug('bundle2-output: %s\n' % message)
190 190
191 191 def indebug(ui, message):
192 192 """debug on input stream (unbundling)"""
193 193 if ui.configbool('devel', 'bundle2.debug'):
194 194 ui.debug('bundle2-input: %s\n' % message)
195 195
196 196 def validateparttype(parttype):
197 197 """raise ValueError if a parttype contains invalid character"""
198 198 if _parttypeforbidden.search(parttype):
199 199 raise ValueError(parttype)
200 200
201 201 def _makefpartparamsizes(nbparams):
202 202 """return a struct format to read part parameter sizes
203 203
204 204 The number of parameters is variable, so we need to build that format
205 205 dynamically.
206 206 """
207 207 return '>'+('BB'*nbparams)
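The part header layout documented in the module docstring can be decoded with the same dynamic `BB` format that `_makefpartparamsizes` builds. Below is a standalone sketch; the function name `parse_part_header` is hypothetical, and the real logic lives in the `unbundlepart` class.

```python
import struct

def parse_part_header(blob):
    """Decode a part header blob laid out as:
    <typesize><parttype><partid><mandatory-count><advisory-count>
    <param-sizes><param-data>
    """
    offset = 0
    (typesize,) = struct.unpack_from('>B', blob, offset); offset += 1
    parttype = blob[offset:offset + typesize].decode('ascii'); offset += typesize
    (partid,) = struct.unpack_from('>I', blob, offset); offset += 4
    mancount, advcount = struct.unpack_from('>BB', blob, offset); offset += 2
    # one (key-size, value-size) byte pair per parameter
    fmt = '>' + ('BB' * (mancount + advcount))
    sizes = struct.unpack_from(fmt, blob, offset)
    offset += struct.calcsize(fmt)
    params = []
    for i in range(mancount + advcount):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize].decode('ascii'); offset += ksize
        value = blob[offset:offset + vsize].decode('ascii'); offset += vsize
        params.append((key, value))
    # mandatory parameters come first, then the advisory ones
    return parttype, partid, params[:mancount], params[mancount:]
```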
208 208
209 209 parthandlermapping = {}
210 210
211 211 def parthandler(parttype, params=()):
212 212 """decorator that registers a function as a bundle2 part handler
213 213
214 214 eg::
215 215
216 216 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
217 217 def myparttypehandler(...):
218 218 '''process a part of type "my part".'''
219 219 ...
220 220 """
221 221 validateparttype(parttype)
222 222 def _decorator(func):
223 223 lparttype = parttype.lower() # enforce lower case matching.
224 224 assert lparttype not in parthandlermapping
225 225 parthandlermapping[lparttype] = func
226 226 func.params = frozenset(params)
227 227 return func
228 228 return _decorator
229 229
230 230 class unbundlerecords(object):
231 231 """keep record of what happens during an unbundle
232 232
233 233 New records are added using `records.add('cat', obj)`, where 'cat' is a
234 234 category of record and obj is an arbitrary object.
235 235
236 236 `records['cat']` will return all entries of this category 'cat'.
237 237
238 238 Iterating on the object itself will yield `('category', obj)` tuples
239 239 for all entries.
240 240
241 241 All iterations happen in chronological order.
242 242 """
243 243
244 244 def __init__(self):
245 245 self._categories = {}
246 246 self._sequences = []
247 247 self._replies = {}
248 248
249 249 def add(self, category, entry, inreplyto=None):
250 250 """add a new record of a given category.
251 251
252 252 The entry can then be retrieved in the list returned by
253 253 self['category']."""
254 254 self._categories.setdefault(category, []).append(entry)
255 255 self._sequences.append((category, entry))
256 256 if inreplyto is not None:
257 257 self.getreplies(inreplyto).add(category, entry)
258 258
259 259 def getreplies(self, partid):
260 260 """get the records that are replies to a specific part"""
261 261 return self._replies.setdefault(partid, unbundlerecords())
262 262
263 263 def __getitem__(self, cat):
264 264 return tuple(self._categories.get(cat, ()))
265 265
266 266 def __iter__(self):
267 267 return iter(self._sequences)
268 268
269 269 def __len__(self):
270 270 return len(self._sequences)
271 271
272 272 def __nonzero__(self):
273 273 return bool(self._sequences)
274 274
275 275 __bool__ = __nonzero__
276 276
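The record-keeping semantics described in the class docstring can be shown with a minimal standalone stand-in (a simplified sketch, not the class itself; reply tracking is omitted):

```python
class MiniRecords:
    """Standalone sketch mirroring unbundlerecords' observable behavior."""

    def __init__(self):
        self._categories = {}   # category -> list of entries
        self._sequences = []    # (category, entry) in chronological order

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __bool__(self):
        return bool(self._sequences)
```

Usage mirrors the docstring: `records.add('changegroup', {'return': 1})`, then `records['changegroup']` returns that entry and iterating yields `('category', obj)` tuples in order.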
277 277 class bundleoperation(object):
278 278 """an object that represents a single bundling process
279 279
280 280 Its purpose is to carry unbundle-related objects and states.
281 281
282 282 A new object should be created at the beginning of each bundle processing.
283 283 The object is to be returned by the processing function.
284 284
285 285 The object has very little content now; it will ultimately contain:
286 286 * an access to the repo the bundle is applied to,
287 287 * a ui object,
288 288 * a way to retrieve a transaction to add changes to the repo,
289 289 * a way to record the result of processing each part,
290 290 * a way to construct a bundle response when applicable.
291 291 """
292 292
293 293 def __init__(self, repo, transactiongetter, captureoutput=True):
294 294 self.repo = repo
295 295 self.ui = repo.ui
296 296 self.records = unbundlerecords()
297 297 self.reply = None
298 298 self.captureoutput = captureoutput
299 299 self.hookargs = {}
300 300 self._gettransaction = transactiongetter
301 301
302 302 def gettransaction(self):
303 303 transaction = self._gettransaction()
304 304
305 305 if self.hookargs:
306 306 # the ones added to the transaction supersede those added
307 307 # to the operation.
308 308 self.hookargs.update(transaction.hookargs)
309 309 transaction.hookargs = self.hookargs
310 310
311 311 # mark the hookargs as flushed. further attempts to add to
312 312 # hookargs will result in an abort.
313 313 self.hookargs = None
314 314
315 315 return transaction
316 316
317 317 def addhookargs(self, hookargs):
318 318 if self.hookargs is None:
319 319 raise error.ProgrammingError('attempted to add hookargs to '
320 320 'operation after transaction started')
321 321 self.hookargs.update(hookargs)
322 322
323 323 class TransactionUnavailable(RuntimeError):
324 324 pass
325 325
326 326 def _notransaction():
327 327 """default method to get a transaction while processing a bundle
328 328
329 329 Raise an exception to highlight the fact that no transaction was expected
330 330 to be created"""
331 331 raise TransactionUnavailable()
332 332
333 333 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
334 334 # transform me into unbundler.apply() as soon as the freeze is lifted
335 335 if isinstance(unbundler, unbundle20):
336 336 tr.hookargs['bundle2'] = '1'
337 337 if source is not None and 'source' not in tr.hookargs:
338 338 tr.hookargs['source'] = source
339 339 if url is not None and 'url' not in tr.hookargs:
340 340 tr.hookargs['url'] = url
341 341 return processbundle(repo, unbundler, lambda: tr)
342 342 else:
343 343 # the transactiongetter won't be used, but we might as well set it
344 344 op = bundleoperation(repo, lambda: tr)
345 345 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
346 346 return op
347 347
348 348 class partiterator(object):
349 349 def __init__(self, repo, op, unbundler):
350 350 self.repo = repo
351 351 self.op = op
352 352 self.unbundler = unbundler
353 353 self.iterator = None
354 354 self.count = 0
355 355 self.current = None
356 356
357 357 def __enter__(self):
358 358 def func():
359 359 itr = enumerate(self.unbundler.iterparts())
360 360 for count, p in itr:
361 361 self.count = count
362 362 self.current = p
363 363 yield p
364 364 p.seek(0, 2)
365 365 self.current = None
366 366 self.iterator = func()
367 367 return self.iterator
368 368
369 369 def __exit__(self, type, exc, tb):
370 370 if not self.iterator:
371 371 return
372 372
373 if exc:
374 # If exiting or interrupted, do not attempt to seek the stream in
375 # the finally block below. This makes abort faster.
376 if (self.current and
377 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
378 # consume the part content to not corrupt the stream.
379 self.current.seek(0, 2)
380
373 # Only gracefully abort in a normal exception situation. User aborts
374 # like Ctrl+C throw a KeyboardInterrupt, which is not a subclass of
375 # Exception, and should not be gracefully cleaned up.
376 if isinstance(exc, Exception):
381 377 # Any exceptions seeking to the end of the bundle at this point are
382 378 # almost certainly related to the underlying stream being bad.
383 379 # And, chances are that the exception we're handling is related to
384 380 # getting in that bad state. So, we swallow the seeking error and
385 381 # re-raise the original error.
386 382 seekerror = False
387 383 try:
384 if self.current:
385 # consume the part content to not corrupt the stream.
386 self.current.seek(0, 2)
387
388 388 for part in self.iterator:
389 389 # consume the bundle content
390 390 part.seek(0, 2)
391 391 except Exception:
392 392 seekerror = True
393 393
394 394 # Small hack to let caller code distinguish exceptions from bundle2
395 395 # processing from processing the old format. This is mostly needed
396 396 # to handle different return codes to unbundle according to the type
397 397 # of bundle. We should probably clean up or drop this return code
398 398 # craziness in a future version.
399 399 exc.duringunbundle2 = True
400 400 salvaged = []
401 401 replycaps = None
402 402 if self.op.reply is not None:
403 403 salvaged = self.op.reply.salvageoutput()
404 404 replycaps = self.op.reply.capabilities
405 405 exc._replycaps = replycaps
406 406 exc._bundle2salvagedoutput = salvaged
407 407
408 408 # Re-raising from a variable loses the original stack. So only use
409 409 # that form if we need to.
410 410 if seekerror:
411 411 raise exc
412 412
413 413 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
414 414 self.count)
415 415
416 416 def processbundle(repo, unbundler, transactiongetter=None, op=None):
417 417 """This function processes a bundle, applying its effects to/from a repo
418 418
419 419 It iterates over each part then searches for and uses the proper handling
420 420 code to process the part. Parts are processed in order.
421 421
422 422 An unknown mandatory part will abort the process.
423 423
424 424 It is temporarily possible to provide a prebuilt bundleoperation to the
425 425 function. This is used to ensure output is properly propagated in case of
426 426 an error during the unbundling. This output capturing part will likely be
427 427 reworked and this ability will probably go away in the process.
428 428 """
429 429 if op is None:
430 430 if transactiongetter is None:
431 431 transactiongetter = _notransaction
432 432 op = bundleoperation(repo, transactiongetter)
433 433 # todo:
434 434 # - replace this with an init function soon.
435 435 # - exception catching
436 436 unbundler.params
437 437 if repo.ui.debugflag:
438 438 msg = ['bundle2-input-bundle:']
439 439 if unbundler.params:
440 440 msg.append(' %i params' % len(unbundler.params))
441 441 if op._gettransaction is None or op._gettransaction is _notransaction:
442 442 msg.append(' no-transaction')
443 443 else:
444 444 msg.append(' with-transaction')
445 445 msg.append('\n')
446 446 repo.ui.debug(''.join(msg))
447 447
448 448 processparts(repo, op, unbundler)
449 449
450 450 return op
451 451
452 452 def processparts(repo, op, unbundler):
453 453 with partiterator(repo, op, unbundler) as parts:
454 454 for part in parts:
455 455 _processpart(op, part)
456 456
457 457 def _processchangegroup(op, cg, tr, source, url, **kwargs):
458 458 ret = cg.apply(op.repo, tr, source, url, **kwargs)
459 459 op.records.add('changegroup', {
460 460 'return': ret,
461 461 })
462 462 return ret
463 463
464 464 def _gethandler(op, part):
465 465 status = 'unknown' # used by debug output
466 466 try:
467 467 handler = parthandlermapping.get(part.type)
468 468 if handler is None:
469 469 status = 'unsupported-type'
470 470 raise error.BundleUnknownFeatureError(parttype=part.type)
471 471 indebug(op.ui, 'found a handler for part %s' % part.type)
472 472 unknownparams = part.mandatorykeys - handler.params
473 473 if unknownparams:
474 474 unknownparams = list(unknownparams)
475 475 unknownparams.sort()
476 476 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
477 477 raise error.BundleUnknownFeatureError(parttype=part.type,
478 478 params=unknownparams)
479 479 status = 'supported'
480 480 except error.BundleUnknownFeatureError as exc:
481 481 if part.mandatory: # mandatory parts
482 482 raise
483 483 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
484 484 return # skip to part processing
485 485 finally:
486 486 if op.ui.debugflag:
487 487 msg = ['bundle2-input-part: "%s"' % part.type]
488 488 if not part.mandatory:
489 489 msg.append(' (advisory)')
490 490 nbmp = len(part.mandatorykeys)
491 491 nbap = len(part.params) - nbmp
492 492 if nbmp or nbap:
493 493 msg.append(' (params:')
494 494 if nbmp:
495 495 msg.append(' %i mandatory' % nbmp)
496 496 if nbap:
497 497 msg.append(' %i advisory' % nbap)
498 498 msg.append(')')
499 499 msg.append(' %s\n' % status)
500 500 op.ui.debug(''.join(msg))
501 501
502 502 return handler
503 503
504 504 def _processpart(op, part):
505 505 """process a single part from a bundle
506 506
507 507 The part is guaranteed to have been fully consumed when the function exits
508 508 (even if an exception is raised)."""
509 509 handler = _gethandler(op, part)
510 510 if handler is None:
511 511 return
512 512
513 513 # handler is called outside the above try block so that we don't
514 514 # risk catching KeyErrors from anything other than the
515 515 # parthandlermapping lookup (any KeyError raised by handler()
516 516 # itself represents a defect of a different variety).
517 517 output = None
518 518 if op.captureoutput and op.reply is not None:
519 519 op.ui.pushbuffer(error=True, subproc=True)
520 520 output = ''
521 521 try:
522 522 handler(op, part)
523 523 finally:
524 524 if output is not None:
525 525 output = op.ui.popbuffer()
526 526 if output:
527 527 outpart = op.reply.newpart('output', data=output,
528 528 mandatory=False)
529 529 outpart.addparam(
530 530 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
531 531
532 532 def decodecaps(blob):
533 533 """decode a bundle2 caps bytes blob into a dictionary
534 534
535 535 The blob is a list of capabilities (one per line)
536 536 Capabilities may have values using a line of the form::
537 537
538 538 capability=value1,value2,value3
539 539
540 540 The values are always a list."""
541 541 caps = {}
542 542 for line in blob.splitlines():
543 543 if not line:
544 544 continue
545 545 if '=' not in line:
546 546 key, vals = line, ()
547 547 else:
548 548 key, vals = line.split('=', 1)
549 549 vals = vals.split(',')
550 550 key = urlreq.unquote(key)
551 551 vals = [urlreq.unquote(v) for v in vals]
552 552 caps[key] = vals
553 553 return caps
554 554
555 555 def encodecaps(caps):
556 556 """encode a bundle2 caps dictionary into a bytes blob"""
557 557 chunks = []
558 558 for ca in sorted(caps):
559 559 vals = caps[ca]
560 560 ca = urlreq.quote(ca)
561 561 vals = [urlreq.quote(v) for v in vals]
562 562 if vals:
563 563 ca = "%s=%s" % (ca, ','.join(vals))
564 564 chunks.append(ca)
565 565 return '\n'.join(chunks)
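`decodecaps` and `encodecaps` above are inverses of each other. A self-contained Python 3 round-trip sketch (using `urllib.parse` in place of the module's `urlreq` shim; the names `encode_caps`/`decode_caps` are ours):

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    # one capability per line, values comma-joined after '='
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        key = quote(ca)
        if vals:
            key = "%s=%s" % (key, ','.join(vals))
        chunks.append(key)
    return '\n'.join(chunks)

def decode_caps(blob):
    # inverse of encode_caps; values are always a list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```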
566 566
567 567 bundletypes = {
568 568 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
569 569 # since the unification ssh accepts a header but there
570 570 # is no capability signaling it.
571 571 "HG20": (), # special-cased below
572 572 "HG10UN": ("HG10UN", 'UN'),
573 573 "HG10BZ": ("HG10", 'BZ'),
574 574 "HG10GZ": ("HG10GZ", 'GZ'),
575 575 }
576 576
577 577 # hgweb uses this list to communicate its preferred type
578 578 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
579 579
580 580 class bundle20(object):
581 581 """represent an outgoing bundle2 container
582 582
583 583 Use the `addparam` method to add a stream level parameter and `newpart` to
584 584 populate it. Then call `getchunks` to retrieve all the binary chunks of
585 585 data that compose the bundle2 container."""
586 586
587 587 _magicstring = 'HG20'
588 588
589 589 def __init__(self, ui, capabilities=()):
590 590 self.ui = ui
591 591 self._params = []
592 592 self._parts = []
593 593 self.capabilities = dict(capabilities)
594 594 self._compengine = util.compengines.forbundletype('UN')
595 595 self._compopts = None
596 596
597 597 def setcompression(self, alg, compopts=None):
598 598 """setup core part compression to <alg>"""
599 599 if alg in (None, 'UN'):
600 600 return
601 601 assert not any(n.lower() == 'compression' for n, v in self._params)
602 602 self.addparam('Compression', alg)
603 603 self._compengine = util.compengines.forbundletype(alg)
604 604 self._compopts = compopts
605 605
606 606 @property
607 607 def nbparts(self):
608 608 """total number of parts added to the bundler"""
609 609 return len(self._parts)
610 610
611 611 # methods used to define the bundle2 content
612 612 def addparam(self, name, value=None):
613 613 """add a stream level parameter"""
614 614 if not name:
615 615 raise ValueError(r'empty parameter name')
616 616 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
617 617 raise ValueError(r'non letter first character: %s' % name)
618 618 self._params.append((name, value))
619 619
620 620 def addpart(self, part):
621 621 """add a new part to the bundle2 container
622 622
623 623 Parts contain the actual application level payload.
624 624 assert part.id is None
625 625 part.id = len(self._parts) # very cheap counter
626 626 self._parts.append(part)
627 627
628 628 def newpart(self, typeid, *args, **kwargs):
629 629 """create a new part and add it to the container
630 630
631 631 The part is directly added to the container. For now, this means
632 632 that any failure to properly initialize the part after calling
633 633 ``newpart`` should result in a failure of the whole bundling process.
634 634
635 635 You can still fall back to manually create and add if you need better
636 636 control."""
637 637 part = bundlepart(typeid, *args, **kwargs)
638 638 self.addpart(part)
639 639 return part
640 640
641 641 # methods used to generate the bundle2 stream
642 642 def getchunks(self):
643 643 if self.ui.debugflag:
644 644 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
645 645 if self._params:
646 646 msg.append(' (%i params)' % len(self._params))
647 647 msg.append(' %i parts total\n' % len(self._parts))
648 648 self.ui.debug(''.join(msg))
649 649 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
650 650 yield self._magicstring
651 651 param = self._paramchunk()
652 652 outdebug(self.ui, 'bundle parameter: %s' % param)
653 653 yield _pack(_fstreamparamsize, len(param))
654 654 if param:
655 655 yield param
656 656 for chunk in self._compengine.compressstream(self._getcorechunk(),
657 657 self._compopts):
658 658 yield chunk
659 659
660 660 def _paramchunk(self):
661 661 """return an encoded version of all stream parameters"""
662 662 blocks = []
663 663 for par, value in self._params:
664 664 par = urlreq.quote(par)
665 665 if value is not None:
666 666 value = urlreq.quote(value)
667 667 par = '%s=%s' % (par, value)
668 668 blocks.append(par)
669 669 return ' '.join(blocks)
670 670
671 671 def _getcorechunk(self):
672 672 """yield chunks for the core part of the bundle
673 673
674 674 (all but headers and parameters)"""
675 675 outdebug(self.ui, 'start of parts')
676 676 for part in self._parts:
677 677 outdebug(self.ui, 'bundle part: "%s"' % part.type)
678 678 for chunk in part.getchunks(ui=self.ui):
679 679 yield chunk
680 680 outdebug(self.ui, 'end of bundle')
681 681 yield _pack(_fpartheadersize, 0)
682 682
683 683
684 684 def salvageoutput(self):
685 685 """return a list with a copy of all output parts in the bundle
686 686
687 687 This is meant to be used during error handling to make sure we preserve
688 688 server output"""
689 689 salvaged = []
690 690 for part in self._parts:
691 691 if part.type.startswith('output'):
692 692 salvaged.append(part.copy())
693 693 return salvaged
694 694
695 695
696 696 class unpackermixin(object):
697 697 """A mixin to extract bytes and struct data from a stream"""
698 698
699 699 def __init__(self, fp):
700 700 self._fp = fp
701 701
702 702 def _unpack(self, format):
703 703 """unpack this struct format from the stream
704 704
705 705 This method is meant for internal usage by the bundle2 protocol only.
706 706 It directly manipulates the low level stream, including bundle2 level
707 707 instructions.
708 708
709 709 Do not use it to implement higher-level logic or methods."""
710 710 data = self._readexact(struct.calcsize(format))
711 711 return _unpack(format, data)
712 712
713 713 def _readexact(self, size):
714 714 """read exactly <size> bytes from the stream
715 715
716 716 This method is meant for internal usage by the bundle2 protocol only.
717 717 It directly manipulates the low level stream, including bundle2 level
718 718 instructions.
719 719
720 720 Do not use it to implement higher-level logic or methods."""
721 721 return changegroup.readexactly(self._fp, size)
722 722
723 723 def getunbundler(ui, fp, magicstring=None):
724 724 """return a valid unbundler object for a given magicstring"""
725 725 if magicstring is None:
726 726 magicstring = changegroup.readexactly(fp, 4)
727 727 magic, version = magicstring[0:2], magicstring[2:4]
728 728 if magic != 'HG':
729 729 ui.debug(
730 730 "error: invalid magic: %r (version %r), should be 'HG'\n"
731 731 % (magic, version))
732 732 raise error.Abort(_('not a Mercurial bundle'))
733 733 unbundlerclass = formatmap.get(version)
734 734 if unbundlerclass is None:
735 735 raise error.Abort(_('unknown bundle version %s') % version)
736 736 unbundler = unbundlerclass(ui, fp)
737 737 indebug(ui, 'start processing of %s stream' % magicstring)
738 738 return unbundler
739 739
740 740 class unbundle20(unpackermixin):
741 741 """interpret a bundle2 stream
742 742
743 743 This class is fed with a binary stream and yields parts through its
744 744 `iterparts` method."""
745 745
746 746 _magicstring = 'HG20'
747 747
748 748 def __init__(self, ui, fp):
749 749 """If header is specified, we do not read it out of the stream."""
750 750 self.ui = ui
751 751 self._compengine = util.compengines.forbundletype('UN')
752 752 self._compressed = None
753 753 super(unbundle20, self).__init__(fp)
754 754
755 755 @util.propertycache
756 756 def params(self):
757 757 """dictionary of stream level parameters"""
758 758 indebug(self.ui, 'reading bundle2 stream parameters')
759 759 params = {}
760 760 paramssize = self._unpack(_fstreamparamsize)[0]
761 761 if paramssize < 0:
762 762 raise error.BundleValueError('negative bundle param size: %i'
763 763 % paramssize)
764 764 if paramssize:
765 765 params = self._readexact(paramssize)
766 766 params = self._processallparams(params)
767 767 return params
768 768
769 769 def _processallparams(self, paramsblock):
770 770 """unquote and process a blob of space separated parameters"""
771 771 params = util.sortdict()
772 772 for p in paramsblock.split(' '):
773 773 p = p.split('=', 1)
774 774 p = [urlreq.unquote(i) for i in p]
775 775 if len(p) < 2:
776 776 p.append(None)
777 777 self._processparam(*p)
778 778 params[p[0]] = p[1]
779 779 return params
780 780
781 781
782 782 def _processparam(self, name, value):
783 783 """process a parameter, applying its effect if needed
784 784
785 785 Parameters starting with a lower case letter are advisory and will be
786 786 ignored when unknown. Those starting with an upper case letter are
787 787 mandatory; this function will raise a KeyError when they are unknown.
788 788
789 789 Note: no options are currently supported. Any input will either be
790 790 ignored or raise an error.
791 791 """
792 792 if not name:
793 793 raise ValueError(r'empty parameter name')
794 794 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
795 795 raise ValueError(r'non letter first character: %s' % name)
796 796 try:
797 797 handler = b2streamparamsmap[name.lower()]
798 798 except KeyError:
799 799 if name[0:1].islower():
800 800 indebug(self.ui, "ignoring unknown parameter %s" % name)
801 801 else:
802 802 raise error.BundleUnknownFeatureError(params=(name,))
803 803 else:
804 804 handler(self, name, value)
805 805
806 806 def _forwardchunks(self):
807 807 """utility to transfer a bundle2 as binary
808 808
809 809 This is made necessary by the fact that the 'getbundle' command over
810 810 'ssh' has no way to know when the reply ends, relying on the bundle
811 811 being interpreted to know its end. This is terrible and we are sorry,
812 812 but we needed to move forward to get general delta enabled.
813 813 """
814 814 yield self._magicstring
815 815 assert 'params' not in vars(self)
816 816 paramssize = self._unpack(_fstreamparamsize)[0]
817 817 if paramssize < 0:
818 818 raise error.BundleValueError('negative bundle param size: %i'
819 819 % paramssize)
820 820 yield _pack(_fstreamparamsize, paramssize)
821 821 if paramssize:
822 822 params = self._readexact(paramssize)
823 823 self._processallparams(params)
824 824 yield params
825 825 assert self._compengine.bundletype == 'UN'
826 826 # From there, payload might need to be decompressed
827 827 self._fp = self._compengine.decompressorreader(self._fp)
828 828 emptycount = 0
829 829 while emptycount < 2:
830 830 # so we can brainlessly loop
831 831 assert _fpartheadersize == _fpayloadsize
832 832 size = self._unpack(_fpartheadersize)[0]
833 833 yield _pack(_fpartheadersize, size)
834 834 if size:
835 835 emptycount = 0
836 836 else:
837 837 emptycount += 1
838 838 continue
839 839 if size == flaginterrupt:
840 840 continue
841 841 elif size < 0:
842 842 raise error.BundleValueError('negative chunk size: %i' % size)
843 843 yield self._readexact(size)
844 844
845 845
846 846 def iterparts(self):
847 847 """yield all parts contained in the stream"""
848 848 # make sure params have been loaded
849 849 self.params
850 850 # From there, payload need to be decompressed
851 851 self._fp = self._compengine.decompressorreader(self._fp)
852 852 indebug(self.ui, 'start extraction of bundle2 parts')
853 853 headerblock = self._readpartheader()
854 854 while headerblock is not None:
855 855 part = unbundlepart(self.ui, headerblock, self._fp)
856 856 yield part
857 857 # Seek to the end of the part to force its consumption so the next
858 858 # part can be read. But then seek back to the beginning so the
859 859 # code consuming this generator has a part that starts at 0.
860 860 part.seek(0, 2)
861 861 part.seek(0)
862 862 headerblock = self._readpartheader()
863 863 indebug(self.ui, 'end of bundle2 stream')
864 864
865 865 def _readpartheader(self):
866 866 """reads a part header size and returns the bytes blob
867 867
868 868 returns None if empty"""
869 869 headersize = self._unpack(_fpartheadersize)[0]
870 870 if headersize < 0:
871 871 raise error.BundleValueError('negative part header size: %i'
872 872 % headersize)
873 873 indebug(self.ui, 'part header size: %i' % headersize)
874 874 if headersize:
875 875 return self._readexact(headersize)
876 876 return None
877 877
878 878 def compressed(self):
879 879 self.params # load params
880 880 return self._compressed
881 881
882 882 def close(self):
883 883 """close underlying file"""
884 884 if util.safehasattr(self._fp, 'close'):
885 885 return self._fp.close()
886 886
887 887 formatmap = {'20': unbundle20}
888 888
889 889 b2streamparamsmap = {}
890 890
891 891 def b2streamparamhandler(name):
892 892 """register a handler for a stream level parameter"""
893 893 def decorator(func):
894 894 assert name not in formatmap
895 895 b2streamparamsmap[name] = func
896 896 return func
897 897 return decorator
898 898
899 899 @b2streamparamhandler('compression')
900 900 def processcompression(unbundler, param, value):
901 901 """read compression parameter and install payload decompression"""
902 902 if value not in util.compengines.supportedbundletypes:
903 903 raise error.BundleUnknownFeatureError(params=(param,),
904 904 values=(value,))
905 905 unbundler._compengine = util.compengines.forbundletype(value)
906 906 if value is not None:
907 907 unbundler._compressed = True
908 908
909 909 class bundlepart(object):
910 910 """A bundle2 part contains application level payload
911 911
912 912 The part `type` is used to route the part to the application level
913 913 handler.
914 914
915 915 The part payload is contained in ``part.data``. It could be raw bytes or a
916 916 generator of byte chunks.
917 917
918 918 You can add parameters to the part using the ``addparam`` method.
919 919 Parameters can be either mandatory (default) or advisory. Remote side
920 920 should be able to safely ignore the advisory ones.
921 921
922 922 Neither data nor parameters can be modified after generation has begun.
923 923 """
924 924
925 925 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
926 926 data='', mandatory=True):
927 927 validateparttype(parttype)
928 928 self.id = None
929 929 self.type = parttype
930 930 self._data = data
931 931 self._mandatoryparams = list(mandatoryparams)
932 932 self._advisoryparams = list(advisoryparams)
933 933 # checking for duplicated entries
934 934 self._seenparams = set()
935 935 for pname, __ in self._mandatoryparams + self._advisoryparams:
936 936 if pname in self._seenparams:
937 937 raise error.ProgrammingError('duplicated params: %s' % pname)
938 938 self._seenparams.add(pname)
939 939 # status of the part's generation:
940 940 # - None: not started,
941 941 # - False: currently being generated,
942 942 # - True: generation done.
943 943 self._generated = None
944 944 self.mandatory = mandatory
945 945
946 946 def __repr__(self):
947 947 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
948 948 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
949 949 % (cls, id(self), self.id, self.type, self.mandatory))
950 950
951 951 def copy(self):
952 952 """return a copy of the part
953 953
954 954 The new part has the very same content but no part id assigned yet.
955 955 Parts with generated data cannot be copied."""
956 956 assert not util.safehasattr(self.data, 'next')
957 957 return self.__class__(self.type, self._mandatoryparams,
958 958 self._advisoryparams, self._data, self.mandatory)
959 959
960 960 # methods used to define the part content
961 961 @property
962 962 def data(self):
963 963 return self._data
964 964
965 965 @data.setter
966 966 def data(self, data):
967 967 if self._generated is not None:
968 968 raise error.ReadOnlyPartError('part is being generated')
969 969 self._data = data
970 970
971 971 @property
972 972 def mandatoryparams(self):
973 973 # make it an immutable tuple to force people through ``addparam``
974 974 return tuple(self._mandatoryparams)
975 975
976 976 @property
977 977 def advisoryparams(self):
978 978 # make it an immutable tuple to force people through ``addparam``
979 979 return tuple(self._advisoryparams)
980 980
981 981 def addparam(self, name, value='', mandatory=True):
982 982 """add a parameter to the part
983 983
984 984 If 'mandatory' is set to True, the remote handler must claim support
985 985 for this parameter or the unbundling will be aborted.
986 986
987 987 The 'name' and 'value' cannot exceed 255 bytes each.
988 988 """
989 989 if self._generated is not None:
990 990 raise error.ReadOnlyPartError('part is being generated')
991 991 if name in self._seenparams:
992 992 raise ValueError('duplicated params: %s' % name)
993 993 self._seenparams.add(name)
994 994 params = self._advisoryparams
995 995 if mandatory:
996 996 params = self._mandatoryparams
997 997 params.append((name, value))
998 998
999 999 # methods used to generate the bundle2 stream
1000 1000 def getchunks(self, ui):
1001 1001 if self._generated is not None:
1002 1002 raise error.ProgrammingError('part can only be consumed once')
1003 1003 self._generated = False
1004 1004
1005 1005 if ui.debugflag:
1006 1006 msg = ['bundle2-output-part: "%s"' % self.type]
1007 1007 if not self.mandatory:
1008 1008 msg.append(' (advisory)')
1009 1009 nbmp = len(self.mandatoryparams)
1010 1010 nbap = len(self.advisoryparams)
1011 1011 if nbmp or nbap:
1012 1012 msg.append(' (params:')
1013 1013 if nbmp:
1014 1014 msg.append(' %i mandatory' % nbmp)
1015 1015 if nbap:
1016 1016 msg.append(' %i advisory' % nbap)
1017 1017 msg.append(')')
1018 1018 if not self.data:
1019 1019 msg.append(' empty payload')
1020 1020 elif (util.safehasattr(self.data, 'next')
1021 1021 or util.safehasattr(self.data, '__next__')):
1022 1022 msg.append(' streamed payload')
1023 1023 else:
1024 1024 msg.append(' %i bytes payload' % len(self.data))
1025 1025 msg.append('\n')
1026 1026 ui.debug(''.join(msg))
1027 1027
1028 1028 #### header
1029 1029 if self.mandatory:
1030 1030 parttype = self.type.upper()
1031 1031 else:
1032 1032 parttype = self.type.lower()
1033 1033 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1034 1034 ## parttype
1035 1035 header = [_pack(_fparttypesize, len(parttype)),
1036 1036 parttype, _pack(_fpartid, self.id),
1037 1037 ]
1038 1038 ## parameters
1039 1039 # count
1040 1040 manpar = self.mandatoryparams
1041 1041 advpar = self.advisoryparams
1042 1042 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1043 1043 # size
1044 1044 parsizes = []
1045 1045 for key, value in manpar:
1046 1046 parsizes.append(len(key))
1047 1047 parsizes.append(len(value))
1048 1048 for key, value in advpar:
1049 1049 parsizes.append(len(key))
1050 1050 parsizes.append(len(value))
1051 1051 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1052 1052 header.append(paramsizes)
1053 1053 # key, value
1054 1054 for key, value in manpar:
1055 1055 header.append(key)
1056 1056 header.append(value)
1057 1057 for key, value in advpar:
1058 1058 header.append(key)
1059 1059 header.append(value)
1060 1060 ## finalize header
1061 1061 try:
1062 1062 headerchunk = ''.join(header)
1063 1063 except TypeError:
1064 1064 raise TypeError(r'Found a non-bytes trying to '
1065 1065 r'build bundle part header: %r' % header)
1066 1066 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1067 1067 yield _pack(_fpartheadersize, len(headerchunk))
1068 1068 yield headerchunk
1069 1069 ## payload
1070 1070 try:
1071 1071 for chunk in self._payloadchunks():
1072 1072 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1073 1073 yield _pack(_fpayloadsize, len(chunk))
1074 1074 yield chunk
1075 1075 except GeneratorExit:
1076 1076 # GeneratorExit means that nobody is listening for our
1077 1077 # results anyway, so just bail quickly rather than trying
1078 1078 # to produce an error part.
1079 1079 ui.debug('bundle2-generatorexit\n')
1080 1080 raise
1081 1081 except BaseException as exc:
1082 1082 bexc = util.forcebytestr(exc)
1083 1083 # backup exception data for later
1084 1084 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1085 1085 % bexc)
1086 1086 tb = sys.exc_info()[2]
1087 1087 msg = 'unexpected error: %s' % bexc
1088 1088 interpart = bundlepart('error:abort', [('message', msg)],
1089 1089 mandatory=False)
1090 1090 interpart.id = 0
1091 1091 yield _pack(_fpayloadsize, -1)
1092 1092 for chunk in interpart.getchunks(ui=ui):
1093 1093 yield chunk
1094 1094 outdebug(ui, 'closing payload chunk')
1095 1095 # abort current part payload
1096 1096 yield _pack(_fpayloadsize, 0)
1097 1097 pycompat.raisewithtb(exc, tb)
1098 1098 # end of payload
1099 1099 outdebug(ui, 'closing payload chunk')
1100 1100 yield _pack(_fpayloadsize, 0)
1101 1101 self._generated = True
1102 1102
1103 1103 def _payloadchunks(self):
1104 1104 """yield chunks of the part payload
1105 1105
1106 1106 Exists to handle the different methods to provide data to a part."""
1107 1107 # we only support fixed size data now.
1108 1108 # This will be improved in the future.
1109 1109 if (util.safehasattr(self.data, 'next')
1110 1110 or util.safehasattr(self.data, '__next__')):
1111 1111 buff = util.chunkbuffer(self.data)
1112 1112 chunk = buff.read(preferedchunksize)
1113 1113 while chunk:
1114 1114 yield chunk
1115 1115 chunk = buff.read(preferedchunksize)
1116 1116 elif len(self.data):
1117 1117 yield self.data
1118 1118
1119 1119
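The payload framing produced by ``getchunks`` above — each chunk prefixed by its size, a zero-size chunk closing the payload, and ``flaginterrupt`` (-1) reserved for out-of-band parts — can be sketched standalone. This is an illustrative sketch only: the function names are hypothetical, and it assumes a 4-byte big-endian size prefix (``'>i'``); the real format strings (``_fpayloadsize`` etc.) are defined elsewhere in this module.

```python
import io
import struct

def framechunks(chunks):
    """yield a length-prefixed chunk stream ending with a zero-size marker"""
    for chunk in chunks:
        yield struct.pack('>i', len(chunk))
        yield chunk
    # a zero-size chunk marks the end of the payload
    yield struct.pack('>i', 0)

def readframes(fp):
    """inverse operation: yield chunks until the zero-size end marker"""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        yield fp.read(size)
```

A round trip through ``framechunks`` and ``readframes`` recovers the original chunk sequence, which is essentially what the unbundling side of this module does with the real stream.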
1120 1120 flaginterrupt = -1
1121 1121
1122 1122 class interrupthandler(unpackermixin):
1123 1123 """read one part and process it with restricted capability
1124 1124
1125 1125 This allows transmitting exceptions raised on the producer side during
1126 1126 part iteration while the consumer is reading a part.
1127 1127
1128 1128 Parts processed in this manner only have access to a ui object.
1129 1129
1130 1130 def __init__(self, ui, fp):
1131 1131 super(interrupthandler, self).__init__(fp)
1132 1132 self.ui = ui
1133 1133
1134 1134 def _readpartheader(self):
1135 1135 """reads a part header size and returns the bytes blob
1136 1136
1137 1137 returns None if empty"""
1138 1138 headersize = self._unpack(_fpartheadersize)[0]
1139 1139 if headersize < 0:
1140 1140 raise error.BundleValueError('negative part header size: %i'
1141 1141 % headersize)
1142 1142 indebug(self.ui, 'part header size: %i\n' % headersize)
1143 1143 if headersize:
1144 1144 return self._readexact(headersize)
1145 1145 return None
1146 1146
1147 1147 def __call__(self):
1148 1148
1149 1149 self.ui.debug('bundle2-input-stream-interrupt:'
1150 1150 ' opening out of band context\n')
1151 1151 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1152 1152 headerblock = self._readpartheader()
1153 1153 if headerblock is None:
1154 1154 indebug(self.ui, 'no part found during interruption.')
1155 1155 return
1156 1156 part = unbundlepart(self.ui, headerblock, self._fp)
1157 1157 op = interruptoperation(self.ui)
1158 1158 hardabort = False
1159 1159 try:
1160 1160 _processpart(op, part)
1161 1161 except (SystemExit, KeyboardInterrupt):
1162 1162 hardabort = True
1163 1163 raise
1164 1164 finally:
1165 1165 if not hardabort:
1166 1166 part.seek(0, 2)
1167 1167 self.ui.debug('bundle2-input-stream-interrupt:'
1168 1168 ' closing out of band context\n')
1169 1169
1170 1170 class interruptoperation(object):
1171 1171 """A limited operation to be used by part handlers during interruption
1172 1172
1173 1173 It only has access to a ui object.
1174 1174 """
1175 1175
1176 1176 def __init__(self, ui):
1177 1177 self.ui = ui
1178 1178 self.reply = None
1179 1179 self.captureoutput = False
1180 1180
1181 1181 @property
1182 1182 def repo(self):
1183 1183 raise error.ProgrammingError('no repo access from stream interruption')
1184 1184
1185 1185 def gettransaction(self):
1186 1186 raise TransactionUnavailable('no repo access from stream interruption')
1187 1187
1188 1188 class unbundlepart(unpackermixin):
1189 1189 """a bundle part read from a bundle"""
1190 1190
1191 1191 def __init__(self, ui, header, fp):
1192 1192 super(unbundlepart, self).__init__(fp)
1193 1193 self._seekable = (util.safehasattr(fp, 'seek') and
1194 1194 util.safehasattr(fp, 'tell'))
1195 1195 self.ui = ui
1196 1196 # unbundle state attr
1197 1197 self._headerdata = header
1198 1198 self._headeroffset = 0
1199 1199 self._initialized = False
1200 1200 self.consumed = False
1201 1201 # part data
1202 1202 self.id = None
1203 1203 self.type = None
1204 1204 self.mandatoryparams = None
1205 1205 self.advisoryparams = None
1206 1206 self.params = None
1207 1207 self.mandatorykeys = ()
1208 1208 self._payloadstream = None
1209 1209 self._readheader()
1210 1210 self._mandatory = None
1211 1211 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1212 1212 self._pos = 0
1213 1213
1214 1214 def _fromheader(self, size):
1215 1215 """return the next <size> bytes from the header"""
1216 1216 offset = self._headeroffset
1217 1217 data = self._headerdata[offset:(offset + size)]
1218 1218 self._headeroffset = offset + size
1219 1219 return data
1220 1220
1221 1221 def _unpackheader(self, format):
1222 1222 """read given format from header
1223 1223
1224 1224 This automatically computes the size of the format to read."""
1225 1225 data = self._fromheader(struct.calcsize(format))
1226 1226 return _unpack(format, data)
1227 1227
1228 1228 def _initparams(self, mandatoryparams, advisoryparams):
1229 1229 """internal function to set up all logic-related parameters"""
1230 1230 # make it read only to prevent people touching it by mistake.
1231 1231 self.mandatoryparams = tuple(mandatoryparams)
1232 1232 self.advisoryparams = tuple(advisoryparams)
1233 1233 # user friendly UI
1234 1234 self.params = util.sortdict(self.mandatoryparams)
1235 1235 self.params.update(self.advisoryparams)
1236 1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1237 1237
1238 1238 def _payloadchunks(self, chunknum=0):
1239 1239 '''seek to specified chunk and start yielding data'''
1240 1240 if len(self._chunkindex) == 0:
1241 1241 assert chunknum == 0, 'Must start with chunk 0'
1242 1242 self._chunkindex.append((0, self._tellfp()))
1243 1243 else:
1244 1244 assert chunknum < len(self._chunkindex), \
1245 1245 'Unknown chunk %d' % chunknum
1246 1246 self._seekfp(self._chunkindex[chunknum][1])
1247 1247
1248 1248 pos = self._chunkindex[chunknum][0]
1249 1249 payloadsize = self._unpack(_fpayloadsize)[0]
1250 1250 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1251 1251 while payloadsize:
1252 1252 if payloadsize == flaginterrupt:
1253 1253 # interruption detection, the handler will now read a
1254 1254 # single part and process it.
1255 1255 interrupthandler(self.ui, self._fp)()
1256 1256 elif payloadsize < 0:
1257 1257 msg = 'negative payload chunk size: %i' % payloadsize
1258 1258 raise error.BundleValueError(msg)
1259 1259 else:
1260 1260 result = self._readexact(payloadsize)
1261 1261 chunknum += 1
1262 1262 pos += payloadsize
1263 1263 if chunknum == len(self._chunkindex):
1264 1264 self._chunkindex.append((pos, self._tellfp()))
1265 1265 yield result
1266 1266 payloadsize = self._unpack(_fpayloadsize)[0]
1267 1267 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1268 1268
1269 1269 def _findchunk(self, pos):
1270 1270 '''for a given payload position, return a chunk number and offset'''
1271 1271 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1272 1272 if ppos == pos:
1273 1273 return chunk, 0
1274 1274 elif ppos > pos:
1275 1275 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1276 1276 raise ValueError('Unknown chunk')
1277 1277
1278 1278 def _readheader(self):
1279 1279 """read the header and setup the object"""
1280 1280 typesize = self._unpackheader(_fparttypesize)[0]
1281 1281 self.type = self._fromheader(typesize)
1282 1282 indebug(self.ui, 'part type: "%s"' % self.type)
1283 1283 self.id = self._unpackheader(_fpartid)[0]
1284 1284 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1285 1285 # extract mandatory bit from type
1286 1286 self.mandatory = (self.type != self.type.lower())
1287 1287 self.type = self.type.lower()
1288 1288 ## reading parameters
1289 1289 # param count
1290 1290 mancount, advcount = self._unpackheader(_fpartparamcount)
1291 1291 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1292 1292 # param size
1293 1293 fparamsizes = _makefpartparamsizes(mancount + advcount)
1294 1294 paramsizes = self._unpackheader(fparamsizes)
1295 1295 # make it a list of pairs again
1296 1296 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1297 1297 # split mandatory from advisory
1298 1298 mansizes = paramsizes[:mancount]
1299 1299 advsizes = paramsizes[mancount:]
1300 1300 # retrieve param value
1301 1301 manparams = []
1302 1302 for key, value in mansizes:
1303 1303 manparams.append((self._fromheader(key), self._fromheader(value)))
1304 1304 advparams = []
1305 1305 for key, value in advsizes:
1306 1306 advparams.append((self._fromheader(key), self._fromheader(value)))
1307 1307 self._initparams(manparams, advparams)
1308 1308 ## part payload
1309 1309 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1310 1310 # we read the data, tell it
1311 1311 self._initialized = True
1312 1312
1313 1313 def read(self, size=None):
1314 1314 """read payload data"""
1315 1315 if not self._initialized:
1316 1316 self._readheader()
1317 1317 if size is None:
1318 1318 data = self._payloadstream.read()
1319 1319 else:
1320 1320 data = self._payloadstream.read(size)
1321 1321 self._pos += len(data)
1322 1322 if size is None or len(data) < size:
1323 1323 if not self.consumed and self._pos:
1324 1324 self.ui.debug('bundle2-input-part: total payload size %i\n'
1325 1325 % self._pos)
1326 1326 self.consumed = True
1327 1327 return data
1328 1328
1329 1329 def tell(self):
1330 1330 return self._pos
1331 1331
1332 1332 def seek(self, offset, whence=0):
1333 1333 if whence == 0:
1334 1334 newpos = offset
1335 1335 elif whence == 1:
1336 1336 newpos = self._pos + offset
1337 1337 elif whence == 2:
1338 1338 if not self.consumed:
1339 1339 self.read()
1340 1340 newpos = self._chunkindex[-1][0] + offset
1341 1341 else:
1342 1342 raise ValueError('Unknown whence value: %r' % (whence,))
1343 1343
1344 1344 if newpos > self._chunkindex[-1][0] and not self.consumed:
1345 1345 self.read()
1346 1346 if not 0 <= newpos <= self._chunkindex[-1][0]:
1347 1347 raise ValueError('Offset out of range')
1348 1348
1349 1349 if self._pos != newpos:
1350 1350 chunk, internaloffset = self._findchunk(newpos)
1351 1351 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1352 1352 adjust = self.read(internaloffset)
1353 1353 if len(adjust) != internaloffset:
1354 1354 raise error.Abort(_('Seek failed\n'))
1355 1355 self._pos = newpos
1356 1356
1357 1357 def _seekfp(self, offset, whence=0):
1358 1358 """move the underlying file pointer
1359 1359
1360 1360 This method is meant for internal usage by the bundle2 protocol only.
1361 1361 It directly manipulates the low-level stream, including bundle2-level
1362 1362 instructions.
1363 1363
1364 1364 Do not use it to implement higher-level logic or methods."""
1365 1365 if self._seekable:
1366 1366 return self._fp.seek(offset, whence)
1367 1367 else:
1368 1368 raise NotImplementedError(_('File pointer is not seekable'))
1369 1369
1370 1370 def _tellfp(self):
1371 1371 """return the file offset, or None if file is not seekable
1372 1372
1373 1373 This method is meant for internal usage by the bundle2 protocol only.
1374 1374 It directly manipulates the low-level stream, including bundle2-level
1375 1375 instructions.
1376 1376
1377 1377 Do not use it to implement higher-level logic or methods."""
1378 1378 if self._seekable:
1379 1379 try:
1380 1380 return self._fp.tell()
1381 1381 except IOError as e:
1382 1382 if e.errno == errno.ESPIPE:
1383 1383 self._seekable = False
1384 1384 else:
1385 1385 raise
1386 1386 return None
1387 1387
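The ``seek`` support above relies on ``_chunkindex``, a list of ``(payload position, file position)`` pairs recorded at the start of each chunk as the payload is read. The lookup performed by ``_findchunk`` can be sketched standalone (the function name and sample index values below are illustrative, not part of this module):

```python
def findchunk(chunkindex, pos):
    """map a payload position to (chunk number, offset within that chunk)

    chunkindex holds (payload position, file position) pairs for the
    start of each chunk, mirroring unbundlepart._chunkindex.
    """
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # pos falls exactly on a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # pos lies inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```

Seeking backwards then only requires re-reading from the recorded file position of the containing chunk rather than from the start of the payload.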
1388 1388 # These are only the static capabilities.
1389 1389 # Check the 'getrepocaps' function for the rest.
1390 1390 capabilities = {'HG20': (),
1391 1391 'error': ('abort', 'unsupportedcontent', 'pushraced',
1392 1392 'pushkey'),
1393 1393 'listkeys': (),
1394 1394 'pushkey': (),
1395 1395 'digests': tuple(sorted(util.DIGESTS.keys())),
1396 1396 'remote-changegroup': ('http', 'https'),
1397 1397 'hgtagsfnodes': (),
1398 1398 'phases': ('heads',),
1399 1399 }
1400 1400
1401 1401 def getrepocaps(repo, allowpushback=False):
1402 1402 """return the bundle2 capabilities for a given repo
1403 1403
1404 1404 Exists to allow extensions (like evolution) to mutate the capabilities.
1405 1405 """
1406 1406 caps = capabilities.copy()
1407 1407 caps['changegroup'] = tuple(sorted(
1408 1408 changegroup.supportedincomingversions(repo)))
1409 1409 if obsolete.isenabled(repo, obsolete.exchangeopt):
1410 1410 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1411 1411 caps['obsmarkers'] = supportedformat
1412 1412 if allowpushback:
1413 1413 caps['pushback'] = ()
1414 1414 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1415 1415 if cpmode == 'check-related':
1416 1416 caps['checkheads'] = ('related',)
1417 1417 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1418 1418 caps.pop('phases')
1419 1419 return caps
1420 1420
1421 1421 def bundle2caps(remote):
1422 1422 """return the bundle capabilities of a peer as dict"""
1423 1423 raw = remote.capable('bundle2')
1424 1424 if not raw and raw != '':
1425 1425 return {}
1426 1426 capsblob = urlreq.unquote(remote.capable('bundle2'))
1427 1427 return decodecaps(capsblob)
1428 1428
1429 1429 def obsmarkersversion(caps):
1430 1430 """extract the list of supported obsmarkers versions from a bundle2caps dict
1431 1431 """
1432 1432 obscaps = caps.get('obsmarkers', ())
1433 1433 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1434 1434
1435 1435 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1436 1436 vfs=None, compression=None, compopts=None):
1437 1437 if bundletype.startswith('HG10'):
1438 1438 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1439 1439 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1440 1440 compression=compression, compopts=compopts)
1441 1441 elif not bundletype.startswith('HG20'):
1442 1442 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1443 1443
1444 1444 caps = {}
1445 1445 if 'obsolescence' in opts:
1446 1446 caps['obsmarkers'] = ('V1',)
1447 1447 bundle = bundle20(ui, caps)
1448 1448 bundle.setcompression(compression, compopts)
1449 1449 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1450 1450 chunkiter = bundle.getchunks()
1451 1451
1452 1452 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1453 1453
1454 1454 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1455 1455 # We should eventually reconcile this logic with the one behind
1456 1456 # 'exchange.getbundle2partsgenerator'.
1457 1457 #
1458 1458 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1459 1459 # different right now. So we keep them separated for now for the sake of
1460 1460 # simplicity.
1461 1461
1462 1462 # we always want a changegroup in such bundle
1463 1463 cgversion = opts.get('cg.version')
1464 1464 if cgversion is None:
1465 1465 cgversion = changegroup.safeversion(repo)
1466 1466 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1467 1467 part = bundler.newpart('changegroup', data=cg.getchunks())
1468 1468 part.addparam('version', cg.version)
1469 1469 if 'clcount' in cg.extras:
1470 1470 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1471 1471 mandatory=False)
1472 1472 if opts.get('phases') and repo.revs('%ln and secret()',
1473 1473 outgoing.missingheads):
1474 1474 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1475 1475
1476 1476 addparttagsfnodescache(repo, bundler, outgoing)
1477 1477
1478 1478 if opts.get('obsolescence', False):
1479 1479 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1480 1480 buildobsmarkerspart(bundler, obsmarkers)
1481 1481
1482 1482 if opts.get('phases', False):
1483 1483 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1484 1484 phasedata = phases.binaryencode(headsbyphase)
1485 1485 bundler.newpart('phase-heads', data=phasedata)
1486 1486
1487 1487 def addparttagsfnodescache(repo, bundler, outgoing):
1488 1488 # we include the tags fnode cache for the bundle changeset
1489 1489 # (as an optional parts)
1490 1490 cache = tags.hgtagsfnodescache(repo.unfiltered())
1491 1491 chunks = []
1492 1492
1493 1493 # .hgtags fnodes are only relevant for head changesets. While we could
1494 1494 # transfer values for all known nodes, there will likely be little to
1495 1495 # no benefit.
1496 1496 #
1497 1497 # We don't bother using a generator to produce output data because
1498 1498 # a) we only have 40 bytes per head and even esoteric numbers of heads
1499 1499 # consume little memory (1M heads is 40MB) b) we don't want to send the
1500 1500 # part if we don't have entries and knowing if we have entries requires
1501 1501 # cache lookups.
1502 1502 for node in outgoing.missingheads:
1503 1503 # Don't compute missing, as this may slow down serving.
1504 1504 fnode = cache.getfnode(node, computemissing=False)
1505 1505 if fnode is not None:
1506 1506 chunks.extend([node, fnode])
1507 1507
1508 1508 if chunks:
1509 1509 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1510 1510
1511 1511 def buildobsmarkerspart(bundler, markers):
1512 1512 """add an obsmarker part to the bundler with <markers>
1513 1513
1514 1514 No part is created if markers is empty.
1515 1515 Raises ValueError if the bundler doesn't support any known obsmarker format.
1516 1516 """
1517 1517 if not markers:
1518 1518 return None
1519 1519
1520 1520 remoteversions = obsmarkersversion(bundler.capabilities)
1521 1521 version = obsolete.commonversion(remoteversions)
1522 1522 if version is None:
1523 1523 raise ValueError('bundler does not support common obsmarker format')
1524 1524 stream = obsolete.encodemarkers(markers, True, version=version)
1525 1525 return bundler.newpart('obsmarkers', data=stream)
1526 1526
1527 1527 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1528 1528 compopts=None):
1529 1529 """Write a bundle file and return its filename.
1530 1530
1531 1531 Existing files will not be overwritten.
1532 1532 If no filename is specified, a temporary file is created.
1533 1533 bz2 compression can be turned off.
1534 1534 The bundle file will be deleted in case of errors.
1535 1535 """
1536 1536
1537 1537 if bundletype == "HG20":
1538 1538 bundle = bundle20(ui)
1539 1539 bundle.setcompression(compression, compopts)
1540 1540 part = bundle.newpart('changegroup', data=cg.getchunks())
1541 1541 part.addparam('version', cg.version)
1542 1542 if 'clcount' in cg.extras:
1543 1543 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1544 1544 mandatory=False)
1545 1545 chunkiter = bundle.getchunks()
1546 1546 else:
1547 1547 # compression argument is only for the bundle2 case
1548 1548 assert compression is None
1549 1549 if cg.version != '01':
1550 1550 raise error.Abort(_('old bundle types only support v1 '
1551 1551 'changegroups'))
1552 1552 header, comp = bundletypes[bundletype]
1553 1553 if comp not in util.compengines.supportedbundletypes:
1554 1554 raise error.Abort(_('unknown stream compression type: %s')
1555 1555 % comp)
1556 1556 compengine = util.compengines.forbundletype(comp)
1557 1557 def chunkiter():
1558 1558 yield header
1559 1559 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1560 1560 yield chunk
1561 1561 chunkiter = chunkiter()
1562 1562
1563 1563 # parse the changegroup data, otherwise we will block
1564 1564 # in case of sshrepo because we don't know the end of the stream
1565 1565 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1566 1566
1567 1567 def combinechangegroupresults(op):
1568 1568 """logic to combine 0 or more addchangegroup results into one"""
1569 1569 results = [r.get('return', 0)
1570 1570 for r in op.records['changegroup']]
1571 1571 changedheads = 0
1572 1572 result = 1
1573 1573 for ret in results:
1574 1574 # If any changegroup result is 0, return 0
1575 1575 if ret == 0:
1576 1576 result = 0
1577 1577 break
1578 1578 if ret < -1:
1579 1579 changedheads += ret + 1
1580 1580 elif ret > 1:
1581 1581 changedheads += ret - 1
1582 1582 if changedheads > 0:
1583 1583 result = 1 + changedheads
1584 1584 elif changedheads < 0:
1585 1585 result = -1 + changedheads
1586 1586 return result
1587 1587
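The return codes combined by ``combinechangegroupresults`` above encode head changes in a single integer: 1 means success with no head change, ``n > 1`` means ``n - 1`` heads were added, ``n < -1`` means heads were removed, and 0 means failure. A standalone sketch of the combining logic (the function name is illustrative; the real function pulls its inputs from ``op.records``):

```python
def combineresults(results):
    """combine addchangegroup-style return codes into one

    1 -> no head change, n > 1 -> n - 1 heads added,
    n < -1 -> heads removed, 0 -> failure.
    """
    changedheads = 0
    result = 1
    for ret in results:
        # a zero result marks a failed changegroup
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    # re-encode the accumulated head delta into the same convention
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
```

For example, two changegroups that each added two heads combine to a single "three heads added" result.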
1588 1588 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1589 1589 'targetphase'))
1590 1590 def handlechangegroup(op, inpart):
1591 1591 """apply a changegroup part on the repo
1592 1592
1593 1593 This is a very early implementation that will be massively reworked
1594 1594 before being inflicted on any end-user.
1595 1595 """
1596 1596 tr = op.gettransaction()
1597 1597 unpackerversion = inpart.params.get('version', '01')
1598 1598 # We should raise an appropriate exception here
1599 1599 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1600 1600 # the source and url passed here are overwritten by the one contained in
1601 1601 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1602 1602 nbchangesets = None
1603 1603 if 'nbchanges' in inpart.params:
1604 1604 nbchangesets = int(inpart.params.get('nbchanges'))
1605 1605 if ('treemanifest' in inpart.params and
1606 1606 'treemanifest' not in op.repo.requirements):
1607 1607 if len(op.repo.changelog) != 0:
1608 1608 raise error.Abort(_(
1609 1609 "bundle contains tree manifests, but local repo is "
1610 1610 "non-empty and does not use tree manifests"))
1611 1611 op.repo.requirements.add('treemanifest')
1612 1612 op.repo._applyopenerreqs()
1613 1613 op.repo._writerequirements()
1614 1614 extrakwargs = {}
1615 1615 targetphase = inpart.params.get('targetphase')
1616 1616 if targetphase is not None:
1617 1617 extrakwargs['targetphase'] = int(targetphase)
1618 1618 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1619 1619 expectedtotal=nbchangesets, **extrakwargs)
1620 1620 if op.reply is not None:
1621 1621 # This is definitely not the final form of this
1622 1622 # return. But one needs to start somewhere.
1623 1623 part = op.reply.newpart('reply:changegroup', mandatory=False)
1624 1624 part.addparam(
1625 1625 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1626 1626 part.addparam('return', '%i' % ret, mandatory=False)
1627 1627 assert not inpart.read()
1628 1628
1629 1629 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1630 1630 ['digest:%s' % k for k in util.DIGESTS.keys()])
1631 1631 @parthandler('remote-changegroup', _remotechangegroupparams)
1632 1632 def handleremotechangegroup(op, inpart):
1633 1633 """apply a bundle10 on the repo, given a url and validation information
1634 1634
1635 1635 All the information about the remote bundle to import is given as
1636 1636 parameters. The parameters include:
1637 1637 - url: the url to the bundle10.
1638 1638 - size: the bundle10 file size. It is used to validate that what was
1639 1639 retrieved by the client matches the server's knowledge about the bundle.
1640 1640 - digests: a space separated list of the digest types provided as
1641 1641 parameters.
1642 1642 - digest:<digest-type>: the hexadecimal representation of the digest with
1643 1643 that name. Like the size, it is used to validate that what was retrieved
1644 1644 by the client matches what the server knows about the bundle.
1645 1645
1646 1646 When multiple digest types are given, all of them are checked.
1647 1647 """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

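The `util.digestchecker` wrapper used above validates both the declared byte count and the declared digests while the bundle is being streamed. A minimal stand-alone sketch of that idea (the class name, method shapes, and error messages here are illustrative, not Mercurial's actual implementation):

```python
import hashlib
import io

class DigestChecker:
    """Wrap a byte stream; validate() checks total size and hex digests."""
    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size
        self._seen = 0
        # One incremental hasher per expected digest type (e.g. 'sha1').
        self._hashers = {typ: hashlib.new(typ) for typ in digests}
        self._expected = digests

    def read(self, n=-1):
        data = self._fh.read(n)
        self._seen += len(data)
        for h in self._hashers.values():
            h.update(data)
        return data

    def validate(self):
        if self._seen != self._size:
            raise ValueError('size mismatch: %d != %d'
                             % (self._seen, self._size))
        for typ, h in self._hashers.items():
            if h.hexdigest() != self._expected[typ]:
                raise ValueError('%s digest mismatch' % typ)

blob = b'bundle payload'
chk = DigestChecker(io.BytesIO(blob), len(blob),
                    {'sha1': hashlib.sha1(blob).hexdigest()})
while chk.read(4):
    pass
chk.validate()  # passes: size and sha1 both match
```

Because hashing is incremental, the bundle never has to be buffered in full just to be verified.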
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

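The `check:*` handlers above read the part payload as a flat run of raw 20-byte binary nodes with no delimiter or count prefix; a short read signals the end. A small sketch of that framing, with a hypothetical encoder for the sending side (not Mercurial's API):

```python
import io

def encode_heads(heads):
    # Each head is a raw 20-byte SHA-1 node; concatenate with no delimiter.
    assert all(len(h) == 20 for h in heads)
    return b''.join(heads)

def decode_heads(stream):
    # Mirror of the handler's loop: read fixed-width records until a
    # short (or empty) read marks the end of the payload.
    heads = []
    h = stream.read(20)
    while len(h) == 20:
        heads.append(h)
        h = stream.read(20)
    assert not h  # payload length must be an exact multiple of 20
    return heads

nodes = [bytes([i]) * 20 for i in range(3)]
payload = io.BytesIO(encode_heads(nodes))
assert decode_heads(payload) == nodes
```

Fixed-width framing keeps the part trivially streamable: no length header is needed before the node list.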
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for races on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activity happens on unrelated heads,
    it is ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

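The updated-heads check above reduces to set membership against the union of every branch's heads in the branchmap. Sketched with plain dicts and fabricated 20-byte nodes (the function name is ours, not Mercurial's):

```python
def push_raced(branchmap, pushedheads):
    # Union of every branch's heads = all current heads of the repo.
    currentheads = set()
    for heads in branchmap.values():
        currentheads.update(heads)
    # The push raced if any head it claims to update no longer exists;
    # activity on heads the push did not touch is simply ignored.
    return any(h not in currentheads for h in pushedheads)

branchmap = {
    b'default': [b'a' * 20, b'b' * 20],
    b'stable': [b'c' * 20],
}
assert not push_raced(branchmap, [b'a' * 20, b'c' * 20])
assert push_raced(branchmap, [b'z' * 20])  # head vanished: race detected
```

This is what lets a busy server accept concurrent pushes to disjoint parts of the graph without spurious push-race aborts.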
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

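The `op.repo.pushkey(namespace, key, old, new)` call above has compare-and-swap semantics: the update is applied only if the stored value still equals `old`, which is why `ret` can be falsy and trigger `PushkeyFailed` for a mandatory part. A toy in-memory model (not Mercurial's pushkey API) illustrating that behavior:

```python
class PushkeyStore:
    """Minimal compare-and-swap key store, one dict per namespace."""
    def __init__(self):
        self._ns = {}

    def push(self, namespace, key, old, new):
        space = self._ns.setdefault(namespace, {})
        # Missing keys compare as the empty string, matching how a
        # first-time set is typically expressed (old == '').
        if space.get(key, '') != old:
            return 0  # stale 'old' value: reject, caller must retry
        space[key] = new
        return 1

store = PushkeyStore()
assert store.push('bookmarks', 'feature', '', 'abc123') == 1
assert store.push('bookmarks', 'feature', 'wrong', 'def456') == 0
assert store.push('bookmarks', 'feature', 'abc123', 'def456') == 1
```

The compare-and-swap is what makes pushkey safe under concurrent pushes: a client whose view of `old` is stale gets a clean failure instead of silently clobbering another push.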
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

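The loop above consumes fixed-width 40-byte records (a 20-byte changeset node followed by a 20-byte filenode); an incomplete record at the tail is discarded rather than treated as an error. A self-contained sketch of that record parsing:

```python
import io

def iter_fnode_pairs(part):
    # Payload is a flat sequence of (changeset node, filenode) pairs,
    # each field exactly 20 raw bytes. A short read ends iteration, and
    # any incomplete trailing record is dropped, as in the handler above.
    while True:
        node = part.read(20)
        fnode = part.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        yield node, fnode

payload = io.BytesIO(b'n' * 20 + b'f' * 20 + b'm' * 20 + b'g' * 20)
pairs = list(iter_fnode_pairs(payload))
assert pairs == [(b'n' * 20, b'f' * 20), (b'm' * 20, b'g' * 20)]
```

Tolerating a truncated tail keeps the part advisory in spirit: a partially transferred cache payload degrades to fewer cache entries instead of a failed unbundle.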
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
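The key mangling in `bundle2getvars` can be isolated into a pure function for clarity. This sketch mirrors the loop above (the function name is ours, not part of Mercurial):

```python
def hookargs_from_pushvars(advisoryparams):
    # Uppercase each key and prefix USERVAR_ so server-side hooks can
    # distinguish client-pushed variables (--pushvar) from the
    # server-defined hook arguments.
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

args = hookargs_from_pushvars([('debug', '1'), ('Reason', 'hotfix')])
assert args == {'USERVAR_DEBUG': '1', 'USERVAR_REASON': 'hotfix'}
```

The mandatory prefix is the security boundary: hooks that trust `USERVAR_*` values know they are attacker-controlled client input, not server configuration.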