##// END OF EJS Templates
bundle2: use modern Python division...
Augie Fackler -
r33635:da7c285e default
parent child Browse files
Show More
@@ -1,1874 +1,1874 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
Empty names are obviously forbidden.

Names MUST start with a letter. If the first letter is lower case, the
parameter is advisory and can be safely ignored. However, when the first
letter is capital, the parameter is mandatory and the bundling process MUST
stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
The header defines how to interpret the part. It contains two pieces of
data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 from __future__ import absolute_import
148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
urlerr = util.urlerr
urlreq = util.urlreq

# short aliases hoisted for the hot (de)serialization paths
_pack = struct.pack
_unpack = struct.unpack

# struct formats for the bundle2 binary protocol; all values are
# unsigned (or signed where negative values are special) and big-endian
_fstreamparamsize = '>i'   # size of the stream-level parameter block
_fpartheadersize = '>i'    # size of a part header (0 == end-of-stream marker)
_fparttypesize = '>B'      # length of the part type string
_fpartid = '>I'            # per-bundle unique part identifier
_fpayloadsize = '>i'       # size of a payload chunk (negative == special case)
_fpartparamcount = '>BB'   # counts of mandatory and advisory parameters

_fphasesentry = '>i20s'    # a phase number followed by a 20-byte node

# default chunk size used when emitting part payloads
preferedchunksize = 4096

# any character outside [a-zA-Z0-9_:-] is forbidden in a part type name
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
def outdebug(ui, message):
    """Emit a debug line about the output (bundling) stream.

    The message is only printed when the 'devel.bundle2.debug'
    configuration knob is enabled.
    """
    if not ui.configbool('devel', 'bundle2.debug'):
        return
    ui.debug('bundle2-output: %s\n' % message)
192 192
def indebug(ui, message):
    """Emit a debug line about the input (unbundling) stream.

    The message is only printed when the 'devel.bundle2.debug'
    configuration knob is enabled.
    """
    if not ui.configbool('devel', 'bundle2.debug'):
        return
    ui.debug('bundle2-input: %s\n' % message)
197 197
def validateparttype(parttype):
    """Raise ValueError if ``parttype`` contains an invalid character.

    Part type names are restricted to the [a-zA-Z0-9_:-] alphabet
    (see ``_parttypeforbidden``).
    """
    if _parttypeforbidden.search(parttype) is not None:
        raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
# map of lower-cased part type name to its handler function
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that register a function as a bundle2 part handler

    ``params`` lists the parameter names the handler knows about; it is
    used by ``_processpart`` to detect unsupported mandatory parameters.

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        # a part type may only be registered once
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
231 231
class unbundlerecords(object):
    """Keep a record of what happens during an unbundle.

    New records are added with ``records.add('cat', obj)``, where 'cat'
    is a category of record and ``obj`` is an arbitrary object.

    ``records['cat']`` returns all entries of category 'cat'.

    Iterating over the object itself yields ``('category', obj)`` tuples
    for every entry.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}   # category -> list of entries
        self._sequences = []    # (category, entry) in insertion order
        self._replies = {}      # partid -> nested unbundlerecords

    def add(self, category, entry, inreplyto=None):
        """Record a new entry under ``category``.

        The entry can later be retrieved through ``self[category]``.
        When ``inreplyto`` is given, the entry is also recorded in the
        reply records of that part id."""
        bucket = self._categories.setdefault(category, [])
        bucket.append(entry)
        self._sequences.append((category, entry))
        if inreplyto is None:
            return
        self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """Return the records holding replies to part ``partid``."""
        if partid not in self._replies:
            self._replies[partid] = unbundlerecords()
        return self._replies[partid]

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
278 278
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        # hook arguments accumulated before the transaction starts;
        # set to None once flushed into the transaction (see below)
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        """Return the transaction, flushing pending hookargs into it."""
        transaction = self._gettransaction()

        if self.hookargs is not None:
            # the ones added to the transaction supercede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        """Add hook arguments; aborts if the transaction already started."""
        if self.hookargs is None:
            raise error.Abort(
                _('attempted to add hooks to operation after transaction '
                  'started'))
        self.hookargs.update(hookargs)
325 325
class TransactionUnavailable(RuntimeError):
    """Raised when a transaction is requested but none can be created."""
    pass
328 328
def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    # used as the default 'transactiongetter' in processbundle()
    raise TransactionUnavailable()
335 335
def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    """Apply a bundle to ``repo`` within transaction ``tr``.

    ``unbundler`` may be a bundle2 unbundler (processed part by part) or
    an older-style object exposing ``apply`` — presumably a changegroup
    unpacker; confirm against callers. Returns the bundleoperation.
    """
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op
350 350
def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function process a bundle, apply effect to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    Unknown Mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this is a init function soon.
    # - exception catching
    # force loading (and processing) of the stream-level parameters
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op
426 426
def _processchangegroup(op, cg, tr, source, url, **kwargs):
    """Apply changegroup ``cg`` and record the result on the operation."""
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    # record the apply() return value so callers can inspect the outcome
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret
433 433
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        # fix: this previously printed the *mandatory* count
                        # (nbmp) for the advisory entry
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)
504 504
505 505
def decodecaps(blob):
    """Decode a bundle2 caps bytes blob into a dictionary.

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list (empty when no value is attached)."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' in line:
            key, rawvals = line.split('=', 1)
            vals = rawvals.split(',')
        else:
            key, vals = line, ()
        decodedkey = urlreq.unquote(key)
        caps[decodedkey] = [urlreq.unquote(v) for v in vals]
    return caps
528 528
def encodecaps(caps):
    """Encode a bundle2 caps dictionary into a bytes blob.

    Inverse of ``decodecaps``: one capability per line, values attached
    as ``name=value1,value2`` with every token url-quoted."""
    lines = []
    for name in sorted(caps):
        quotedvals = [urlreq.quote(v) for v in caps[name]]
        encoded = urlreq.quote(name)
        if quotedvals:
            encoded = "%s=%s" % (encoded, ','.join(quotedvals))
        lines.append(encoded)
    return '\n'.join(lines)
540 540
# map of bundle format name to the (header magic, compression type) pair
# used when writing an old-style (v1) bundle; 'HG20' is handled specially
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
553 553
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameter. and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    # magic string identifying the container format and version
    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []   # stream level parameters, in insertion order
        self._parts = []    # bundlepart instances, in insertion order
        self.capabilities = dict(capabilities)
        # the core payload is emitted uncompressed unless setcompression()
        # installs a real engine
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        # compression may only be configured once per bundle
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to defines the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        # NOTE(review): string.letters exists only on Python 2; presumably
        # this predates the py3 port (py3 spelling: string.ascii_letters)
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contains the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the containers

        As the part is directly added to the containers. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually create and add if you need better
        control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        """yield the binary chunks composing the full bundle2 stream"""
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # everything past the stream-level parameters may be compressed
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return a encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunk for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        # a zero-sized part header is the end-of-stream marker
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
668 668
669 669
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        # the underlying file-like object the bundle is read from
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)
696 696
def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    # the magic string is two chars of magic ('HG') plus two of version
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
713 713
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        # payload is assumed uncompressed until a 'compression' stream
        # parameter installs a real engine (see processcompression)
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """Parse a space-separated parameter block, applying each parameter.

        Returns an ordered mapping of parameter name to value (None when
        the parameter carries no value)."""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameter starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and will this function will raise a KeyError when unknown.

        Note: no option are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        # NOTE(review): string.letters is Python 2 only (py3:
        # string.ascii_letters); presumably predates the py3 port
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        have no way to know then the reply end, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                # fix: the offending size was previously not interpolated,
                # leaving a literal '%i' in the error message
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # make sure the part was fully consumed before moving on
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        """Return whether the payload was compressed (None if unknown)."""
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()
856 856
# map of bundle version (characters 2-3 of the magic string) to the
# unbundler class handling it
formatmap = {'20': unbundle20}

# map of stream-level parameter name to its handler function
b2streamparamsmap = {}
860 860
def b2streamparamhandler(name):
    """register a handler for a stream level parameter

    The decorated function is recorded in ``b2streamparamsmap`` and is
    invoked as ``handler(unbundler, name, value)`` when the parameter is
    seen on an incoming stream (see ``unbundle20._processparam``).
    """
    def decorator(func):
        # fix: guard against double registration in the *stream parameter*
        # map; this previously asserted against 'formatmap' (the bundle
        # version map), which made the check vacuous.
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator
868 868
@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        # remember that the payload was compressed; reported by
        # unbundle20.compressed()
        unbundler._compressed = True
878 878
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Both data and parameters cannot be modified after the generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        # the part id is assigned by the enclosing bundler when the part is
        # added to a bundle, not at construction time
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part have the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        # a generator payload can only be consumed once, so refuse to copy it
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to defines the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        # data is frozen once generation has started (see ``getchunks``)
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generates the bundle2 stream
    def getchunks(self, ui):
        """generate the binary chunks (header then payload) for this part

        Can only be called once; afterwards the part is sealed."""
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    # fix: report the advisory count (was mistakenly ``nbmp``)
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        # mandatory-ness is encoded in the case of the part type on the wire
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        # two sizes (key, value) per parameter; use integer division so this
        # behaves identically under Python 3 / true division
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            # emit an out-of-band 'error:abort' part so the consumer learns
            # about the failure, then re-raise with the original traceback
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of a the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data
1081 1081
1082 1082
# special payload-chunk size signalling an out-of-band interruption part
flaginterrupt = -1
1084 1084
class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    Allows the producer side to transmit an exception raised during part
    iteration while the consumer is in the middle of reading a part.

    Parts processed this way only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        size = self._unpack(_fpartheadersize)[0]
        if size < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % size)
        indebug(self.ui, 'part header size: %i\n' % size)
        if not size:
            return None
        return self._readexact(size)

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        header = self._readpartheader()
        if header is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        # process the interrupting part with a restricted operation object
        part = unbundlepart(self.ui, header, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')
1124 1124
class interruptoperation(object):
    """A limited operation handed to part handlers during stream interruption.

    Only the ui object is usable; any repository or transaction access is
    an error.
    """

    def __init__(self, ui):
        self.ui = ui
        # no reply bundle and no output capture in interruption context
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')
1142 1142
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        # ``header`` is the raw part-header blob; ``fp`` the payload stream
        super(unbundlepart, self).__init__(fp)
        # random access to the payload is only possible when the underlying
        # stream supports both seek() and tell()
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> byte from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically compute the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        # ``_chunkindex`` records a (payload offset, file offset) pair per
        # chunk and is extended lazily as new chunks are read the first time
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                # index the chunk start positions for later seek()s
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couple again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data

        Reads everything when ``size`` is None. Sets ``consumed`` once the
        end of the payload has been reached."""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        # a short read means the payload stream is exhausted
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        # current position within the (decoded) payload
        return self._pos

    def seek(self, offset, whence=0):
        """move the payload read position (file-like interface)"""
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            # NOTE(review): this subtracts ``offset`` from the payload end,
            # unlike standard file seek(offset, 2) which adds it — confirm
            # callers only ever use seek(0, 2).
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        # seeking forward may require consuming the rest of the payload first
        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            # restart the payload stream at the containing chunk, then skip
            # ahead inside it to reach the exact offset
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                # a pipe pretending to be seekable: remember it is not
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None
1342 1342
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
# Maps capability name to the tuple of values advertised for it.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }
1354 1354
def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        # advertise every obsmarker format version we can exchange
        caps['obsmarkers'] = tuple('V%i' % v for v in obsolete.formats)
    if allowpushback:
        caps['pushback'] = ()
    if repo.ui.config('server', 'concurrent-push-mode') == 'check-related':
        caps['checkheads'] = ('related',)
    return caps
1372 1372
def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict

    Returns an empty dict when the peer does not advertise 'bundle2' at all
    (``capable`` returned a falsy non-empty-string value).
    """
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    # reuse the value already fetched above instead of querying the peer a
    # second time (the previous code called remote.capable() twice)
    capsblob = urlreq.unquote(raw)
    return decodecaps(capsblob)
1380 1380
def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    versions = []
    for cap in caps.get('obsmarkers', ()):
        # entries look like 'V<number>'; anything else is ignored
        if cap.startswith('V'):
            versions.append(int(cap[1:]))
    return versions
1386 1386
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    """write ``outgoing`` to ``filename`` in the requested bundle format"""
    # legacy bundle10 formats are delegated to ``writebundle``
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    if not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)

    return changegroup.writechunks(ui, bundle.getchunks(), filename, vfs=vfs)
1405 1405
def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # a changegroup part is always included in such bundles
    version = opts.get('cg.version')
    if version is None:
        version = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=version)
    cgpart = bundler.newpart('changegroup', data=cg.getchunks())
    cgpart.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        cgpart.addparam('nbchanges', str(cg.extras['clcount']),
                        mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        cgpart.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        # one packed entry per (phase, head) pair
        phasedata = [_pack(_fphasesentry, phase, head)
                     for phase in phases.allphases
                     for head in headsbyphase[phase]]
        bundler.newpart('phase-heads', data=''.join(phasedata))
1442 1442
def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional parts)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is None:
            continue
        chunks.append(node)
        chunks.append(fnode)

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1466 1466
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    # pick the newest obsmarker format both sides understand
    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    return bundler.newpart(
        'obsmarkers',
        data=obsolete.encodemarkers(markers, True, version=version))
1482 1482
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)

        def gen():
            # magic header followed by the compressed changegroup stream
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = gen()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1522 1522
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    returns = [record.get('return', 0)
               for record in op.records['changegroup']]
    headdelta = 0
    combined = 1
    for value in returns:
        if value == 0:
            # NOTE(review): a zero result stops the scan, but the head-delta
            # adjustment below can still override the 0 when earlier results
            # changed heads — preserved as-is, confirm this is intended.
            combined = 0
            break
        if value > 1:
            headdelta += value - 1
        elif value < -1:
            headdelta += value + 1
    if headdelta > 0:
        combined = 1 + headdelta
    elif headdelta < 0:
        combined = -1 + headdelta
    return combined
1543 1543
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will massive rework before being
    inflicted to any end-user.
    """
    # open (or join) the transaction before touching the repository
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        # advisory count of changesets, forwarded as ``expectedtotal`` below
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        # a tree-manifest bundle may only upgrade an empty repository
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    # the whole part payload must have been consumed by the unbundler
    assert not inpart.read()
1583 1583
# parameter names accepted by the 'remote-changegroup' part handler: the
# fixed ones plus one 'digest:<type>' parameter per supported digest
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import are given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate what was retrieved by
      the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    # only the schemes listed in the static capabilities are accepted
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
            parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    # gather the expected digest values from the 'digest:<type>' parameters
    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    # wrap the remote stream so the size and digests are checked as it is read
    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    # imported locally rather than at module level — presumably to avoid an
    # import cycle with exchange; confirm before moving.
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
            util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        # raises if the actual size or any digest did not match
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
1651 1651
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    """record the server's response to a changegroup push"""
    op.records.add('changegroup',
                   {'return': int(inpart.params['return'])},
                   int(inpart.params['in-reply-to']))
1657 1657
@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that head of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    heads = []
    while True:
        chunk = inpart.read(20)
        if len(chunk) != 20:
            break
        heads.append(chunk)
    # a trailing partial chunk means the part payload was malformed
    assert not chunk
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
1676 1676
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focus on the heads actually updated
    during the push. If other activities happen on unrelated heads, it is
    ignored.

    This allow server with high traffic to avoid push contention as long as
    unrelated parts of the graph are involved."""
    heads = []
    while True:
        chunk = inpart.read(20)
        if len(chunk) != 20:
            break
        heads.append(chunk)
    # a trailing partial chunk means the part payload was malformed
    assert not chunk
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for branchheads in op.repo.branchmap().itervalues():
        currentheads.update(branchheads)

    if any(h not in currentheads for h in heads):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
1705 1705
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    payload = inpart.read()
    for line in payload.splitlines():
        op.ui.status(_('remote: %s\n') % line)
1711 1711
@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    # always consume and decode the payload, even if a reply already exists
    rawcaps = inpart.read()
    caps = decodecaps(rawcaps)
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)
1720 1720
class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part.

    Raised by the 'error:abort' part handler so callers can tell a
    remote-originated abort apart from a locally raised one.
    """
1723 1723
@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    message = inpart.params['message']
    hint = inpart.params.get('hint')
    raise AbortFromPart(message, hint=hint)
1729 1729
@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    # forward only the parameters that were actually present on the part
    kwargs = dict((name, inpart.params[name])
                  for name in ('namespace', 'key', 'new', 'old', 'ret')
                  if inpart.params.get(name) is not None)
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1740 1740
@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    if inpart.params.get('parttype') is not None:
        kwargs['parttype'] = inpart.params['parttype']
    rawparams = inpart.params.get('params')
    if rawparams is not None:
        # params are NUL-separated on the wire
        kwargs['params'] = rawparams.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)
1753 1753
@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    message = inpart.params['message']
    raise error.ResponseError(_('push failed:'), message)
1758 1758
@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    decoded = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, decoded))
1765 1765
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    namespace, key, old, new = [pushkey.decode(inpart.params[p])
                                for p in ('namespace', 'key', 'old', 'new')]
    # Grab the transaction to ensure that we have the lock before performing the
    # pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    op.records.add('pushkey', {'namespace': namespace,
                               'key': key,
                               'old': old,
                               'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        # mandatory pushkey part failed: surface the raw part parameters
        kwargs = dict((name, inpart.params[name])
                      for name in ('namespace', 'key', 'new', 'old', 'ret')
                      if name in inpart.params)
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1794 1794
def _readphaseheads(inpart):
    """Decode the binary phase-heads payload of ``inpart``.

    Returns a list indexed by phase number; each item is the list of head
    nodes recorded for that phase. Aborts on a truncated entry."""
    headsbyphase = [[] for _i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    entry = inpart.read(entrysize)
    while len(entry) >= entrysize:
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
        entry = inpart.read(entrysize)
    if entry:
        # a short, non-empty read means the payload was truncated
        raise error.Abort(_('bad phase-heads bundle part'))
    return headsbyphase
1807 1807
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    repo = op.repo.unfiltered()
    tr = op.gettransaction()
    phases.updatephases(repo, tr, headsbyphase)
1813 1813
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    op.records.add('pushkey',
                   {'return': int(inpart.params['return'])},
                   int(inpart.params['in-reply-to']))
1820 1820
@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo

    Records the number of new markers under the 'obsmarkers' record and,
    when a reply bundle exists, emits a 'reply:obsmarkers' part."""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        # ui.debug does not append a newline; without the explicit '\n' this
        # message ran together with the next line of output.
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)
1843 1843
1844 1844
@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers push

    Records, against the originating part id, how many markers the remote
    reported as new. (Docstring previously said "pushkey request" — a
    copy-paste from handlepushkeyreply.)"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)
1851 1851
@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    applied = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) == 20 and len(fnode) == 20:
            cache.setfnode(node, fnode)
            applied += 1
        else:
            # short read on either half of the pair: stop and warn
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % applied)
General Comments 0
You need to be logged in to leave comments. Login now