bundle2: move part processing to a separate function...
Durham Goode -
r34262:f010097c default
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architected as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all
41 41 stream level parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
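As an illustration of the stream parameter encoding described above, here is a small standalone sketch (not the module's own parser; `parseparams` is a hypothetical name, and Python 3's `urllib.parse` stands in for the urlquoting):

```python
from urllib.parse import quote, unquote

def parseparams(blob):
    """Parse a stream level parameter blob, enforcing the letter rule."""
    params = {}
    for entry in blob.split(' '):
        if not entry:
            continue
        name, sep, value = entry.partition('=')
        name = unquote(name)
        if not name or not name[0].isalpha():
            raise ValueError('invalid parameter name: %r' % name)
        # parameters without '=' carry no value
        params[name] = unquote(value) if sep else None
    return params

blob = ' '.join('%s=%s' % (quote(k), quote(v))
                for k, v in [('evolution', 'yes'), ('Compression', 'BZ')])
parsed = parseparams(blob)
mandatory = [n for n in parsed if n[0].isupper()]  # capital => mandatory
```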
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
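A minimal sketch of the parameter block layout described above (standalone Python using `struct`; the variable names follow the description, not the module's internal helpers):

```python
import struct

# Pack two parameters -- one mandatory, one advisory -- following the
# <mandatory-count><advisory-count><param-sizes><param-data> layout.
params = [(b'key', b'value'), (b'adv', b'1')]  # mandatory first, then advisory
block = struct.pack('>BB', 1, 1)  # mandatory-count, advisory-count
block += struct.pack('>' + 'BB' * len(params),
                     *[n for k, v in params for n in (len(k), len(v))])
block += b''.join(k + v for k, v in params)

# Decode it back, using the size pairs to slice the data blob.
mand, adv = struct.unpack_from('>BB', block, 0)
total = mand + adv
sizes = struct.unpack_from('>' + 'BB' * total, block, 2)
offset = 2 + 2 * total
decoded = []
for i in range(total):
    ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
    decoded.append((block[offset:offset + ksize],
                    block[offset + ksize:offset + ksize + vsize]))
    offset += ksize + vsize
```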
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32; `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
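The `<chunksize><chunkdata>` framing above can be sketched as follows (a standalone reader; `readchunks` is an illustrative name, and the negative special-case sizes are deliberately left unhandled):

```python
import io
import struct

def readchunks(fp):
    """Yield payload chunks until the zero-size terminator chunk."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]  # int32 chunk size
        if size == 0:
            return  # end-of-payload marker
        if size < 0:
            raise ValueError('special negative chunk sizes not handled here')
        yield fp.read(size)

payload = (struct.pack('>i', 5) + b'hello'
           + struct.pack('>i', 3) + b'abc'
           + struct.pack('>i', 0))
chunks = list(readchunks(io.BytesIO(payload)))
```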
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
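The matching rules above can be sketched like this (an illustrative table and lookup, not the module's `parthandlermapping` machinery; `findhandler` is a hypothetical name):

```python
# Illustrative handler table with lower-case keys, as the module enforces.
handlers = {'changegroup': lambda op, part: None}

def findhandler(parttype):
    """Case-insensitive lookup; the case of the part type itself decides
    whether a missing handler aborts processing."""
    handler = handlers.get(parttype.lower())
    mandatory = any(c.isupper() for c in parttype)  # any uppercase => mandatory
    if handler is None and mandatory:
        raise KeyError('missing handler for mandatory part: %s' % parttype)
    return handler  # None for an unknown advisory part: silently ignored
```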
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
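For instance, the format built for three parameters would be used like this (a standalone sketch of what `_makefpartparamsizes(3)` produces):

```python
import struct

# What _makefpartparamsizes(3) returns: one 'BB' pair per parameter.
fmt = '>' + ('BB' * 3)
data = struct.pack(fmt, 3, 5, 2, 4, 7, 0)
sizes = struct.unpack(fmt, data)           # flattened (key, value) sizes
pairs = list(zip(sizes[0::2], sizes[1::2]))  # regroup into size pairs
```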
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 decorator that registers a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during an unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`, where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
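The category bookkeeping described in the class docstring can be exercised with a tiny stand-in (illustrative only; `MiniRecords` is a hypothetical name and omits the reply tracking of the real class):

```python
class MiniRecords(object):
    """Tiny stand-in mirroring unbundlerecords' category bookkeeping."""
    def __init__(self):
        self._categories = {}   # category -> list of entries
        self._sequences = []    # chronological (category, entry) pairs

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = MiniRecords()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'ok': True})
```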
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 353 self.op = op
354 354 self.unbundler = unbundler
355 355 self.iterator = None
356 356 self.count = 0
357 357 self.current = None
358 358
359 359 def __enter__(self):
360 360 def func():
361 361 itr = enumerate(self.unbundler.iterparts())
362 362 for count, p in itr:
363 363 self.count = count
364 364 self.current = p
365 365 yield p
366 366 p.seek(0, 2)
367 367 self.current = None
368 368 self.iterator = func()
369 369 return self.iterator
370 370
371 371 def __exit__(self, type, exc, tb):
372 372 if not self.iterator:
373 373 return
374 374
375 375 if exc:
376 376 # If exiting or interrupted, do not attempt to seek the stream in
377 377 # the finally block below. This makes abort faster.
378 378 if (self.current and
379 379 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
380 380 # consume the part content to not corrupt the stream.
381 381 self.current.seek(0, 2)
382 382
383 383 # Any exceptions seeking to the end of the bundle at this point are
384 384 # almost certainly related to the underlying stream being bad.
385 385 # And, chances are that the exception we're handling is related to
386 386 # getting in that bad state. So, we swallow the seeking error and
387 387 # re-raise the original error.
388 388 seekerror = False
389 389 try:
390 390 for part in self.iterator:
391 391 # consume the bundle content
392 392 part.seek(0, 2)
393 393 except Exception:
394 394 seekerror = True
395 395
396 396 # Small hack to let caller code distinguish exceptions from bundle2
397 397 # processing from processing the old format. This is mostly needed
398 398 # to handle different return codes to unbundle according to the type
399 399 # of bundle. We should probably clean up or drop this return code
400 400 # craziness in a future version.
401 401 exc.duringunbundle2 = True
402 402 salvaged = []
403 403 replycaps = None
404 404 if self.op.reply is not None:
405 405 salvaged = self.op.reply.salvageoutput()
406 406 replycaps = self.op.reply.capabilities
407 407 exc._replycaps = replycaps
408 408 exc._bundle2salvagedoutput = salvaged
409 409
410 410 # Re-raising from a variable loses the original stack. So only use
411 411 # that form if we need to.
412 412 if seekerror:
413 413 raise exc
414 414
415 415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 416 self.count)
417 417
418 418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 419 """This function processes a bundle, applying its effects to/from a repo
420 420
421 421 It iterates over each part then searches for and uses the proper handling
422 422 code to process the part. Parts are processed in order.
423 423
424 424 An unknown mandatory part will abort the process.
425 425
426 426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 427 function. This is used to ensure output is properly propagated in case of
428 428 an error during the unbundling. This output capturing part will likely be
429 429 reworked and this ability will probably go away in the process.
430 430 """
431 431 if op is None:
432 432 if transactiongetter is None:
433 433 transactiongetter = _notransaction
434 434 op = bundleoperation(repo, transactiongetter)
435 435 # todo:
436 436 # - replace this with an init function soon.
437 437 # - exception catching
438 438 unbundler.params
439 439 if repo.ui.debugflag:
440 440 msg = ['bundle2-input-bundle:']
441 441 if unbundler.params:
442 442 msg.append(' %i params' % len(unbundler.params))
443 443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 444 msg.append(' no-transaction')
445 445 else:
446 446 msg.append(' with-transaction')
447 447 msg.append('\n')
448 448 repo.ui.debug(''.join(msg))
449 449
450 processparts(repo, op, unbundler)
451
452 return op
453
454 def processparts(repo, op, unbundler):
450 455 with partiterator(repo, op, unbundler) as parts:
451 456 for part in parts:
452 457 _processpart(op, part)
453 458
454 return op
455
456 459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
457 460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
458 461 op.records.add('changegroup', {
459 462 'return': ret,
460 463 })
461 464 return ret
462 465
463 466 def _gethandler(op, part):
464 467 status = 'unknown' # used by debug output
465 468 try:
466 469 handler = parthandlermapping.get(part.type)
467 470 if handler is None:
468 471 status = 'unsupported-type'
469 472 raise error.BundleUnknownFeatureError(parttype=part.type)
470 473 indebug(op.ui, 'found a handler for part %r' % part.type)
471 474 unknownparams = part.mandatorykeys - handler.params
472 475 if unknownparams:
473 476 unknownparams = list(unknownparams)
474 477 unknownparams.sort()
475 478 status = 'unsupported-params (%s)' % unknownparams
476 479 raise error.BundleUnknownFeatureError(parttype=part.type,
477 480 params=unknownparams)
478 481 status = 'supported'
479 482 except error.BundleUnknownFeatureError as exc:
480 483 if part.mandatory: # mandatory parts
481 484 raise
482 485 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
483 486 return # skip to part processing
484 487 finally:
485 488 if op.ui.debugflag:
486 489 msg = ['bundle2-input-part: "%s"' % part.type]
487 490 if not part.mandatory:
488 491 msg.append(' (advisory)')
489 492 nbmp = len(part.mandatorykeys)
490 493 nbap = len(part.params) - nbmp
491 494 if nbmp or nbap:
492 495 msg.append(' (params:')
493 496 if nbmp:
494 497 msg.append(' %i mandatory' % nbmp)
495 498 if nbap:
496 499 msg.append(' %i advisory' % nbap)
497 500 msg.append(')')
498 501 msg.append(' %s\n' % status)
499 502 op.ui.debug(''.join(msg))
500 503
501 504 return handler
502 505
503 506 def _processpart(op, part):
504 507 """process a single part from a bundle
505 508
506 509 The part is guaranteed to have been fully consumed when the function exits
507 510 (even if an exception is raised)."""
508 511 handler = _gethandler(op, part)
509 512 if handler is None:
510 513 return
511 514
512 515 # handler is called outside the above try block so that we don't
513 516 # risk catching KeyErrors from anything other than the
514 517 # parthandlermapping lookup (any KeyError raised by handler()
515 518 # itself represents a defect of a different variety).
516 519 output = None
517 520 if op.captureoutput and op.reply is not None:
518 521 op.ui.pushbuffer(error=True, subproc=True)
519 522 output = ''
520 523 try:
521 524 handler(op, part)
522 525 finally:
523 526 if output is not None:
524 527 output = op.ui.popbuffer()
525 528 if output:
526 529 outpart = op.reply.newpart('output', data=output,
527 530 mandatory=False)
528 531 outpart.addparam(
529 532 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
530 533
531 534 def decodecaps(blob):
532 535 """decode a bundle2 caps bytes blob into a dictionary
533 536
534 537 The blob is a list of capabilities (one per line)
535 538 Capabilities may have values using a line of the form::
536 539
537 540 capability=value1,value2,value3
538 541
539 542 The values are always a list."""
540 543 caps = {}
541 544 for line in blob.splitlines():
542 545 if not line:
543 546 continue
544 547 if '=' not in line:
545 548 key, vals = line, ()
546 549 else:
547 550 key, vals = line.split('=', 1)
548 551 vals = vals.split(',')
549 552 key = urlreq.unquote(key)
550 553 vals = [urlreq.unquote(v) for v in vals]
551 554 caps[key] = vals
552 555 return caps
553 556
554 557 def encodecaps(caps):
555 558 """encode a bundle2 caps dictionary into a bytes blob"""
556 559 chunks = []
557 560 for ca in sorted(caps):
558 561 vals = caps[ca]
559 562 ca = urlreq.quote(ca)
560 563 vals = [urlreq.quote(v) for v in vals]
561 564 if vals:
562 565 ca = "%s=%s" % (ca, ','.join(vals))
563 566 chunks.append(ca)
564 567 return '\n'.join(chunks)
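The `encodecaps`/`decodecaps` pair above round-trips a capabilities dictionary. Here is a standalone sketch of the same logic (using Python 3's `urllib.parse` in place of the module's `urlreq`, and plain lists for values):

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """encode a caps dict into the newline-separated blob described above"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))  # capability=v1,v2,...
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode the blob back into a capability -> value-list dict"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []          # capability without values
        else:
            key, rest = line.split('=', 1)
            vals = [unquote(v) for v in rest.split(',')]
        caps[unquote(key)] = vals
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps(caps)
roundtrip = decodecaps(blob)
```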
565 568
566 569 bundletypes = {
567 570 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
568 571 # since the unification ssh accepts a header but there
569 572 # is no capability signaling it.
570 573 "HG20": (), # special-cased below
571 574 "HG10UN": ("HG10UN", 'UN'),
572 575 "HG10BZ": ("HG10", 'BZ'),
573 576 "HG10GZ": ("HG10GZ", 'GZ'),
574 577 }
575 578
576 579 # hgweb uses this list to communicate its preferred type
577 580 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
578 581
579 582 class bundle20(object):
580 583 """represent an outgoing bundle2 container
581 584
582 585 Use the `addparam` method to add a stream level parameter, and `newpart` to
583 586 populate it. Then call `getchunks` to retrieve all the binary chunks of
584 587 data that compose the bundle2 container."""
585 588
586 589 _magicstring = 'HG20'
587 590
588 591 def __init__(self, ui, capabilities=()):
589 592 self.ui = ui
590 593 self._params = []
591 594 self._parts = []
592 595 self.capabilities = dict(capabilities)
593 596 self._compengine = util.compengines.forbundletype('UN')
594 597 self._compopts = None
595 598
596 599 def setcompression(self, alg, compopts=None):
597 600 """setup core part compression to <alg>"""
598 601 if alg in (None, 'UN'):
599 602 return
600 603 assert not any(n.lower() == 'compression' for n, v in self._params)
601 604 self.addparam('Compression', alg)
602 605 self._compengine = util.compengines.forbundletype(alg)
603 606 self._compopts = compopts
604 607
605 608 @property
606 609 def nbparts(self):
607 610 """total number of parts added to the bundler"""
608 611 return len(self._parts)
609 612
610 613 # methods used to define the bundle2 content
611 614 def addparam(self, name, value=None):
612 615 """add a stream level parameter"""
613 616 if not name:
614 617 raise ValueError('empty parameter name')
615 618 if name[0] not in pycompat.bytestr(string.ascii_letters):
616 619 raise ValueError('non letter first character: %r' % name)
617 620 self._params.append((name, value))
618 621
619 622 def addpart(self, part):
620 623 """add a new part to the bundle2 container
621 624
622 625 Parts contain the actual application payload.
623 626 assert part.id is None
624 627 part.id = len(self._parts) # very cheap counter
625 628 self._parts.append(part)
626 629
627 630 def newpart(self, typeid, *args, **kwargs):
628 631 """create a new part and add it to the container
629 632
630 633 The part is directly added to the container. For now, this means
631 634 that any failure to properly initialize the part after calling
632 635 ``newpart`` should result in a failure of the whole bundling process.
633 636
634 637 You can still fall back to manually creating and adding one if you need better
635 638 control."""
636 639 part = bundlepart(typeid, *args, **kwargs)
637 640 self.addpart(part)
638 641 return part
639 642
640 643 # methods used to generate the bundle2 stream
641 644 def getchunks(self):
642 645 if self.ui.debugflag:
643 646 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
644 647 if self._params:
645 648 msg.append(' (%i params)' % len(self._params))
646 649 msg.append(' %i parts total\n' % len(self._parts))
647 650 self.ui.debug(''.join(msg))
648 651 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
649 652 yield self._magicstring
650 653 param = self._paramchunk()
651 654 outdebug(self.ui, 'bundle parameter: %s' % param)
652 655 yield _pack(_fstreamparamsize, len(param))
653 656 if param:
654 657 yield param
655 658 for chunk in self._compengine.compressstream(self._getcorechunk(),
656 659 self._compopts):
657 660 yield chunk
658 661
659 662 def _paramchunk(self):
660 663 """return an encoded version of all stream parameters"""
661 664 blocks = []
662 665 for par, value in self._params:
663 666 par = urlreq.quote(par)
664 667 if value is not None:
665 668 value = urlreq.quote(value)
666 669 par = '%s=%s' % (par, value)
667 670 blocks.append(par)
668 671 return ' '.join(blocks)
669 672
670 673 def _getcorechunk(self):
671 674 """yield chunk for the core part of the bundle
672 675
673 676 (all but headers and parameters)"""
674 677 outdebug(self.ui, 'start of parts')
675 678 for part in self._parts:
676 679 outdebug(self.ui, 'bundle part: "%s"' % part.type)
677 680 for chunk in part.getchunks(ui=self.ui):
678 681 yield chunk
679 682 outdebug(self.ui, 'end of bundle')
680 683 yield _pack(_fpartheadersize, 0)
681 684
682 685
683 686 def salvageoutput(self):
684 687 """return a list with a copy of all output parts in the bundle
685 688
686 689 This is meant to be used during error handling to make sure we preserve
687 690 server output"""
688 691 salvaged = []
689 692 for part in self._parts:
690 693 if part.type.startswith('output'):
691 694 salvaged.append(part.copy())
692 695 return salvaged
693 696
694 697
695 698 class unpackermixin(object):
696 699 """A mixin to extract bytes and struct data from a stream"""
697 700
698 701 def __init__(self, fp):
699 702 self._fp = fp
700 703
701 704 def _unpack(self, format):
702 705 """unpack this struct format from the stream
703 706
704 707 This method is meant for internal usage by the bundle2 protocol only.
705 708 It directly manipulates the low level stream, including bundle2 level
706 709 instructions.
707 710
708 711 Do not use it to implement higher-level logic or methods."""
709 712 data = self._readexact(struct.calcsize(format))
710 713 return _unpack(format, data)
711 714
712 715 def _readexact(self, size):
713 716 """read exactly <size> bytes from the stream
714 717
715 718 This method is meant for internal usage by the bundle2 protocol only.
716 719 It directly manipulates the low level stream, including bundle2 level
717 720 instructions.
718 721
719 722 Do not use it to implement higher-level logic or methods."""
720 723 return changegroup.readexactly(self._fp, size)
721 724
722 725 def getunbundler(ui, fp, magicstring=None):
723 726 """return a valid unbundler object for a given magicstring"""
724 727 if magicstring is None:
725 728 magicstring = changegroup.readexactly(fp, 4)
726 729 magic, version = magicstring[0:2], magicstring[2:4]
727 730 if magic != 'HG':
728 731 ui.debug(
729 732 "error: invalid magic: %r (version %r), should be 'HG'\n"
730 733 % (magic, version))
731 734 raise error.Abort(_('not a Mercurial bundle'))
732 735 unbundlerclass = formatmap.get(version)
733 736 if unbundlerclass is None:
734 737 raise error.Abort(_('unknown bundle version %s') % version)
735 738 unbundler = unbundlerclass(ui, fp)
736 739 indebug(ui, 'start processing of %s stream' % magicstring)
737 740 return unbundler
738 741
739 742 class unbundle20(unpackermixin):
740 743 """interpret a bundle2 stream
741 744
742 745 This class is fed with a binary stream and yields parts through its
743 746 `iterparts` method."""
744 747
745 748 _magicstring = 'HG20'
746 749
747 750 def __init__(self, ui, fp):
748 751 """If header is specified, we do not read it out of the stream."""
749 752 self.ui = ui
750 753 self._compengine = util.compengines.forbundletype('UN')
751 754 self._compressed = None
752 755 super(unbundle20, self).__init__(fp)
753 756
754 757 @util.propertycache
755 758 def params(self):
756 759 """dictionary of stream level parameters"""
757 760 indebug(self.ui, 'reading bundle2 stream parameters')
758 761 params = {}
759 762 paramssize = self._unpack(_fstreamparamsize)[0]
760 763 if paramssize < 0:
761 764 raise error.BundleValueError('negative bundle param size: %i'
762 765 % paramssize)
763 766 if paramssize:
764 767 params = self._readexact(paramssize)
765 768 params = self._processallparams(params)
766 769 return params
767 770
768 771 def _processallparams(self, paramsblock):
769 772 """process the stream level parameter block and return a name -> value dict"""
770 773 params = util.sortdict()
771 774 for p in paramsblock.split(' '):
772 775 p = p.split('=', 1)
773 776 p = [urlreq.unquote(i) for i in p]
774 777 if len(p) < 2:
775 778 p.append(None)
776 779 self._processparam(*p)
777 780 params[p[0]] = p[1]
778 781 return params
779 782
780 783
781 784 def _processparam(self, name, value):
782 785 """process a parameter, applying its effect if needed
783 786
784 787 Parameters starting with a lower case letter are advisory and will be
785 788 ignored when unknown. Those starting with an upper case letter are
786 789 mandatory, and this function will raise a KeyError when they are unknown.
787 790
788 791 Note: no options are currently supported. Any input will either be
789 792 ignored or fail.
790 793 """
791 794 if not name:
792 795 raise ValueError('empty parameter name')
793 796 if name[0] not in pycompat.bytestr(string.ascii_letters):
794 797 raise ValueError('non letter first character: %r' % name)
795 798 try:
796 799 handler = b2streamparamsmap[name.lower()]
797 800 except KeyError:
798 801 if name[0].islower():
799 802 indebug(self.ui, "ignoring unknown parameter %r" % name)
800 803 else:
801 804 raise error.BundleUnknownFeatureError(params=(name,))
802 805 else:
803 806 handler(self, name, value)
804 807
805 808 def _forwardchunks(self):
806 809 """utility to transfer a bundle2 as binary
807 810
808 811 This is made necessary by the fact that the 'getbundle' command over 'ssh'
809 812 has no way to know when the reply ends, relying on the bundle being
810 813 interpreted to know its end. This is terrible and we are sorry, but we
811 814 needed to move forward to get general delta enabled.
812 815 """
813 816 yield self._magicstring
814 817 assert 'params' not in vars(self)
815 818 paramssize = self._unpack(_fstreamparamsize)[0]
816 819 if paramssize < 0:
817 820 raise error.BundleValueError('negative bundle param size: %i'
818 821 % paramssize)
819 822 yield _pack(_fstreamparamsize, paramssize)
820 823 if paramssize:
821 824 params = self._readexact(paramssize)
822 825 self._processallparams(params)
823 826 yield params
824 827 assert self._compengine.bundletype == 'UN'
825 828 # From there, payload might need to be decompressed
826 829 self._fp = self._compengine.decompressorreader(self._fp)
827 830 emptycount = 0
828 831 while emptycount < 2:
829 832 # so we can brainlessly loop
830 833 assert _fpartheadersize == _fpayloadsize
831 834 size = self._unpack(_fpartheadersize)[0]
832 835 yield _pack(_fpartheadersize, size)
833 836 if size:
834 837 emptycount = 0
835 838 else:
836 839 emptycount += 1
837 840 continue
838 841 if size == flaginterrupt:
839 842 continue
840 843 elif size < 0:
841 844 raise error.BundleValueError('negative chunk size: %i' % size)
842 845 yield self._readexact(size)
843 846
844 847
845 848 def iterparts(self):
846 849 """yield all parts contained in the stream"""
847 850 # make sure params have been loaded
848 851 self.params
849 852 # From there, payload need to be decompressed
850 853 self._fp = self._compengine.decompressorreader(self._fp)
851 854 indebug(self.ui, 'start extraction of bundle2 parts')
852 855 headerblock = self._readpartheader()
853 856 while headerblock is not None:
854 857 part = unbundlepart(self.ui, headerblock, self._fp)
855 858 yield part
856 859 # Seek to the end of the part to force its consumption so the next
857 860 # part can be read. But then seek back to the beginning so the
858 861 # code consuming this generator has a part that starts at 0.
859 862 part.seek(0, 2)
860 863 part.seek(0)
861 864 headerblock = self._readpartheader()
862 865 indebug(self.ui, 'end of bundle2 stream')
863 866
864 867 def _readpartheader(self):
865 868 """reads a part header size and return the bytes blob
866 869
867 870 returns None if empty"""
868 871 headersize = self._unpack(_fpartheadersize)[0]
869 872 if headersize < 0:
870 873 raise error.BundleValueError('negative part header size: %i'
871 874 % headersize)
872 875 indebug(self.ui, 'part header size: %i' % headersize)
873 876 if headersize:
874 877 return self._readexact(headersize)
875 878 return None
876 879
877 880 def compressed(self):
878 881 self.params # load params
879 882 return self._compressed
880 883
881 884 def close(self):
882 885 """close underlying file"""
883 886 if util.safehasattr(self._fp, 'close'):
884 887 return self._fp.close()
885 888
886 889 formatmap = {'20': unbundle20}
887 890
888 891 b2streamparamsmap = {}
889 892
890 893 def b2streamparamhandler(name):
891 894 """register a handler for a stream level parameter"""
892 895 def decorator(func):
893 896 assert name not in b2streamparamsmap
894 897 b2streamparamsmap[name] = func
895 898 return func
896 899 return decorator
897 900
898 901 @b2streamparamhandler('compression')
899 902 def processcompression(unbundler, param, value):
900 903 """read compression parameter and install payload decompression"""
901 904 if value not in util.compengines.supportedbundletypes:
902 905 raise error.BundleUnknownFeatureError(params=(param,),
903 906 values=(value,))
904 907 unbundler._compengine = util.compengines.forbundletype(value)
905 908 if value is not None:
906 909 unbundler._compressed = True
907 910
908 911 class bundlepart(object):
909 912 """A bundle2 part contains application level payload
910 913
911 914 The part `type` is used to route the part to the application level
912 915 handler.
913 916
914 917 The part payload is contained in ``part.data``. It could be raw bytes or a
915 918 generator of byte chunks.
916 919
917 920 You can add parameters to the part using the ``addparam`` method.
918 921 Parameters can be either mandatory (default) or advisory. Remote side
919 922 should be able to safely ignore the advisory ones.
920 923
921 924 Neither data nor parameters can be modified after generation has begun.
922 925 """
923 926
924 927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
925 928 data='', mandatory=True):
926 929 validateparttype(parttype)
927 930 self.id = None
928 931 self.type = parttype
929 932 self._data = data
930 933 self._mandatoryparams = list(mandatoryparams)
931 934 self._advisoryparams = list(advisoryparams)
932 935 # checking for duplicated entries
933 936 self._seenparams = set()
934 937 for pname, __ in self._mandatoryparams + self._advisoryparams:
935 938 if pname in self._seenparams:
936 939 raise error.ProgrammingError('duplicated params: %s' % pname)
937 940 self._seenparams.add(pname)
938 941 # status of the part's generation:
939 942 # - None: not started,
940 943 # - False: currently generated,
941 944 # - True: generation done.
942 945 self._generated = None
943 946 self.mandatory = mandatory
944 947
945 948 def __repr__(self):
946 949 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
947 950 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
948 951 % (cls, id(self), self.id, self.type, self.mandatory))
949 952
950 953 def copy(self):
951 954 """return a copy of the part
952 955
953 956 The new part has the very same content but no partid assigned yet.
954 957 Parts with generated data cannot be copied."""
955 958 assert not util.safehasattr(self.data, 'next')
956 959 return self.__class__(self.type, self._mandatoryparams,
957 960 self._advisoryparams, self._data, self.mandatory)
958 961
959 962 # methods used to define the part content
960 963 @property
961 964 def data(self):
962 965 return self._data
963 966
964 967 @data.setter
965 968 def data(self, data):
966 969 if self._generated is not None:
967 970 raise error.ReadOnlyPartError('part is being generated')
968 971 self._data = data
969 972
970 973 @property
971 974 def mandatoryparams(self):
972 975 # make it an immutable tuple to force people through ``addparam``
973 976 return tuple(self._mandatoryparams)
974 977
975 978 @property
976 979 def advisoryparams(self):
977 980 # make it an immutable tuple to force people through ``addparam``
978 981 return tuple(self._advisoryparams)
979 982
980 983 def addparam(self, name, value='', mandatory=True):
981 984 """add a parameter to the part
982 985
983 986 If 'mandatory' is set to True, the remote handler must claim support
984 987 for this parameter or the unbundling will be aborted.
985 988
986 989 The 'name' and 'value' cannot exceed 255 bytes each.
987 990 """
988 991 if self._generated is not None:
989 992 raise error.ReadOnlyPartError('part is being generated')
990 993 if name in self._seenparams:
991 994 raise ValueError('duplicated params: %s' % name)
992 995 self._seenparams.add(name)
993 996 params = self._advisoryparams
994 997 if mandatory:
995 998 params = self._mandatoryparams
996 999 params.append((name, value))
997 1000
998 1001 # methods used to generate the bundle2 stream
999 1002 def getchunks(self, ui):
1000 1003 if self._generated is not None:
1001 1004 raise error.ProgrammingError('part can only be consumed once')
1002 1005 self._generated = False
1003 1006
1004 1007 if ui.debugflag:
1005 1008 msg = ['bundle2-output-part: "%s"' % self.type]
1006 1009 if not self.mandatory:
1007 1010 msg.append(' (advisory)')
1008 1011 nbmp = len(self.mandatoryparams)
1009 1012 nbap = len(self.advisoryparams)
1010 1013 if nbmp or nbap:
1011 1014 msg.append(' (params:')
1012 1015 if nbmp:
1013 1016 msg.append(' %i mandatory' % nbmp)
1014 1017 if nbap:
1015 1018 msg.append(' %i advisory' % nbap)
1016 1019 msg.append(')')
1017 1020 if not self.data:
1018 1021 msg.append(' empty payload')
1019 1022 elif util.safehasattr(self.data, 'next'):
1020 1023 msg.append(' streamed payload')
1021 1024 else:
1022 1025 msg.append(' %i bytes payload' % len(self.data))
1023 1026 msg.append('\n')
1024 1027 ui.debug(''.join(msg))
1025 1028
1026 1029 #### header
1027 1030 if self.mandatory:
1028 1031 parttype = self.type.upper()
1029 1032 else:
1030 1033 parttype = self.type.lower()
1031 1034 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1032 1035 ## parttype
1033 1036 header = [_pack(_fparttypesize, len(parttype)),
1034 1037 parttype, _pack(_fpartid, self.id),
1035 1038 ]
1036 1039 ## parameters
1037 1040 # count
1038 1041 manpar = self.mandatoryparams
1039 1042 advpar = self.advisoryparams
1040 1043 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1041 1044 # size
1042 1045 parsizes = []
1043 1046 for key, value in manpar:
1044 1047 parsizes.append(len(key))
1045 1048 parsizes.append(len(value))
1046 1049 for key, value in advpar:
1047 1050 parsizes.append(len(key))
1048 1051 parsizes.append(len(value))
1049 1052 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1050 1053 header.append(paramsizes)
1051 1054 # key, value
1052 1055 for key, value in manpar:
1053 1056 header.append(key)
1054 1057 header.append(value)
1055 1058 for key, value in advpar:
1056 1059 header.append(key)
1057 1060 header.append(value)
1058 1061 ## finalize header
1059 1062 try:
1060 1063 headerchunk = ''.join(header)
1061 1064 except TypeError:
1062 1065 raise TypeError(r'Found a non-bytes trying to '
1063 1066 r'build bundle part header: %r' % header)
1064 1067 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1065 1068 yield _pack(_fpartheadersize, len(headerchunk))
1066 1069 yield headerchunk
1067 1070 ## payload
1068 1071 try:
1069 1072 for chunk in self._payloadchunks():
1070 1073 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1071 1074 yield _pack(_fpayloadsize, len(chunk))
1072 1075 yield chunk
1073 1076 except GeneratorExit:
1074 1077 # GeneratorExit means that nobody is listening for our
1075 1078 # results anyway, so just bail quickly rather than trying
1076 1079 # to produce an error part.
1077 1080 ui.debug('bundle2-generatorexit\n')
1078 1081 raise
1079 1082 except BaseException as exc:
1080 1083 bexc = util.forcebytestr(exc)
1081 1084 # backup exception data for later
1082 1085 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1083 1086 % bexc)
1084 1087 tb = sys.exc_info()[2]
1085 1088 msg = 'unexpected error: %s' % bexc
1086 1089 interpart = bundlepart('error:abort', [('message', msg)],
1087 1090 mandatory=False)
1088 1091 interpart.id = 0
1089 1092 yield _pack(_fpayloadsize, -1)
1090 1093 for chunk in interpart.getchunks(ui=ui):
1091 1094 yield chunk
1092 1095 outdebug(ui, 'closing payload chunk')
1093 1096 # abort current part payload
1094 1097 yield _pack(_fpayloadsize, 0)
1095 1098 pycompat.raisewithtb(exc, tb)
1096 1099 # end of payload
1097 1100 outdebug(ui, 'closing payload chunk')
1098 1101 yield _pack(_fpayloadsize, 0)
1099 1102 self._generated = True
1100 1103
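`getchunks` serializes the part header as: type size, type, part id, parameter counts, per-parameter key/value sizes, then the raw key/value bytes. A symmetric packing sketch; the exact struct formats (`>B` for the type size, `>I` for the part id, `>BB` for the counts and per-parameter sizes) are assumptions inferred from the constants used above:

```python
import struct

def pack_header(parttype, partid, manpar, advpar):
    """Pack a bundle2-style part header from a type, an id and
    (key, value) parameter lists (all bytes)."""
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(manpar), len(advpar))]
    allpar = manpar + advpar
    sizes = []
    for key, value in allpar:
        sizes.extend((len(key), len(value)))
    # one byte per key size and per value size, in order
    header.append(struct.pack('>' + 'B' * len(sizes), *sizes))
    for key, value in allpar:
        header.extend((key, value))
    return b''.join(header)

hdr = pack_header(b'CHANGEGROUP', 0, [(b'version', b'02')], [])
```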
1101 1104 def _payloadchunks(self):
1102 1105 """yield chunks of the part payload
1103 1106
1104 1107 Exists to handle the different ways of providing data to a part."""
1105 1108 # we only support fixed size data now.
1106 1109 # This will be improved in the future.
1107 1110 if (util.safehasattr(self.data, 'next')
1108 1111 or util.safehasattr(self.data, '__next__')):
1109 1112 buff = util.chunkbuffer(self.data)
1110 1113 chunk = buff.read(preferedchunksize)
1111 1114 while chunk:
1112 1115 yield chunk
1113 1116 chunk = buff.read(preferedchunksize)
1114 1117 elif len(self.data):
1115 1118 yield self.data
1116 1119
1117 1120
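For streamed data, `util.chunkbuffer` re-slices an arbitrary generator of byte chunks into reads of the preferred size. A minimal standalone equivalent (simplified: the real implementation buffers lazily instead of joining everything up front):

```python
def rechunk(chunks, size):
    """Buffer a generator of byte strings and yield fixed-size
    slices; the last slice may be shorter."""
    buf = b''.join(chunks)  # simplification of lazy buffering
    for start in range(0, len(buf), size):
        yield buf[start:start + size]

out = list(rechunk(iter([b'ab', b'cdef', b'g']), 3))
```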
1118 1121 flaginterrupt = -1
1119 1122
1120 1123 class interrupthandler(unpackermixin):
1121 1124 """read one part and process it with restricted capability
1122 1125
1123 1126 This allows transmitting exceptions raised on the producer side during
1124 1127 part iteration while the consumer is reading a part.
1125 1128
1126 1129 Parts processed in this manner only have access to a ui object."""
1127 1130
1128 1131 def __init__(self, ui, fp):
1129 1132 super(interrupthandler, self).__init__(fp)
1130 1133 self.ui = ui
1131 1134
1132 1135 def _readpartheader(self):
1133 1136 """reads a part header size and return the bytes blob
1134 1137
1135 1138 returns None if empty"""
1136 1139 headersize = self._unpack(_fpartheadersize)[0]
1137 1140 if headersize < 0:
1138 1141 raise error.BundleValueError('negative part header size: %i'
1139 1142 % headersize)
1140 1143 indebug(self.ui, 'part header size: %i\n' % headersize)
1141 1144 if headersize:
1142 1145 return self._readexact(headersize)
1143 1146 return None
1144 1147
1145 1148 def __call__(self):
1146 1149
1147 1150 self.ui.debug('bundle2-input-stream-interrupt:'
1148 1151 ' opening out of band context\n')
1149 1152 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1150 1153 headerblock = self._readpartheader()
1151 1154 if headerblock is None:
1152 1155 indebug(self.ui, 'no part found during interruption.')
1153 1156 return
1154 1157 part = unbundlepart(self.ui, headerblock, self._fp)
1155 1158 op = interruptoperation(self.ui)
1156 1159 hardabort = False
1157 1160 try:
1158 1161 _processpart(op, part)
1159 1162 except (SystemExit, KeyboardInterrupt):
1160 1163 hardabort = True
1161 1164 raise
1162 1165 finally:
1163 1166 if not hardabort:
1164 1167 part.seek(0, 2)
1165 1168 self.ui.debug('bundle2-input-stream-interrupt:'
1166 1169 ' closing out of band context\n')
1167 1170
1168 1171 class interruptoperation(object):
1169 1172 """A limited operation to be used by part handlers during interruption
1170 1173
1171 1174 It only has access to a ui object.
1172 1175 """
1173 1176
1174 1177 def __init__(self, ui):
1175 1178 self.ui = ui
1176 1179 self.reply = None
1177 1180 self.captureoutput = False
1178 1181
1179 1182 @property
1180 1183 def repo(self):
1181 1184 raise error.ProgrammingError('no repo access from stream interruption')
1182 1185
1183 1186 def gettransaction(self):
1184 1187 raise TransactionUnavailable('no repo access from stream interruption')
1185 1188
1186 1189 class unbundlepart(unpackermixin):
1187 1190 """a bundle part read from a bundle"""
1188 1191
1189 1192 def __init__(self, ui, header, fp):
1190 1193 super(unbundlepart, self).__init__(fp)
1191 1194 self._seekable = (util.safehasattr(fp, 'seek') and
1192 1195 util.safehasattr(fp, 'tell'))
1193 1196 self.ui = ui
1194 1197 # unbundle state attr
1195 1198 self._headerdata = header
1196 1199 self._headeroffset = 0
1197 1200 self._initialized = False
1198 1201 self.consumed = False
1199 1202 # part data
1200 1203 self.id = None
1201 1204 self.type = None
1202 1205 self.mandatoryparams = None
1203 1206 self.advisoryparams = None
1204 1207 self.params = None
1205 1208 self.mandatorykeys = ()
1206 1209 self._payloadstream = None
1207 1210 self._readheader()
1208 1211 self._mandatory = None
1209 1212 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1210 1213 self._pos = 0
1211 1214
1212 1215 def _fromheader(self, size):
1213 1216 """return the next <size> byte from the header"""
1214 1217 offset = self._headeroffset
1215 1218 data = self._headerdata[offset:(offset + size)]
1216 1219 self._headeroffset = offset + size
1217 1220 return data
1218 1221
1219 1222 def _unpackheader(self, format):
1220 1223 """read given format from header
1221 1224
1222 1225 This automatically computes the size of the format to read."""
1223 1226 data = self._fromheader(struct.calcsize(format))
1224 1227 return _unpack(format, data)
1225 1228
1226 1229 def _initparams(self, mandatoryparams, advisoryparams):
1227 1230 """internal function to setup all logic related parameters"""
1228 1231 # make it read only to prevent people touching it by mistake.
1229 1232 self.mandatoryparams = tuple(mandatoryparams)
1230 1233 self.advisoryparams = tuple(advisoryparams)
1231 1234 # user friendly UI
1232 1235 self.params = util.sortdict(self.mandatoryparams)
1233 1236 self.params.update(self.advisoryparams)
1234 1237 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1235 1238
1236 1239 def _payloadchunks(self, chunknum=0):
1237 1240 '''seek to specified chunk and start yielding data'''
1238 1241 if len(self._chunkindex) == 0:
1239 1242 assert chunknum == 0, 'Must start with chunk 0'
1240 1243 self._chunkindex.append((0, self._tellfp()))
1241 1244 else:
1242 1245 assert chunknum < len(self._chunkindex), \
1243 1246 'Unknown chunk %d' % chunknum
1244 1247 self._seekfp(self._chunkindex[chunknum][1])
1245 1248
1246 1249 pos = self._chunkindex[chunknum][0]
1247 1250 payloadsize = self._unpack(_fpayloadsize)[0]
1248 1251 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1249 1252 while payloadsize:
1250 1253 if payloadsize == flaginterrupt:
1251 1254 # interruption detection, the handler will now read a
1252 1255 # single part and process it.
1253 1256 interrupthandler(self.ui, self._fp)()
1254 1257 elif payloadsize < 0:
1255 1258 msg = 'negative payload chunk size: %i' % payloadsize
1256 1259 raise error.BundleValueError(msg)
1257 1260 else:
1258 1261 result = self._readexact(payloadsize)
1259 1262 chunknum += 1
1260 1263 pos += payloadsize
1261 1264 if chunknum == len(self._chunkindex):
1262 1265 self._chunkindex.append((pos, self._tellfp()))
1263 1266 yield result
1264 1267 payloadsize = self._unpack(_fpayloadsize)[0]
1265 1268 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1266 1269
1267 1270 def _findchunk(self, pos):
1268 1271 '''for a given payload position, return a chunk number and offset'''
1269 1272 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1270 1273 if ppos == pos:
1271 1274 return chunk, 0
1272 1275 elif ppos > pos:
1273 1276 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1274 1277 raise ValueError('Unknown chunk')
1275 1278
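`_findchunk` maps a payload offset back to a chunk number plus an offset inside that chunk, using the `(payload pos, file pos)` index built while reading. The lookup logic in isolation (the index positions below are illustrative; callers guarantee the offset is within the indexed range):

```python
def findchunk(chunkindex, pos):
    """Given [(payload_start, file_pos), ...] sorted by payload_start,
    return (chunk number, offset within that chunk) for `pos`."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# three chunks starting at payload offsets 0, 10 and 25
index = [(0, 100), (10, 114), (25, 133)]
```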
1276 1279 def _readheader(self):
1277 1280 """read the header and setup the object"""
1278 1281 typesize = self._unpackheader(_fparttypesize)[0]
1279 1282 self.type = self._fromheader(typesize)
1280 1283 indebug(self.ui, 'part type: "%s"' % self.type)
1281 1284 self.id = self._unpackheader(_fpartid)[0]
1282 1285 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1283 1286 # extract mandatory bit from type
1284 1287 self.mandatory = (self.type != self.type.lower())
1285 1288 self.type = self.type.lower()
1286 1289 ## reading parameters
1287 1290 # param count
1288 1291 mancount, advcount = self._unpackheader(_fpartparamcount)
1289 1292 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1290 1293 # param size
1291 1294 fparamsizes = _makefpartparamsizes(mancount + advcount)
1292 1295 paramsizes = self._unpackheader(fparamsizes)
1293 1296 # make it a list of couple again
1294 1297 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1295 1298 # split mandatory from advisory
1296 1299 mansizes = paramsizes[:mancount]
1297 1300 advsizes = paramsizes[mancount:]
1298 1301 # retrieve param value
1299 1302 manparams = []
1300 1303 for key, value in mansizes:
1301 1304 manparams.append((self._fromheader(key), self._fromheader(value)))
1302 1305 advparams = []
1303 1306 for key, value in advsizes:
1304 1307 advparams.append((self._fromheader(key), self._fromheader(value)))
1305 1308 self._initparams(manparams, advparams)
1306 1309 ## part payload
1307 1310 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1308 1311 # we read the data, tell it
1309 1312 self._initialized = True
1310 1313
1311 1314 def read(self, size=None):
1312 1315 """read payload data"""
1313 1316 if not self._initialized:
1314 1317 self._readheader()
1315 1318 if size is None:
1316 1319 data = self._payloadstream.read()
1317 1320 else:
1318 1321 data = self._payloadstream.read(size)
1319 1322 self._pos += len(data)
1320 1323 if size is None or len(data) < size:
1321 1324 if not self.consumed and self._pos:
1322 1325 self.ui.debug('bundle2-input-part: total payload size %i\n'
1323 1326 % self._pos)
1324 1327 self.consumed = True
1325 1328 return data
1326 1329
1327 1330 def tell(self):
1328 1331 return self._pos
1329 1332
1330 1333 def seek(self, offset, whence=0):
1331 1334 if whence == 0:
1332 1335 newpos = offset
1333 1336 elif whence == 1:
1334 1337 newpos = self._pos + offset
1335 1338 elif whence == 2:
1336 1339 if not self.consumed:
1337 1340 self.read()
1338 1341 newpos = self._chunkindex[-1][0] - offset
1339 1342 else:
1340 1343 raise ValueError('Unknown whence value: %r' % (whence,))
1341 1344
1342 1345 if newpos > self._chunkindex[-1][0] and not self.consumed:
1343 1346 self.read()
1344 1347 if not 0 <= newpos <= self._chunkindex[-1][0]:
1345 1348 raise ValueError('Offset out of range')
1346 1349
1347 1350 if self._pos != newpos:
1348 1351 chunk, internaloffset = self._findchunk(newpos)
1349 1352 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1350 1353 adjust = self.read(internaloffset)
1351 1354 if len(adjust) != internaloffset:
1352 1355 raise error.Abort(_('Seek failed\n'))
1353 1356 self._pos = newpos
1354 1357
1355 1358 def _seekfp(self, offset, whence=0):
1356 1359 """move the underlying file pointer
1357 1360
1358 1361 This method is meant for internal usage by the bundle2 protocol only.
1359 1362 It directly manipulates the low-level stream, including bundle2-level
1360 1363 instructions.
1361 1364
1362 1365 Do not use it to implement higher-level logic or methods."""
1363 1366 if self._seekable:
1364 1367 return self._fp.seek(offset, whence)
1365 1368 else:
1366 1369 raise NotImplementedError(_('File pointer is not seekable'))
1367 1370
1368 1371 def _tellfp(self):
1369 1372 """return the file offset, or None if file is not seekable
1370 1373
1371 1374 This method is meant for internal usage by the bundle2 protocol only.
1372 1375 It directly manipulates the low-level stream, including bundle2-level
1373 1376 instructions.
1374 1377
1375 1378 Do not use it to implement higher-level logic or methods."""
1376 1379 if self._seekable:
1377 1380 try:
1378 1381 return self._fp.tell()
1379 1382 except IOError as e:
1380 1383 if e.errno == errno.ESPIPE:
1381 1384 self._seekable = False
1382 1385 else:
1383 1386 raise
1384 1387 return None
1385 1388
1386 1389 # These are only the static capabilities.
1387 1390 # Check the 'getrepocaps' function for the rest.
1388 1391 capabilities = {'HG20': (),
1389 1392 'error': ('abort', 'unsupportedcontent', 'pushraced',
1390 1393 'pushkey'),
1391 1394 'listkeys': (),
1392 1395 'pushkey': (),
1393 1396 'digests': tuple(sorted(util.DIGESTS.keys())),
1394 1397 'remote-changegroup': ('http', 'https'),
1395 1398 'hgtagsfnodes': (),
1396 1399 }
1397 1400
1398 1401 def getrepocaps(repo, allowpushback=False):
1399 1402 """return the bundle2 capabilities for a given repo
1400 1403
1401 1404 Exists to allow extensions (like evolution) to mutate the capabilities.
1402 1405 """
1403 1406 caps = capabilities.copy()
1404 1407 caps['changegroup'] = tuple(sorted(
1405 1408 changegroup.supportedincomingversions(repo)))
1406 1409 if obsolete.isenabled(repo, obsolete.exchangeopt):
1407 1410 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1408 1411 caps['obsmarkers'] = supportedformat
1409 1412 if allowpushback:
1410 1413 caps['pushback'] = ()
1411 1414 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1412 1415 if cpmode == 'check-related':
1413 1416 caps['checkheads'] = ('related',)
1414 1417 return caps
1415 1418
1416 1419 def bundle2caps(remote):
1417 1420 """return the bundle capabilities of a peer as dict"""
1418 1421 raw = remote.capable('bundle2')
1419 1422 if not raw and raw != '':
1420 1423 return {}
1421 1424 capsblob = urlreq.unquote(remote.capable('bundle2'))
1422 1425 return decodecaps(capsblob)
1423 1426
1424 1427 def obsmarkersversion(caps):
1425 1428 """extract the list of supported obsmarkers versions from a bundle2caps dict
1426 1429 """
1427 1430 obscaps = caps.get('obsmarkers', ())
1428 1431 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1429 1432
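`obsmarkersversion` simply strips the `V` prefix from each advertised format and ignores anything else. The same logic exercised standalone (with str instead of Mercurial's bytes):

```python
def obsmarkersversion(caps):
    """Extract supported obsmarker versions from a bundle2 caps dict."""
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion({'obsmarkers': ('V0', 'V1', 'X2')})
```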
1430 1433 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1431 1434 vfs=None, compression=None, compopts=None):
1432 1435 if bundletype.startswith('HG10'):
1433 1436 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1434 1437 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1435 1438 compression=compression, compopts=compopts)
1436 1439 elif not bundletype.startswith('HG20'):
1437 1440 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1438 1441
1439 1442 caps = {}
1440 1443 if 'obsolescence' in opts:
1441 1444 caps['obsmarkers'] = ('V1',)
1442 1445 bundle = bundle20(ui, caps)
1443 1446 bundle.setcompression(compression, compopts)
1444 1447 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1445 1448 chunkiter = bundle.getchunks()
1446 1449
1447 1450 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1448 1451
1449 1452 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1450 1453 # We should eventually reconcile this logic with the one behind
1451 1454 # 'exchange.getbundle2partsgenerator'.
1452 1455 #
1453 1456 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1454 1457 # different right now. So we keep them separated for now for the sake of
1455 1458 # simplicity.
1456 1459
1457 1460 # we always want a changegroup in such bundle
1458 1461 cgversion = opts.get('cg.version')
1459 1462 if cgversion is None:
1460 1463 cgversion = changegroup.safeversion(repo)
1461 1464 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1462 1465 part = bundler.newpart('changegroup', data=cg.getchunks())
1463 1466 part.addparam('version', cg.version)
1464 1467 if 'clcount' in cg.extras:
1465 1468 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1466 1469 mandatory=False)
1467 1470 if opts.get('phases') and repo.revs('%ln and secret()',
1468 1471 outgoing.missingheads):
1469 1472 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1470 1473
1471 1474 addparttagsfnodescache(repo, bundler, outgoing)
1472 1475
1473 1476 if opts.get('obsolescence', False):
1474 1477 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1475 1478 buildobsmarkerspart(bundler, obsmarkers)
1476 1479
1477 1480 if opts.get('phases', False):
1478 1481 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1479 1482 phasedata = []
1480 1483 for phase in phases.allphases:
1481 1484 for head in headsbyphase[phase]:
1482 1485 phasedata.append(_pack(_fphasesentry, phase, head))
1483 1486 bundler.newpart('phase-heads', data=''.join(phasedata))
1484 1487
1485 1488 def addparttagsfnodescache(repo, bundler, outgoing):
1486 1489 # we include the tags fnode cache for the bundle changeset
1487 1490 # (as an optional parts)
1488 1491 cache = tags.hgtagsfnodescache(repo.unfiltered())
1489 1492 chunks = []
1490 1493
1491 1494 # .hgtags fnodes are only relevant for head changesets. While we could
1492 1495 # transfer values for all known nodes, there will likely be little to
1493 1496 # no benefit.
1494 1497 #
1495 1498 # We don't bother using a generator to produce output data because
1496 1499 # a) we only have 40 bytes per head and even esoteric numbers of heads
1497 1500 # consume little memory (1M heads is 40MB) b) we don't want to send the
1498 1501 # part if we don't have entries and knowing if we have entries requires
1499 1502 # cache lookups.
1500 1503 for node in outgoing.missingheads:
1501 1504 # Don't compute missing, as this may slow down serving.
1502 1505 fnode = cache.getfnode(node, computemissing=False)
1503 1506 if fnode is not None:
1504 1507 chunks.extend([node, fnode])
1505 1508
1506 1509 if chunks:
1507 1510 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1508 1511
1509 1512 def buildobsmarkerspart(bundler, markers):
1510 1513 """add an obsmarker part to the bundler with <markers>
1511 1514
1512 1515 No part is created if markers is empty.
1513 1516 Raises ValueError if the bundler doesn't support any known obsmarker format.
1514 1517 """
1515 1518 if not markers:
1516 1519 return None
1517 1520
1518 1521 remoteversions = obsmarkersversion(bundler.capabilities)
1519 1522 version = obsolete.commonversion(remoteversions)
1520 1523 if version is None:
1521 1524 raise ValueError('bundler does not support common obsmarker format')
1522 1525 stream = obsolete.encodemarkers(markers, True, version=version)
1523 1526 return bundler.newpart('obsmarkers', data=stream)
1524 1527
1525 1528 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1526 1529 compopts=None):
1527 1530 """Write a bundle file and return its filename.
1528 1531
1529 1532 Existing files will not be overwritten.
1530 1533 If no filename is specified, a temporary file is created.
1531 1534 bz2 compression can be turned off.
1532 1535 The bundle file will be deleted in case of errors.
1533 1536 """
1534 1537
1535 1538 if bundletype == "HG20":
1536 1539 bundle = bundle20(ui)
1537 1540 bundle.setcompression(compression, compopts)
1538 1541 part = bundle.newpart('changegroup', data=cg.getchunks())
1539 1542 part.addparam('version', cg.version)
1540 1543 if 'clcount' in cg.extras:
1541 1544 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1542 1545 mandatory=False)
1543 1546 chunkiter = bundle.getchunks()
1544 1547 else:
1545 1548 # compression argument is only for the bundle2 case
1546 1549 assert compression is None
1547 1550 if cg.version != '01':
1548 1551 raise error.Abort(_('old bundle types only support v1 '
1549 1552 'changegroups'))
1550 1553 header, comp = bundletypes[bundletype]
1551 1554 if comp not in util.compengines.supportedbundletypes:
1552 1555 raise error.Abort(_('unknown stream compression type: %s')
1553 1556 % comp)
1554 1557 compengine = util.compengines.forbundletype(comp)
1555 1558 def chunkiter():
1556 1559 yield header
1557 1560 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1558 1561 yield chunk
1559 1562 chunkiter = chunkiter()
1560 1563
1561 1564 # parse the changegroup data, otherwise we will block
1562 1565 # in case of sshrepo because we don't know the end of the stream
1563 1566 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1564 1567
1565 1568 def combinechangegroupresults(op):
1566 1569 """logic to combine 0 or more addchangegroup results into one"""
1567 1570 results = [r.get('return', 0)
1568 1571 for r in op.records['changegroup']]
1569 1572 changedheads = 0
1570 1573 result = 1
1571 1574 for ret in results:
1572 1575 # If any changegroup result is 0, return 0
1573 1576 if ret == 0:
1574 1577 result = 0
1575 1578 break
1576 1579 if ret < -1:
1577 1580 changedheads += ret + 1
1578 1581 elif ret > 1:
1579 1582 changedheads += ret - 1
1580 1583 if changedheads > 0:
1581 1584 result = 1 + changedheads
1582 1585 elif changedheads < 0:
1583 1586 result = -1 + changedheads
1584 1587 return result
1585 1588
1586 1589 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1587 1590 'targetphase'))
1588 1591 def handlechangegroup(op, inpart):
1589 1592 """apply a changegroup part on the repo
1590 1593
1591 1594 This is a very early implementation that will see massive rework before
1592 1595 being inflicted on any end user.
1593 1596 """
1594 1597 tr = op.gettransaction()
1595 1598 unpackerversion = inpart.params.get('version', '01')
1596 1599 # We should raise an appropriate exception here
1597 1600 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1598 1601 # the source and url passed here are overwritten by the one contained in
1599 1602 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1600 1603 nbchangesets = None
1601 1604 if 'nbchanges' in inpart.params:
1602 1605 nbchangesets = int(inpart.params.get('nbchanges'))
1603 1606 if ('treemanifest' in inpart.params and
1604 1607 'treemanifest' not in op.repo.requirements):
1605 1608 if len(op.repo.changelog) != 0:
1606 1609 raise error.Abort(_(
1607 1610 "bundle contains tree manifests, but local repo is "
1608 1611 "non-empty and does not use tree manifests"))
1609 1612 op.repo.requirements.add('treemanifest')
1610 1613 op.repo._applyopenerreqs()
1611 1614 op.repo._writerequirements()
1612 1615 extrakwargs = {}
1613 1616 targetphase = inpart.params.get('targetphase')
1614 1617 if targetphase is not None:
1615 1618 extrakwargs['targetphase'] = int(targetphase)
1616 1619 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1617 1620 expectedtotal=nbchangesets, **extrakwargs)
1618 1621 if op.reply is not None:
1619 1622 # This is definitely not the final form of this
1620 1623 # return. But one needs to start somewhere.
1621 1624 part = op.reply.newpart('reply:changegroup', mandatory=False)
1622 1625 part.addparam(
1623 1626 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1624 1627 part.addparam('return', '%i' % ret, mandatory=False)
1625 1628 assert not inpart.read()
1626 1629
1627 1630 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1628 1631 ['digest:%s' % k for k in util.DIGESTS.keys()])
1629 1632 @parthandler('remote-changegroup', _remotechangegroupparams)
1630 1633 def handleremotechangegroup(op, inpart):
1631 1634 """apply a bundle10 on the repo, given an url and validation information
1632 1635
1633 1636 All the information about the remote bundle to import is given as
1634 1637 parameters. The parameters include:
1635 1638 - url: the url to the bundle10.
1636 1639 - size: the bundle10 file size. It is used to validate what was
1637 1640 retrieved by the client matches the server knowledge about the bundle.
1638 1641 - digests: a space separated list of the digest types provided as
1639 1642 parameters.
1640 1643 - digest:<digest-type>: the hexadecimal representation of the digest with
1641 1644 that name. Like the size, it is used to validate what was retrieved by
1642 1645 the client matches what the server knows about the bundle.
1643 1646
1644 1647 When multiple digest types are given, all of them are checked.
1645 1648 """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
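`util.digestchecker` wraps the remote stream so that, once the bundle has been consumed, both the advertised size and every advertised digest are verified. A simplified, self-contained sketch of that validation (the `check_bundle` helper below is hypothetical, not Mercurial's real API):

```python
import hashlib

def check_bundle(data, size, digests):
    # Reject the bundle unless it matches the size advertised in the part.
    if len(data) != size:
        raise ValueError('size mismatch: expected %d bytes, got %d'
                         % (size, len(data)))
    # Every advertised digest type must match; all of them are checked.
    for typ, expected in digests.items():
        actual = hashlib.new(typ, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch' % typ)
    return True

data = b'bundle10 payload bytes'
check_bundle(data, len(data), {'sha1': hashlib.sha1(data).hexdigest()})
```

The real implementation validates incrementally while streaming, which is why `real_part.validate()` is only called after the changegroup has been applied.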

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
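The 'check:heads' payload is nothing more than a concatenation of raw 20-byte node ids. The read loop above can be exercised standalone; this sketch (hypothetical `parse_heads` helper, using `io.BytesIO` as a stand-in for the part reader) mirrors it:

```python
import io

def parse_heads(stream, width=20):
    """Split a stream of fixed-width binary node ids into a list.

    Like handlecheckheads: read `width` bytes at a time, stop at a clean
    end-of-stream; a short trailing read means the payload was truncated.
    """
    heads = []
    chunk = stream.read(width)
    while len(chunk) == width:
        heads.append(chunk)
        chunk = stream.read(width)
    if chunk:
        raise ValueError('truncated heads payload')
    return heads

payload = b'\x11' * 20 + b'\x22' * 20
assert len(parse_heads(io.BytesIO(payload))) == 2
```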

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activity happens on unrelated heads,
    it is ignored.

    This allows servers with high traffic to avoid push contention as long
    as unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase
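The phase-heads payload is a flat sequence of fixed-size entries. A standalone sketch of the loop above, assuming the `_fphasesentry` struct format is a big-endian int32 phase followed by a 20-byte node (`'>i20s'`; the real format is defined elsewhere in this module):

```python
import io
import struct

# Assumed entry layout: big-endian int32 phase + 20-byte node.
PHASE_ENTRY = '>i20s'

def read_phase_heads(stream, nphases=3):
    """Decode a phase-heads payload into per-phase node lists.

    Fixed-size entries are read until a clean EOF; a partial trailing
    entry means the part is corrupt, just as in _readphaseheads.
    """
    headsbyphase = [[] for _ in range(nphases)]
    entrysize = struct.calcsize(PHASE_ENTRY)
    while True:
        entry = stream.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise ValueError('bad phase-heads payload')
            break
        phase, node = struct.unpack(PHASE_ENTRY, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

payload = struct.pack(PHASE_ENTRY, 0, b'\xaa' * 20)
assert read_phase_heads(io.BytesIO(payload))[0] == [b'\xaa' * 20]
```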

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
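The hgtagsfnodes payload is a sequence of 40-byte records: a 20-byte changeset node followed by a 20-byte `.hgtags` filenode. The pairwise read loop above can be sketched standalone (hypothetical `read_fnode_pairs` helper, `io.BytesIO` standing in for the part reader):

```python
import io

def read_fnode_pairs(stream):
    """Collect (changeset node, .hgtags filenode) pairs from a payload of
    concatenated 20+20 byte records, stopping at the first incomplete
    record just as handlehgtagsfnodes does."""
    pairs = []
    while True:
        node = stream.read(20)
        fnode = stream.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        pairs.append((node, fnode))
    return pairs

payload = b'\x01' * 20 + b'\x02' * 20
assert read_fnode_pairs(io.BytesIO(payload)) == [(b'\x01' * 20, b'\x02' * 20)]
```

Note that a trailing incomplete record is merely skipped with a debug message in the real handler, not treated as a hard error.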

@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)