##// END OF EJS Templates
bundle2: add an UnsupportedPartError...
Pierre-Yves David -
r23010:73f394f4 default
parent child Browse files
Show More
@@ -1,956 +1,956 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import util
149 149 import struct
150 150 import urllib
151 151 import string
152 152 import obsolete
153 153 import pushkey
154 154
155 155 import changegroup, error
156 156 from i18n import _
157 157
158 158 _pack = struct.pack
159 159 _unpack = struct.unpack
160 160
161 161 _magicstring = 'HG2Y'
162 162
163 163 _fstreamparamsize = '>i'
164 164 _fpartheadersize = '>i'
165 165 _fparttypesize = '>B'
166 166 _fpartid = '>I'
167 167 _fpayloadsize = '>i'
168 168 _fpartparamcount = '>BB'
169 169
170 170 preferedchunksize = 4096
171 171
172 172 def _makefpartparamsizes(nbparams):
173 173 """return a struct format to read part parameter sizes
174 174
175 175 The number parameters is variable so we need to build that format
176 176 dynamically.
177 177 """
178 178 return '>'+('BB'*nbparams)
179 179
180 180 parthandlermapping = {}
181 181
182 182 def parthandler(parttype, params=()):
183 183 """decorator that register a function as a bundle2 part handler
184 184
185 185 eg::
186 186
187 187 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
188 188 def myparttypehandler(...):
189 189 '''process a part of type "my part".'''
190 190 ...
191 191 """
192 192 def _decorator(func):
193 193 lparttype = parttype.lower() # enforce lower case matching.
194 194 assert lparttype not in parthandlermapping
195 195 parthandlermapping[lparttype] = func
196 196 func.params = frozenset(params)
197 197 return func
198 198 return _decorator
199 199
200 200 class unbundlerecords(object):
201 201 """keep record of what happens during and unbundle
202 202
203 203 New records are added using `records.add('cat', obj)`. Where 'cat' is a
204 204 category of record and obj is an arbitrary object.
205 205
206 206 `records['cat']` will return all entries of this category 'cat'.
207 207
208 208 Iterating on the object itself will yield `('category', obj)` tuples
209 209 for all entries.
210 210
211 211 All iterations happens in chronological order.
212 212 """
213 213
214 214 def __init__(self):
215 215 self._categories = {}
216 216 self._sequences = []
217 217 self._replies = {}
218 218
219 219 def add(self, category, entry, inreplyto=None):
220 220 """add a new record of a given category.
221 221
222 222 The entry can then be retrieved in the list returned by
223 223 self['category']."""
224 224 self._categories.setdefault(category, []).append(entry)
225 225 self._sequences.append((category, entry))
226 226 if inreplyto is not None:
227 227 self.getreplies(inreplyto).add(category, entry)
228 228
229 229 def getreplies(self, partid):
230 230 """get the subrecords that replies to a specific part"""
231 231 return self._replies.setdefault(partid, unbundlerecords())
232 232
233 233 def __getitem__(self, cat):
234 234 return tuple(self._categories.get(cat, ()))
235 235
236 236 def __iter__(self):
237 237 return iter(self._sequences)
238 238
239 239 def __len__(self):
240 240 return len(self._sequences)
241 241
242 242 def __nonzero__(self):
243 243 return bool(self._sequences)
244 244
245 245 class bundleoperation(object):
246 246 """an object that represents a single bundling process
247 247
248 248 Its purpose is to carry unbundle-related objects and states.
249 249
250 250 A new object should be created at the beginning of each bundle processing.
251 251 The object is to be returned by the processing function.
252 252
253 253 The object has very little content now it will ultimately contain:
254 254 * an access to the repo the bundle is applied to,
255 255 * a ui object,
256 256 * a way to retrieve a transaction to add changes to the repo,
257 257 * a way to record the result of processing each part,
258 258 * a way to construct a bundle response when applicable.
259 259 """
260 260
261 261 def __init__(self, repo, transactiongetter):
262 262 self.repo = repo
263 263 self.ui = repo.ui
264 264 self.records = unbundlerecords()
265 265 self.gettransaction = transactiongetter
266 266 self.reply = None
267 267
268 268 class TransactionUnavailable(RuntimeError):
269 269 pass
270 270
271 271 def _notransaction():
272 272 """default method to get a transaction while processing a bundle
273 273
274 274 Raise an exception to highlight the fact that no transaction was expected
275 275 to be created"""
276 276 raise TransactionUnavailable()
277 277
278 278 def processbundle(repo, unbundler, transactiongetter=_notransaction):
279 279 """This function process a bundle, apply effect to/from a repo
280 280
281 281 It iterates over each part then searches for and uses the proper handling
282 282 code to process the part. Parts are processed in order.
283 283
284 284 This is very early version of this function that will be strongly reworked
285 285 before final usage.
286 286
287 287 Unknown Mandatory part will abort the process.
288 288 """
289 289 op = bundleoperation(repo, transactiongetter)
290 290 # todo:
291 291 # - replace this is a init function soon.
292 292 # - exception catching
293 293 unbundler.params
294 294 iterparts = unbundler.iterparts()
295 295 part = None
296 296 try:
297 297 for part in iterparts:
298 298 _processpart(op, part)
299 299 except Exception, exc:
300 300 for part in iterparts:
301 301 # consume the bundle content
302 302 part.read()
303 303 # Small hack to let caller code distinguish exceptions from bundle2
304 304 # processing fron the ones from bundle1 processing. This is mostly
305 305 # needed to handle different return codes to unbundle according to the
306 306 # type of bundle. We should probably clean up or drop this return code
307 307 # craziness in a future version.
308 308 exc.duringunbundle2 = True
309 309 raise
310 310 return op
311 311
312 312 def _processpart(op, part):
313 313 """process a single part from a bundle
314 314
315 315 The part is guaranteed to have been fully consumed when the function exits
316 316 (even if an exception is raised)."""
317 317 try:
318 318 parttype = part.type
319 319 # part key are matched lower case
320 320 key = parttype.lower()
321 321 try:
322 322 handler = parthandlermapping.get(key)
323 323 if handler is None:
324 raise error.BundleValueError(parttype=key)
324 raise error.UnsupportedPartError(parttype=key)
325 325 op.ui.debug('found a handler for part %r\n' % parttype)
326 326 unknownparams = part.mandatorykeys - handler.params
327 327 if unknownparams:
328 328 unknownparams = list(unknownparams)
329 329 unknownparams.sort()
330 raise error.BundleValueError(parttype=key,
330 raise error.UnsupportedPartError(parttype=key,
331 331 params=unknownparams)
332 except error.BundleValueError, exc:
332 except error.UnsupportedPartError, exc:
333 333 if key != parttype: # mandatory parts
334 334 raise
335 335 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
336 336 return # skip to part processing
337 337
338 338 # handler is called outside the above try block so that we don't
339 339 # risk catching KeyErrors from anything other than the
340 340 # parthandlermapping lookup (any KeyError raised by handler()
341 341 # itself represents a defect of a different variety).
342 342 output = None
343 343 if op.reply is not None:
344 344 op.ui.pushbuffer(error=True)
345 345 output = ''
346 346 try:
347 347 handler(op, part)
348 348 finally:
349 349 if output is not None:
350 350 output = op.ui.popbuffer()
351 351 if output:
352 352 outpart = op.reply.newpart('b2x:output', data=output)
353 353 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
354 354 finally:
355 355 # consume the part content to not corrupt the stream.
356 356 part.read()
357 357
358 358
359 359 def decodecaps(blob):
360 360 """decode a bundle2 caps bytes blob into a dictionnary
361 361
362 362 The blob is a list of capabilities (one per line)
363 363 Capabilities may have values using a line of the form::
364 364
365 365 capability=value1,value2,value3
366 366
367 367 The values are always a list."""
368 368 caps = {}
369 369 for line in blob.splitlines():
370 370 if not line:
371 371 continue
372 372 if '=' not in line:
373 373 key, vals = line, ()
374 374 else:
375 375 key, vals = line.split('=', 1)
376 376 vals = vals.split(',')
377 377 key = urllib.unquote(key)
378 378 vals = [urllib.unquote(v) for v in vals]
379 379 caps[key] = vals
380 380 return caps
381 381
382 382 def encodecaps(caps):
383 383 """encode a bundle2 caps dictionary into a bytes blob"""
384 384 chunks = []
385 385 for ca in sorted(caps):
386 386 vals = caps[ca]
387 387 ca = urllib.quote(ca)
388 388 vals = [urllib.quote(v) for v in vals]
389 389 if vals:
390 390 ca = "%s=%s" % (ca, ','.join(vals))
391 391 chunks.append(ca)
392 392 return '\n'.join(chunks)
393 393
394 394 class bundle20(object):
395 395 """represent an outgoing bundle2 container
396 396
397 397 Use the `addparam` method to add stream level parameter. and `newpart` to
398 398 populate it. Then call `getchunks` to retrieve all the binary chunks of
399 399 data that compose the bundle2 container."""
400 400
401 401 def __init__(self, ui, capabilities=()):
402 402 self.ui = ui
403 403 self._params = []
404 404 self._parts = []
405 405 self.capabilities = dict(capabilities)
406 406
407 407 @property
408 408 def nbparts(self):
409 409 """total number of parts added to the bundler"""
410 410 return len(self._parts)
411 411
412 412 # methods used to defines the bundle2 content
413 413 def addparam(self, name, value=None):
414 414 """add a stream level parameter"""
415 415 if not name:
416 416 raise ValueError('empty parameter name')
417 417 if name[0] not in string.letters:
418 418 raise ValueError('non letter first character: %r' % name)
419 419 self._params.append((name, value))
420 420
421 421 def addpart(self, part):
422 422 """add a new part to the bundle2 container
423 423
424 424 Parts contains the actual applicative payload."""
425 425 assert part.id is None
426 426 part.id = len(self._parts) # very cheap counter
427 427 self._parts.append(part)
428 428
429 429 def newpart(self, typeid, *args, **kwargs):
430 430 """create a new part and add it to the containers
431 431
432 432 As the part is directly added to the containers. For now, this means
433 433 that any failure to properly initialize the part after calling
434 434 ``newpart`` should result in a failure of the whole bundling process.
435 435
436 436 You can still fall back to manually create and add if you need better
437 437 control."""
438 438 part = bundlepart(typeid, *args, **kwargs)
439 439 self.addpart(part)
440 440 return part
441 441
442 442 # methods used to generate the bundle2 stream
443 443 def getchunks(self):
444 444 self.ui.debug('start emission of %s stream\n' % _magicstring)
445 445 yield _magicstring
446 446 param = self._paramchunk()
447 447 self.ui.debug('bundle parameter: %s\n' % param)
448 448 yield _pack(_fstreamparamsize, len(param))
449 449 if param:
450 450 yield param
451 451
452 452 self.ui.debug('start of parts\n')
453 453 for part in self._parts:
454 454 self.ui.debug('bundle part: "%s"\n' % part.type)
455 455 for chunk in part.getchunks():
456 456 yield chunk
457 457 self.ui.debug('end of bundle\n')
458 458 yield _pack(_fpartheadersize, 0)
459 459
460 460 def _paramchunk(self):
461 461 """return a encoded version of all stream parameters"""
462 462 blocks = []
463 463 for par, value in self._params:
464 464 par = urllib.quote(par)
465 465 if value is not None:
466 466 value = urllib.quote(value)
467 467 par = '%s=%s' % (par, value)
468 468 blocks.append(par)
469 469 return ' '.join(blocks)
470 470
471 471 class unpackermixin(object):
472 472 """A mixin to extract bytes and struct data from a stream"""
473 473
474 474 def __init__(self, fp):
475 475 self._fp = fp
476 476
477 477 def _unpack(self, format):
478 478 """unpack this struct format from the stream"""
479 479 data = self._readexact(struct.calcsize(format))
480 480 return _unpack(format, data)
481 481
482 482 def _readexact(self, size):
483 483 """read exactly <size> bytes from the stream"""
484 484 return changegroup.readexactly(self._fp, size)
485 485
486 486
487 487 class unbundle20(unpackermixin):
488 488 """interpret a bundle2 stream
489 489
490 490 This class is fed with a binary stream and yields parts through its
491 491 `iterparts` methods."""
492 492
493 493 def __init__(self, ui, fp, header=None):
494 494 """If header is specified, we do not read it out of the stream."""
495 495 self.ui = ui
496 496 super(unbundle20, self).__init__(fp)
497 497 if header is None:
498 498 header = self._readexact(4)
499 499 magic, version = header[0:2], header[2:4]
500 500 if magic != 'HG':
501 501 raise util.Abort(_('not a Mercurial bundle'))
502 502 if version != '2Y':
503 503 raise util.Abort(_('unknown bundle version %s') % version)
504 504 self.ui.debug('start processing of %s stream\n' % header)
505 505
506 506 @util.propertycache
507 507 def params(self):
508 508 """dictionary of stream level parameters"""
509 509 self.ui.debug('reading bundle2 stream parameters\n')
510 510 params = {}
511 511 paramssize = self._unpack(_fstreamparamsize)[0]
512 512 if paramssize:
513 513 for p in self._readexact(paramssize).split(' '):
514 514 p = p.split('=', 1)
515 515 p = [urllib.unquote(i) for i in p]
516 516 if len(p) < 2:
517 517 p.append(None)
518 518 self._processparam(*p)
519 519 params[p[0]] = p[1]
520 520 return params
521 521
522 522 def _processparam(self, name, value):
523 523 """process a parameter, applying its effect if needed
524 524
525 525 Parameter starting with a lower case letter are advisory and will be
526 526 ignored when unknown. Those starting with an upper case letter are
527 527 mandatory and will this function will raise a KeyError when unknown.
528 528
529 529 Note: no option are currently supported. Any input will be either
530 530 ignored or failing.
531 531 """
532 532 if not name:
533 533 raise ValueError('empty parameter name')
534 534 if name[0] not in string.letters:
535 535 raise ValueError('non letter first character: %r' % name)
536 536 # Some logic will be later added here to try to process the option for
537 537 # a dict of known parameter.
538 538 if name[0].islower():
539 539 self.ui.debug("ignoring unknown parameter %r\n" % name)
540 540 else:
541 raise error.BundleValueError(params=(name,))
541 raise error.UnsupportedPartError(params=(name,))
542 542
543 543
544 544 def iterparts(self):
545 545 """yield all parts contained in the stream"""
546 546 # make sure param have been loaded
547 547 self.params
548 548 self.ui.debug('start extraction of bundle2 parts\n')
549 549 headerblock = self._readpartheader()
550 550 while headerblock is not None:
551 551 part = unbundlepart(self.ui, headerblock, self._fp)
552 552 yield part
553 553 headerblock = self._readpartheader()
554 554 self.ui.debug('end of bundle2 stream\n')
555 555
556 556 def _readpartheader(self):
557 557 """reads a part header size and return the bytes blob
558 558
559 559 returns None if empty"""
560 560 headersize = self._unpack(_fpartheadersize)[0]
561 561 self.ui.debug('part header size: %i\n' % headersize)
562 562 if headersize:
563 563 return self._readexact(headersize)
564 564 return None
565 565
566 566
567 567 class bundlepart(object):
568 568 """A bundle2 part contains application level payload
569 569
570 570 The part `type` is used to route the part to the application level
571 571 handler.
572 572
573 573 The part payload is contained in ``part.data``. It could be raw bytes or a
574 574 generator of byte chunks.
575 575
576 576 You can add parameters to the part using the ``addparam`` method.
577 577 Parameters can be either mandatory (default) or advisory. Remote side
578 578 should be able to safely ignore the advisory ones.
579 579
580 580 Both data and parameters cannot be modified after the generation has begun.
581 581 """
582 582
583 583 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
584 584 data=''):
585 585 self.id = None
586 586 self.type = parttype
587 587 self._data = data
588 588 self._mandatoryparams = list(mandatoryparams)
589 589 self._advisoryparams = list(advisoryparams)
590 590 # checking for duplicated entries
591 591 self._seenparams = set()
592 592 for pname, __ in self._mandatoryparams + self._advisoryparams:
593 593 if pname in self._seenparams:
594 594 raise RuntimeError('duplicated params: %s' % pname)
595 595 self._seenparams.add(pname)
596 596 # status of the part's generation:
597 597 # - None: not started,
598 598 # - False: currently generated,
599 599 # - True: generation done.
600 600 self._generated = None
601 601
602 602 # methods used to defines the part content
603 603 def __setdata(self, data):
604 604 if self._generated is not None:
605 605 raise error.ReadOnlyPartError('part is being generated')
606 606 self._data = data
607 607 def __getdata(self):
608 608 return self._data
609 609 data = property(__getdata, __setdata)
610 610
611 611 @property
612 612 def mandatoryparams(self):
613 613 # make it an immutable tuple to force people through ``addparam``
614 614 return tuple(self._mandatoryparams)
615 615
616 616 @property
617 617 def advisoryparams(self):
618 618 # make it an immutable tuple to force people through ``addparam``
619 619 return tuple(self._advisoryparams)
620 620
621 621 def addparam(self, name, value='', mandatory=True):
622 622 if self._generated is not None:
623 623 raise error.ReadOnlyPartError('part is being generated')
624 624 if name in self._seenparams:
625 625 raise ValueError('duplicated params: %s' % name)
626 626 self._seenparams.add(name)
627 627 params = self._advisoryparams
628 628 if mandatory:
629 629 params = self._mandatoryparams
630 630 params.append((name, value))
631 631
632 632 # methods used to generates the bundle2 stream
633 633 def getchunks(self):
634 634 if self._generated is not None:
635 635 raise RuntimeError('part can only be consumed once')
636 636 self._generated = False
637 637 #### header
638 638 ## parttype
639 639 header = [_pack(_fparttypesize, len(self.type)),
640 640 self.type, _pack(_fpartid, self.id),
641 641 ]
642 642 ## parameters
643 643 # count
644 644 manpar = self.mandatoryparams
645 645 advpar = self.advisoryparams
646 646 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
647 647 # size
648 648 parsizes = []
649 649 for key, value in manpar:
650 650 parsizes.append(len(key))
651 651 parsizes.append(len(value))
652 652 for key, value in advpar:
653 653 parsizes.append(len(key))
654 654 parsizes.append(len(value))
655 655 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
656 656 header.append(paramsizes)
657 657 # key, value
658 658 for key, value in manpar:
659 659 header.append(key)
660 660 header.append(value)
661 661 for key, value in advpar:
662 662 header.append(key)
663 663 header.append(value)
664 664 ## finalize header
665 665 headerchunk = ''.join(header)
666 666 yield _pack(_fpartheadersize, len(headerchunk))
667 667 yield headerchunk
668 668 ## payload
669 669 for chunk in self._payloadchunks():
670 670 yield _pack(_fpayloadsize, len(chunk))
671 671 yield chunk
672 672 # end of payload
673 673 yield _pack(_fpayloadsize, 0)
674 674 self._generated = True
675 675
676 676 def _payloadchunks(self):
677 677 """yield chunks of a the part payload
678 678
679 679 Exists to handle the different methods to provide data to a part."""
680 680 # we only support fixed size data now.
681 681 # This will be improved in the future.
682 682 if util.safehasattr(self.data, 'next'):
683 683 buff = util.chunkbuffer(self.data)
684 684 chunk = buff.read(preferedchunksize)
685 685 while chunk:
686 686 yield chunk
687 687 chunk = buff.read(preferedchunksize)
688 688 elif len(self.data):
689 689 yield self.data
690 690
691 691 class unbundlepart(unpackermixin):
692 692 """a bundle part read from a bundle"""
693 693
694 694 def __init__(self, ui, header, fp):
695 695 super(unbundlepart, self).__init__(fp)
696 696 self.ui = ui
697 697 # unbundle state attr
698 698 self._headerdata = header
699 699 self._headeroffset = 0
700 700 self._initialized = False
701 701 self.consumed = False
702 702 # part data
703 703 self.id = None
704 704 self.type = None
705 705 self.mandatoryparams = None
706 706 self.advisoryparams = None
707 707 self.params = None
708 708 self.mandatorykeys = ()
709 709 self._payloadstream = None
710 710 self._readheader()
711 711
712 712 def _fromheader(self, size):
713 713 """return the next <size> byte from the header"""
714 714 offset = self._headeroffset
715 715 data = self._headerdata[offset:(offset + size)]
716 716 self._headeroffset = offset + size
717 717 return data
718 718
719 719 def _unpackheader(self, format):
720 720 """read given format from header
721 721
722 722 This automatically compute the size of the format to read."""
723 723 data = self._fromheader(struct.calcsize(format))
724 724 return _unpack(format, data)
725 725
726 726 def _initparams(self, mandatoryparams, advisoryparams):
727 727 """internal function to setup all logic related parameters"""
728 728 # make it read only to prevent people touching it by mistake.
729 729 self.mandatoryparams = tuple(mandatoryparams)
730 730 self.advisoryparams = tuple(advisoryparams)
731 731 # user friendly UI
732 732 self.params = dict(self.mandatoryparams)
733 733 self.params.update(dict(self.advisoryparams))
734 734 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
735 735
736 736 def _readheader(self):
737 737 """read the header and setup the object"""
738 738 typesize = self._unpackheader(_fparttypesize)[0]
739 739 self.type = self._fromheader(typesize)
740 740 self.ui.debug('part type: "%s"\n' % self.type)
741 741 self.id = self._unpackheader(_fpartid)[0]
742 742 self.ui.debug('part id: "%s"\n' % self.id)
743 743 ## reading parameters
744 744 # param count
745 745 mancount, advcount = self._unpackheader(_fpartparamcount)
746 746 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
747 747 # param size
748 748 fparamsizes = _makefpartparamsizes(mancount + advcount)
749 749 paramsizes = self._unpackheader(fparamsizes)
750 750 # make it a list of couple again
751 751 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
752 752 # split mandatory from advisory
753 753 mansizes = paramsizes[:mancount]
754 754 advsizes = paramsizes[mancount:]
755 755 # retrive param value
756 756 manparams = []
757 757 for key, value in mansizes:
758 758 manparams.append((self._fromheader(key), self._fromheader(value)))
759 759 advparams = []
760 760 for key, value in advsizes:
761 761 advparams.append((self._fromheader(key), self._fromheader(value)))
762 762 self._initparams(manparams, advparams)
763 763 ## part payload
764 764 def payloadchunks():
765 765 payloadsize = self._unpack(_fpayloadsize)[0]
766 766 self.ui.debug('payload chunk size: %i\n' % payloadsize)
767 767 while payloadsize:
768 768 yield self._readexact(payloadsize)
769 769 payloadsize = self._unpack(_fpayloadsize)[0]
770 770 self.ui.debug('payload chunk size: %i\n' % payloadsize)
771 771 self._payloadstream = util.chunkbuffer(payloadchunks())
772 772 # we read the data, tell it
773 773 self._initialized = True
774 774
775 775 def read(self, size=None):
776 776 """read payload data"""
777 777 if not self._initialized:
778 778 self._readheader()
779 779 if size is None:
780 780 data = self._payloadstream.read()
781 781 else:
782 782 data = self._payloadstream.read(size)
783 783 if size is None or len(data) < size:
784 784 self.consumed = True
785 785 return data
786 786
787 787 capabilities = {'HG2Y': (),
788 788 'b2x:listkeys': (),
789 789 'b2x:pushkey': (),
790 790 'b2x:changegroup': (),
791 791 }
792 792
793 793 def getrepocaps(repo):
794 794 """return the bundle2 capabilities for a given repo
795 795
796 796 Exists to allow extensions (like evolution) to mutate the capabilities.
797 797 """
798 798 caps = capabilities.copy()
799 799 if obsolete.isenabled(repo, obsolete.exchangeopt):
800 800 supportedformat = tuple('V%i' % v for v in obsolete.formats)
801 801 caps['b2x:obsmarkers'] = supportedformat
802 802 return caps
803 803
804 804 def bundle2caps(remote):
805 805 """return the bundlecapabilities of a peer as dict"""
806 806 raw = remote.capable('bundle2-exp')
807 807 if not raw and raw != '':
808 808 return {}
809 809 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
810 810 return decodecaps(capsblob)
811 811
812 812 def obsmarkersversion(caps):
813 813 """extract the list of supported obsmarkers versions from a bundle2caps dict
814 814 """
815 815 obscaps = caps.get('b2x:obsmarkers', ())
816 816 return [int(c[1:]) for c in obscaps if c.startswith('V')]
817 817
818 818 @parthandler('b2x:changegroup')
819 819 def handlechangegroup(op, inpart):
820 820 """apply a changegroup part on the repo
821 821
822 822 This is a very early implementation that will massive rework before being
823 823 inflicted to any end-user.
824 824 """
825 825 # Make sure we trigger a transaction creation
826 826 #
827 827 # The addchangegroup function will get a transaction object by itself, but
828 828 # we need to make sure we trigger the creation of a transaction object used
829 829 # for the whole processing scope.
830 830 op.gettransaction()
831 831 cg = changegroup.cg1unpacker(inpart, 'UN')
832 832 # the source and url passed here are overwritten by the one contained in
833 833 # the transaction.hookargs argument. So 'bundle2' is a placeholder
834 834 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
835 835 op.records.add('changegroup', {'return': ret})
836 836 if op.reply is not None:
837 837 # This is definitly not the final form of this
838 838 # return. But one need to start somewhere.
839 839 part = op.reply.newpart('b2x:reply:changegroup')
840 840 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
841 841 part.addparam('return', '%i' % ret, mandatory=False)
842 842 assert not inpart.read()
843 843
844 844 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
845 845 def handlereplychangegroup(op, inpart):
846 846 ret = int(inpart.params['return'])
847 847 replyto = int(inpart.params['in-reply-to'])
848 848 op.records.add('changegroup', {'return': ret}, replyto)
849 849
850 850 @parthandler('b2x:check:heads')
851 851 def handlecheckheads(op, inpart):
852 852 """check that head of the repo did not change
853 853
854 854 This is used to detect a push race when using unbundle.
855 855 This replaces the "heads" argument of unbundle."""
856 856 h = inpart.read(20)
857 857 heads = []
858 858 while len(h) == 20:
859 859 heads.append(h)
860 860 h = inpart.read(20)
861 861 assert not h
862 862 if heads != op.repo.heads():
863 863 raise error.PushRaced('repository changed while pushing - '
864 864 'please try again')
865 865
866 866 @parthandler('b2x:output')
867 867 def handleoutput(op, inpart):
868 868 """forward output captured on the server to the client"""
869 869 for line in inpart.read().splitlines():
870 870 op.ui.write(('remote: %s\n' % line))
871 871
872 872 @parthandler('b2x:replycaps')
873 873 def handlereplycaps(op, inpart):
874 874 """Notify that a reply bundle should be created
875 875
876 876 The payload contains the capabilities information for the reply"""
877 877 caps = decodecaps(inpart.read())
878 878 if op.reply is None:
879 879 op.reply = bundle20(op.ui, caps)
880 880
881 881 @parthandler('b2x:error:abort', ('message', 'hint'))
882 882 def handlereplycaps(op, inpart):
883 883 """Used to transmit abort error over the wire"""
884 884 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
885 885
886 886 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
887 887 def handlereplycaps(op, inpart):
888 888 """Used to transmit unknown content error over the wire"""
889 889 kwargs = {}
890 890 parttype = inpart.params.get('parttype')
891 891 if parttype is not None:
892 892 kwargs['parttype'] = parttype
893 893 params = inpart.params.get('params')
894 894 if params is not None:
895 895 kwargs['params'] = params.split('\0')
896 896
897 raise error.BundleValueError(**kwargs)
897 raise error.UnsupportedPartError(**kwargs)
898 898
899 899 @parthandler('b2x:error:pushraced', ('message',))
900 900 def handlereplycaps(op, inpart):
901 901 """Used to transmit push race error over the wire"""
902 902 raise error.ResponseError(_('push failed:'), inpart.params['message'])
903 903
904 904 @parthandler('b2x:listkeys', ('namespace',))
905 905 def handlelistkeys(op, inpart):
906 906 """retrieve pushkey namespace content stored in a bundle2"""
907 907 namespace = inpart.params['namespace']
908 908 r = pushkey.decodekeys(inpart.read())
909 909 op.records.add('listkeys', (namespace, r))
910 910
911 911 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
912 912 def handlepushkey(op, inpart):
913 913 """process a pushkey request"""
914 914 dec = pushkey.decode
915 915 namespace = dec(inpart.params['namespace'])
916 916 key = dec(inpart.params['key'])
917 917 old = dec(inpart.params['old'])
918 918 new = dec(inpart.params['new'])
919 919 ret = op.repo.pushkey(namespace, key, old, new)
920 920 record = {'namespace': namespace,
921 921 'key': key,
922 922 'old': old,
923 923 'new': new}
924 924 op.records.add('pushkey', record)
925 925 if op.reply is not None:
926 926 rpart = op.reply.newpart('b2x:reply:pushkey')
927 927 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
928 928 rpart.addparam('return', '%i' % ret, mandatory=False)
929 929
930 930 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
931 931 def handlepushkeyreply(op, inpart):
932 932 """retrieve the result of a pushkey request"""
933 933 ret = int(inpart.params['return'])
934 934 partid = int(inpart.params['in-reply-to'])
935 935 op.records.add('pushkey', {'return': ret}, partid)
936 936
937 937 @parthandler('b2x:obsmarkers')
938 938 def handleobsmarker(op, inpart):
939 939 """add a stream of obsmarkers to the repo"""
940 940 tr = op.gettransaction()
941 941 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
942 942 if new:
943 943 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
944 944 op.records.add('obsmarkers', {'new': new})
945 945 if op.reply is not None:
946 946 rpart = op.reply.newpart('b2x:reply:obsmarkers')
947 947 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
948 948 rpart.addparam('new', '%i' % new, mandatory=False)
949 949
950 950
951 951 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
952 952 def handlepushkeyreply(op, inpart):
953 953 """retrieve the result of a pushkey request"""
954 954 ret = int(inpart.params['new'])
955 955 partid = int(inpart.params['in-reply-to'])
956 956 op.records.add('obsmarkers', {'new': ret}, partid)
@@ -1,125 +1,126 b''
1 1 # error.py - Mercurial exceptions
2 2 #
3 3 # Copyright 2005-2008 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Mercurial exceptions.
9 9
10 10 This allows us to catch exceptions at higher levels without forcing
11 11 imports.
12 12 """
13 13
14 14 # Do not import anything here, please
15 15
16 16 class RevlogError(Exception):
17 17 pass
18 18
19 19 class LookupError(RevlogError, KeyError):
20 20 def __init__(self, name, index, message):
21 21 self.name = name
22 22 if isinstance(name, str) and len(name) == 20:
23 23 from node import short
24 24 name = short(name)
25 25 RevlogError.__init__(self, '%s@%s: %s' % (index, name, message))
26 26
27 27 def __str__(self):
28 28 return RevlogError.__str__(self)
29 29
30 30 class ManifestLookupError(LookupError):
31 31 pass
32 32
33 33 class CommandError(Exception):
34 34 """Exception raised on errors in parsing the command line."""
35 35
36 36 class InterventionRequired(Exception):
37 37 """Exception raised when a command requires human intervention."""
38 38
39 39 class Abort(Exception):
40 40 """Raised if a command needs to print an error and exit."""
41 41 def __init__(self, *args, **kw):
42 42 Exception.__init__(self, *args)
43 43 self.hint = kw.get('hint')
44 44
45 45 class ConfigError(Abort):
46 46 """Exception raised when parsing config files"""
47 47
48 48 class OutOfBandError(Exception):
49 49 """Exception raised when a remote repo reports failure"""
50 50
51 51 class ParseError(Exception):
52 52 """Exception raised when parsing config files (msg[, pos])"""
53 53
54 54 class RepoError(Exception):
55 55 def __init__(self, *args, **kw):
56 56 Exception.__init__(self, *args)
57 57 self.hint = kw.get('hint')
58 58
59 59 class RepoLookupError(RepoError):
60 60 pass
61 61
62 62 class CapabilityError(RepoError):
63 63 pass
64 64
65 65 class RequirementError(RepoError):
66 66 """Exception raised if .hg/requires has an unknown entry."""
67 67 pass
68 68
69 69 class LockError(IOError):
70 70 def __init__(self, errno, strerror, filename, desc):
71 71 IOError.__init__(self, errno, strerror, filename)
72 72 self.desc = desc
73 73
74 74 class LockHeld(LockError):
75 75 def __init__(self, errno, filename, desc, locker):
76 76 LockError.__init__(self, errno, 'Lock held', filename, desc)
77 77 self.locker = locker
78 78
79 79 class LockUnavailable(LockError):
80 80 pass
81 81
82 82 class ResponseError(Exception):
83 83 """Raised to print an error with part of output and exit."""
84 84
85 85 class UnknownCommand(Exception):
86 86 """Exception raised if command is not in the command table."""
87 87
88 88 class AmbiguousCommand(Exception):
89 89 """Exception raised if command shortcut matches more than one command."""
90 90
91 91 # derived from KeyboardInterrupt to simplify some breakout code
92 92 class SignalInterrupt(KeyboardInterrupt):
93 93 """Exception raised on SIGTERM and SIGHUP."""
94 94
95 95 class SignatureError(Exception):
96 96 pass
97 97
98 98 class PushRaced(RuntimeError):
99 99 """An exception raised during unbundling that indicate a push race"""
100 100
101 101 # bundle2 related errors
102 102 class BundleValueError(ValueError):
103 103 """error raised when bundle2 cannot be processed"""
104 104
105 class UnsupportedPartError(BundleValueError):
105 106 def __init__(self, parttype=None, params=()):
106 107 self.parttype = parttype
107 108 self.params = params
108 109 if self.parttype is None:
109 110 msg = 'Stream Parameter'
110 111 else:
111 112 msg = parttype
112 113 if self.params:
113 114 msg = '%s - %s' % (msg, ', '.join(self.params))
114 115 ValueError.__init__(self, msg)
115 116
116 117 class ReadOnlyPartError(RuntimeError):
117 118 """error raised when code tries to alter a part being generated"""
118 119 pass
119 120
120 121 class CensoredNodeError(RevlogError):
121 122 """error raised when content verification fails on a censored node"""
122 123
123 124 def __init__(self, filename, node):
124 125 from node import short
125 126 RevlogError.__init__(self, '%s:%s' % (filename, short(node)))
General Comments 0
You need to be logged in to leave comments. Login now