bundle2: detect and disallow a negative chunk size...
Pierre-Yves David -
r23011:006a81d0 default
@@ -1,956 +1,965 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 A name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is upper case, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. However, none of the parts read after an abort are processed. In
144 144 the future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
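The stream level parameter layout described in the docstring above can be sketched as follows. This is a minimal, standalone illustration written for Python 3 (the file itself targets Python 2), not the module's own code: the helper names `encode_stream_params` and `decode_stream_params` are hypothetical, and `urllib.parse.quote`/`unquote` stand in for the Python 2 `urllib.quote`/`urllib.unquote` calls used in the file.

```python
import string
import struct
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Serialize (name, value) pairs; a value of None means "no value".

    Hypothetical helper: returns the int32 size followed by the blob.
    """
    blocks = []
    for name, value in params:
        if not name or name[0] not in string.ascii_letters:
            raise ValueError('invalid parameter name: %r' % name)
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    blob = ' '.join(blocks).encode('ascii')
    # '>i' is the signed big-endian int32 format the module uses for sizes
    return struct.pack('>i', len(blob)) + blob

def decode_stream_params(data):
    """Parse the size prefix and the space separated parameter blob."""
    (size,) = struct.unpack('>i', data[:4])
    if size < 0:
        raise ValueError('negative bundle param size: %i' % size)
    blob = data[4:4 + size].decode('ascii')
    params = {}
    if blob:
        for item in blob.split(' '):
            name, sep, value = item.partition('=')
            params[unquote(name)] = unquote(value) if sep else None
    return params
```

Because both names and values are urlquoted, spaces and `=` characters inside a value never collide with the separators of the blob.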
147 147
148 148 import util
149 149 import struct
150 150 import urllib
151 151 import string
152 152 import obsolete
153 153 import pushkey
154 154
155 155 import changegroup, error
156 156 from i18n import _
157 157
158 158 _pack = struct.pack
159 159 _unpack = struct.unpack
160 160
161 161 _magicstring = 'HG2Y'
162 162
163 163 _fstreamparamsize = '>i'
164 164 _fpartheadersize = '>i'
165 165 _fparttypesize = '>B'
166 166 _fpartid = '>I'
167 167 _fpayloadsize = '>i'
168 168 _fpartparamcount = '>BB'
169 169
170 170 preferedchunksize = 4096
171 171
172 172 def _makefpartparamsizes(nbparams):
173 173 """return a struct format to read part parameter sizes
174 174
175 175 The number of parameters is variable so we need to build that format
176 176 dynamically.
177 177 """
178 178 return '>'+('BB'*nbparams)
179 179
180 180 parthandlermapping = {}
181 181
182 182 def parthandler(parttype, params=()):
183 183 """decorator that registers a function as a bundle2 part handler
184 184
185 185 eg::
186 186
187 187 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
188 188 def myparttypehandler(...):
189 189 '''process a part of type "my part".'''
190 190 ...
191 191 """
192 192 def _decorator(func):
193 193 lparttype = parttype.lower() # enforce lower case matching.
194 194 assert lparttype not in parthandlermapping
195 195 parthandlermapping[lparttype] = func
196 196 func.params = frozenset(params)
197 197 return func
198 198 return _decorator
199 199
200 200 class unbundlerecords(object):
201 201 """keep record of what happens during an unbundle
202 202
203 203 New records are added using `records.add('cat', obj)`. Where 'cat' is a
204 204 category of record and obj is an arbitrary object.
205 205
206 206 `records['cat']` will return all entries of this category 'cat'.
207 207
208 208 Iterating on the object itself will yield `('category', obj)` tuples
209 209 for all entries.
210 210
211 211 All iterations happen in chronological order.
212 212 """
213 213
214 214 def __init__(self):
215 215 self._categories = {}
216 216 self._sequences = []
217 217 self._replies = {}
218 218
219 219 def add(self, category, entry, inreplyto=None):
220 220 """add a new record of a given category.
221 221
222 222 The entry can then be retrieved in the list returned by
223 223 self['category']."""
224 224 self._categories.setdefault(category, []).append(entry)
225 225 self._sequences.append((category, entry))
226 226 if inreplyto is not None:
227 227 self.getreplies(inreplyto).add(category, entry)
228 228
229 229 def getreplies(self, partid):
230 230 """get the subrecords that reply to a specific part"""
231 231 return self._replies.setdefault(partid, unbundlerecords())
232 232
233 233 def __getitem__(self, cat):
234 234 return tuple(self._categories.get(cat, ()))
235 235
236 236 def __iter__(self):
237 237 return iter(self._sequences)
238 238
239 239 def __len__(self):
240 240 return len(self._sequences)
241 241
242 242 def __nonzero__(self):
243 243 return bool(self._sequences)
244 244
245 245 class bundleoperation(object):
246 246 """an object that represents a single bundling process
247 247
248 248 Its purpose is to carry unbundle-related objects and states.
249 249
250 250 A new object should be created at the beginning of each bundle processing.
251 251 The object is to be returned by the processing function.
252 252
253 253 The object has very little content now; it will ultimately contain:
254 254 * an access to the repo the bundle is applied to,
255 255 * a ui object,
256 256 * a way to retrieve a transaction to add changes to the repo,
257 257 * a way to record the result of processing each part,
258 258 * a way to construct a bundle response when applicable.
259 259 """
260 260
261 261 def __init__(self, repo, transactiongetter):
262 262 self.repo = repo
263 263 self.ui = repo.ui
264 264 self.records = unbundlerecords()
265 265 self.gettransaction = transactiongetter
266 266 self.reply = None
267 267
268 268 class TransactionUnavailable(RuntimeError):
269 269 pass
270 270
271 271 def _notransaction():
272 272 """default method to get a transaction while processing a bundle
273 273
274 274 Raise an exception to highlight the fact that no transaction was expected
275 275 to be created"""
276 276 raise TransactionUnavailable()
277 277
278 278 def processbundle(repo, unbundler, transactiongetter=_notransaction):
279 279 """This function processes a bundle, applying its effects to/from a repo
280 280
281 281 It iterates over each part then searches for and uses the proper handling
282 282 code to process the part. Parts are processed in order.
283 283
284 284 This is a very early version of this function that will be strongly reworked
285 285 before final usage.
286 286
287 287 An unknown mandatory part will abort the process.
288 288 """
289 289 op = bundleoperation(repo, transactiongetter)
290 290 # todo:
291 291 # - replace this with an init function soon.
292 292 # - exception catching
293 293 unbundler.params
294 294 iterparts = unbundler.iterparts()
295 295 part = None
296 296 try:
297 297 for part in iterparts:
298 298 _processpart(op, part)
299 299 except Exception, exc:
300 300 for part in iterparts:
301 301 # consume the bundle content
302 302 part.read()
303 303 # Small hack to let caller code distinguish exceptions from bundle2
304 304 # processing from the ones from bundle1 processing. This is mostly
305 305 # needed to handle different return codes to unbundle according to the
306 306 # type of bundle. We should probably clean up or drop this return code
307 307 # craziness in a future version.
308 308 exc.duringunbundle2 = True
309 309 raise
310 310 return op
311 311
312 312 def _processpart(op, part):
313 313 """process a single part from a bundle
314 314
315 315 The part is guaranteed to have been fully consumed when the function exits
316 316 (even if an exception is raised)."""
317 317 try:
318 318 parttype = part.type
319 319 # part keys are matched in lower case
320 320 key = parttype.lower()
321 321 try:
322 322 handler = parthandlermapping.get(key)
323 323 if handler is None:
324 324 raise error.UnsupportedPartError(parttype=key)
325 325 op.ui.debug('found a handler for part %r\n' % parttype)
326 326 unknownparams = part.mandatorykeys - handler.params
327 327 if unknownparams:
328 328 unknownparams = list(unknownparams)
329 329 unknownparams.sort()
330 330 raise error.UnsupportedPartError(parttype=key,
331 331 params=unknownparams)
332 332 except error.UnsupportedPartError, exc:
333 333 if key != parttype: # mandatory parts
334 334 raise
335 335 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
336 336 return # skip to part processing
337 337
338 338 # handler is called outside the above try block so that we don't
339 339 # risk catching KeyErrors from anything other than the
340 340 # parthandlermapping lookup (any KeyError raised by handler()
341 341 # itself represents a defect of a different variety).
342 342 output = None
343 343 if op.reply is not None:
344 344 op.ui.pushbuffer(error=True)
345 345 output = ''
346 346 try:
347 347 handler(op, part)
348 348 finally:
349 349 if output is not None:
350 350 output = op.ui.popbuffer()
351 351 if output:
352 352 outpart = op.reply.newpart('b2x:output', data=output)
353 353 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
354 354 finally:
355 355 # consume the part content to not corrupt the stream.
356 356 part.read()
357 357
358 358
359 359 def decodecaps(blob):
360 360 """decode a bundle2 caps bytes blob into a dictionary
361 361
362 362 The blob is a list of capabilities (one per line)
363 363 Capabilities may have values using a line of the form::
364 364
365 365 capability=value1,value2,value3
366 366
367 367 The values are always a list."""
368 368 caps = {}
369 369 for line in blob.splitlines():
370 370 if not line:
371 371 continue
372 372 if '=' not in line:
373 373 key, vals = line, ()
374 374 else:
375 375 key, vals = line.split('=', 1)
376 376 vals = vals.split(',')
377 377 key = urllib.unquote(key)
378 378 vals = [urllib.unquote(v) for v in vals]
379 379 caps[key] = vals
380 380 return caps
381 381
382 382 def encodecaps(caps):
383 383 """encode a bundle2 caps dictionary into a bytes blob"""
384 384 chunks = []
385 385 for ca in sorted(caps):
386 386 vals = caps[ca]
387 387 ca = urllib.quote(ca)
388 388 vals = [urllib.quote(v) for v in vals]
389 389 if vals:
390 390 ca = "%s=%s" % (ca, ','.join(vals))
391 391 chunks.append(ca)
392 392 return '\n'.join(chunks)
393 393
394 394 class bundle20(object):
395 395 """represent an outgoing bundle2 container
396 396
397 397 Use the `addparam` method to add stream level parameters, and `newpart` to
398 398 populate it. Then call `getchunks` to retrieve all the binary chunks of
399 399 data that compose the bundle2 container."""
400 400
401 401 def __init__(self, ui, capabilities=()):
402 402 self.ui = ui
403 403 self._params = []
404 404 self._parts = []
405 405 self.capabilities = dict(capabilities)
406 406
407 407 @property
408 408 def nbparts(self):
409 409 """total number of parts added to the bundler"""
410 410 return len(self._parts)
411 411
412 412 # methods used to define the bundle2 content
413 413 def addparam(self, name, value=None):
414 414 """add a stream level parameter"""
415 415 if not name:
416 416 raise ValueError('empty parameter name')
417 417 if name[0] not in string.letters:
418 418 raise ValueError('non letter first character: %r' % name)
419 419 self._params.append((name, value))
420 420
421 421 def addpart(self, part):
422 422 """add a new part to the bundle2 container
423 423
424 424 Parts contain the actual applicative payload."""
425 425 assert part.id is None
426 426 part.id = len(self._parts) # very cheap counter
427 427 self._parts.append(part)
428 428
429 429 def newpart(self, typeid, *args, **kwargs):
430 430 """create a new part and add it to the container
431 431
432 432 The part is directly added to the container. For now, this means
433 433 that any failure to properly initialize the part after calling
434 434 ``newpart`` should result in a failure of the whole bundling process.
435 435
436 436 You can still fall back to manually creating and adding the part if you
437 437 need better control."""
438 438 part = bundlepart(typeid, *args, **kwargs)
439 439 self.addpart(part)
440 440 return part
441 441
442 442 # methods used to generate the bundle2 stream
443 443 def getchunks(self):
444 444 self.ui.debug('start emission of %s stream\n' % _magicstring)
445 445 yield _magicstring
446 446 param = self._paramchunk()
447 447 self.ui.debug('bundle parameter: %s\n' % param)
448 448 yield _pack(_fstreamparamsize, len(param))
449 449 if param:
450 450 yield param
451 451
452 452 self.ui.debug('start of parts\n')
453 453 for part in self._parts:
454 454 self.ui.debug('bundle part: "%s"\n' % part.type)
455 455 for chunk in part.getchunks():
456 456 yield chunk
457 457 self.ui.debug('end of bundle\n')
458 458 yield _pack(_fpartheadersize, 0)
459 459
460 460 def _paramchunk(self):
461 461 """return an encoded version of all stream parameters"""
462 462 blocks = []
463 463 for par, value in self._params:
464 464 par = urllib.quote(par)
465 465 if value is not None:
466 466 value = urllib.quote(value)
467 467 par = '%s=%s' % (par, value)
468 468 blocks.append(par)
469 469 return ' '.join(blocks)
470 470
471 471 class unpackermixin(object):
472 472 """A mixin to extract bytes and struct data from a stream"""
473 473
474 474 def __init__(self, fp):
475 475 self._fp = fp
476 476
477 477 def _unpack(self, format):
478 478 """unpack this struct format from the stream"""
479 479 data = self._readexact(struct.calcsize(format))
480 480 return _unpack(format, data)
481 481
482 482 def _readexact(self, size):
483 483 """read exactly <size> bytes from the stream"""
484 484 return changegroup.readexactly(self._fp, size)
485 485
486 486
487 487 class unbundle20(unpackermixin):
488 488 """interpret a bundle2 stream
489 489
490 490 This class is fed with a binary stream and yields parts through its
491 491 `iterparts` methods."""
492 492
493 493 def __init__(self, ui, fp, header=None):
494 494 """If header is specified, we do not read it out of the stream."""
495 495 self.ui = ui
496 496 super(unbundle20, self).__init__(fp)
497 497 if header is None:
498 498 header = self._readexact(4)
499 499 magic, version = header[0:2], header[2:4]
500 500 if magic != 'HG':
501 501 raise util.Abort(_('not a Mercurial bundle'))
502 502 if version != '2Y':
503 503 raise util.Abort(_('unknown bundle version %s') % version)
504 504 self.ui.debug('start processing of %s stream\n' % header)
505 505
506 506 @util.propertycache
507 507 def params(self):
508 508 """dictionary of stream level parameters"""
509 509 self.ui.debug('reading bundle2 stream parameters\n')
510 510 params = {}
511 511 paramssize = self._unpack(_fstreamparamsize)[0]
512 if paramssize < 0:
513 raise error.BundleValueError('negative bundle param size: %i'
514 % paramssize)
512 515 if paramssize:
513 516 for p in self._readexact(paramssize).split(' '):
514 517 p = p.split('=', 1)
515 518 p = [urllib.unquote(i) for i in p]
516 519 if len(p) < 2:
517 520 p.append(None)
518 521 self._processparam(*p)
519 522 params[p[0]] = p[1]
520 523 return params
521 524
522 525 def _processparam(self, name, value):
523 526 """process a parameter, applying its effect if needed
524 527
525 528 Parameters starting with a lower case letter are advisory and will be
526 529 ignored when unknown. Those starting with an upper case letter are
527 530 mandatory and this function will raise an UnsupportedPartError when unknown.
528 531
529 532 Note: no options are currently supported. Any input will either be
530 533 ignored or fail.
531 534 """
532 535 if not name:
533 536 raise ValueError('empty parameter name')
534 537 if name[0] not in string.letters:
535 538 raise ValueError('non letter first character: %r' % name)
536 539 # Some logic will be added here later to try to process the option from
537 540 # a dict of known parameters.
538 541 if name[0].islower():
539 542 self.ui.debug("ignoring unknown parameter %r\n" % name)
540 543 else:
541 544 raise error.UnsupportedPartError(params=(name,))
542 545
543 546
544 547 def iterparts(self):
545 548 """yield all parts contained in the stream"""
546 549 # make sure params have been loaded
547 550 self.params
548 551 self.ui.debug('start extraction of bundle2 parts\n')
549 552 headerblock = self._readpartheader()
550 553 while headerblock is not None:
551 554 part = unbundlepart(self.ui, headerblock, self._fp)
552 555 yield part
553 556 headerblock = self._readpartheader()
554 557 self.ui.debug('end of bundle2 stream\n')
555 558
556 559 def _readpartheader(self):
557 560 """read a part header size and return the bytes blob
558 561
559 562 returns None if empty"""
560 563 headersize = self._unpack(_fpartheadersize)[0]
564 if headersize < 0:
565 raise error.BundleValueError('negative part header size: %i'
566 % headersize)
561 567 self.ui.debug('part header size: %i\n' % headersize)
562 568 if headersize:
563 569 return self._readexact(headersize)
564 570 return None
565 571
566 572
567 573 class bundlepart(object):
568 574 """A bundle2 part contains application level payload
569 575
570 576 The part `type` is used to route the part to the application level
571 577 handler.
572 578
573 579 The part payload is contained in ``part.data``. It could be raw bytes or a
574 580 generator of byte chunks.
575 581
576 582 You can add parameters to the part using the ``addparam`` method.
577 583 Parameters can be either mandatory (default) or advisory. Remote side
578 584 should be able to safely ignore the advisory ones.
579 585
580 586 Both data and parameters cannot be modified after the generation has begun.
581 587 """
582 588
583 589 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
584 590 data=''):
585 591 self.id = None
586 592 self.type = parttype
587 593 self._data = data
588 594 self._mandatoryparams = list(mandatoryparams)
589 595 self._advisoryparams = list(advisoryparams)
590 596 # checking for duplicated entries
591 597 self._seenparams = set()
592 598 for pname, __ in self._mandatoryparams + self._advisoryparams:
593 599 if pname in self._seenparams:
594 600 raise RuntimeError('duplicated params: %s' % pname)
595 601 self._seenparams.add(pname)
596 602 # status of the part's generation:
597 603 # - None: not started,
598 604 # - False: currently generated,
599 605 # - True: generation done.
600 606 self._generated = None
601 607
602 608 # methods used to define the part content
603 609 def __setdata(self, data):
604 610 if self._generated is not None:
605 611 raise error.ReadOnlyPartError('part is being generated')
606 612 self._data = data
607 613 def __getdata(self):
608 614 return self._data
609 615 data = property(__getdata, __setdata)
610 616
611 617 @property
612 618 def mandatoryparams(self):
613 619 # make it an immutable tuple to force people through ``addparam``
614 620 return tuple(self._mandatoryparams)
615 621
616 622 @property
617 623 def advisoryparams(self):
618 624 # make it an immutable tuple to force people through ``addparam``
619 625 return tuple(self._advisoryparams)
620 626
621 627 def addparam(self, name, value='', mandatory=True):
622 628 if self._generated is not None:
623 629 raise error.ReadOnlyPartError('part is being generated')
624 630 if name in self._seenparams:
625 631 raise ValueError('duplicated params: %s' % name)
626 632 self._seenparams.add(name)
627 633 params = self._advisoryparams
628 634 if mandatory:
629 635 params = self._mandatoryparams
630 636 params.append((name, value))
631 637
632 638 # methods used to generate the bundle2 stream
633 639 def getchunks(self):
634 640 if self._generated is not None:
635 641 raise RuntimeError('part can only be consumed once')
636 642 self._generated = False
637 643 #### header
638 644 ## parttype
639 645 header = [_pack(_fparttypesize, len(self.type)),
640 646 self.type, _pack(_fpartid, self.id),
641 647 ]
642 648 ## parameters
643 649 # count
644 650 manpar = self.mandatoryparams
645 651 advpar = self.advisoryparams
646 652 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
647 653 # size
648 654 parsizes = []
649 655 for key, value in manpar:
650 656 parsizes.append(len(key))
651 657 parsizes.append(len(value))
652 658 for key, value in advpar:
653 659 parsizes.append(len(key))
654 660 parsizes.append(len(value))
655 661 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
656 662 header.append(paramsizes)
657 663 # key, value
658 664 for key, value in manpar:
659 665 header.append(key)
660 666 header.append(value)
661 667 for key, value in advpar:
662 668 header.append(key)
663 669 header.append(value)
664 670 ## finalize header
665 671 headerchunk = ''.join(header)
666 672 yield _pack(_fpartheadersize, len(headerchunk))
667 673 yield headerchunk
668 674 ## payload
669 675 for chunk in self._payloadchunks():
670 676 yield _pack(_fpayloadsize, len(chunk))
671 677 yield chunk
672 678 # end of payload
673 679 yield _pack(_fpayloadsize, 0)
674 680 self._generated = True
675 681
676 682 def _payloadchunks(self):
677 683 """yield chunks of the part payload
678 684
679 685 Exists to handle the different methods to provide data to a part."""
680 686 # we only support fixed size data now.
681 687 # This will be improved in the future.
682 688 if util.safehasattr(self.data, 'next'):
683 689 buff = util.chunkbuffer(self.data)
684 690 chunk = buff.read(preferedchunksize)
685 691 while chunk:
686 692 yield chunk
687 693 chunk = buff.read(preferedchunksize)
688 694 elif len(self.data):
689 695 yield self.data
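The part parameter block emitted by `getchunks` above (`<mandatory-count><advisory-count><param-sizes><param-data>`) can be illustrated in isolation. The following is a minimal Python 3 sketch under stated assumptions, not the module's code: `pack_part_params` and `unpack_part_params` are hypothetical helper names, and keys and values are taken to be byte strings of at most 255 bytes each, since every size is stored in a single byte.

```python
import struct

def pack_part_params(mandatory, advisory):
    """Pack (key, value) byte-string pairs into a parameter block.

    Hypothetical helper: mandatory parameters come first, then advisory.
    """
    params = list(mandatory) + list(advisory)
    sizes = []
    for key, value in params:
        sizes.extend((len(key), len(value)))
    # one byte for each count
    block = struct.pack('>BB', len(mandatory), len(advisory))
    # one (size-of-key, size-of-value) byte pair per parameter
    block += struct.pack('>' + 'BB' * len(params), *sizes)
    # concatenated keys and values, in the same order as the sizes
    block += b''.join(key + value for key, value in params)
    return block

def unpack_part_params(block):
    """Return (mandatory, advisory) lists of (key, value) pairs."""
    mancount, advcount = struct.unpack_from('>BB', block, 0)
    total = mancount + advcount
    sizes = struct.unpack_from('>' + 'BB' * total, block, 2)
    offset = 2 + 2 * total
    params = []
    for keysize, valuesize in zip(sizes[::2], sizes[1::2]):
        key = block[offset:offset + keysize]
        offset += keysize
        value = block[offset:offset + valuesize]
        offset += valuesize
        params.append((key, value))
    return params[:mancount], params[mancount:]
```

For one mandatory `return=1` parameter and one advisory `in-reply-to=0` parameter, the block starts with the counts `1, 1` followed by the sizes `6, 1, 11, 1`.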
690 696
691 697 class unbundlepart(unpackermixin):
692 698 """a bundle part read from a bundle"""
693 699
694 700 def __init__(self, ui, header, fp):
695 701 super(unbundlepart, self).__init__(fp)
696 702 self.ui = ui
697 703 # unbundle state attr
698 704 self._headerdata = header
699 705 self._headeroffset = 0
700 706 self._initialized = False
701 707 self.consumed = False
702 708 # part data
703 709 self.id = None
704 710 self.type = None
705 711 self.mandatoryparams = None
706 712 self.advisoryparams = None
707 713 self.params = None
708 714 self.mandatorykeys = ()
709 715 self._payloadstream = None
710 716 self._readheader()
711 717
712 718 def _fromheader(self, size):
713 719 """return the next <size> bytes from the header"""
714 720 offset = self._headeroffset
715 721 data = self._headerdata[offset:(offset + size)]
716 722 self._headeroffset = offset + size
717 723 return data
718 724
719 725 def _unpackheader(self, format):
720 726 """read given format from header
721 727
722 728 This automatically computes the size of the format to read."""
723 729 data = self._fromheader(struct.calcsize(format))
724 730 return _unpack(format, data)
725 731
726 732 def _initparams(self, mandatoryparams, advisoryparams):
727 733 """internal function to set up all parameter-related logic"""
728 734 # make it read only to prevent people touching it by mistake.
729 735 self.mandatoryparams = tuple(mandatoryparams)
730 736 self.advisoryparams = tuple(advisoryparams)
731 737 # user friendly UI
732 738 self.params = dict(self.mandatoryparams)
733 739 self.params.update(dict(self.advisoryparams))
734 740 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
735 741
736 742 def _readheader(self):
737 743 """read the header and setup the object"""
738 744 typesize = self._unpackheader(_fparttypesize)[0]
739 745 self.type = self._fromheader(typesize)
740 746 self.ui.debug('part type: "%s"\n' % self.type)
741 747 self.id = self._unpackheader(_fpartid)[0]
742 748 self.ui.debug('part id: "%s"\n' % self.id)
743 749 ## reading parameters
744 750 # param count
745 751 mancount, advcount = self._unpackheader(_fpartparamcount)
746 752 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
747 753 # param size
748 754 fparamsizes = _makefpartparamsizes(mancount + advcount)
749 755 paramsizes = self._unpackheader(fparamsizes)
750 756 # make it a list of pairs again
751 757 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
752 758 # split mandatory from advisory
753 759 mansizes = paramsizes[:mancount]
754 760 advsizes = paramsizes[mancount:]
755 761 # retrieve param values
756 762 manparams = []
757 763 for key, value in mansizes:
758 764 manparams.append((self._fromheader(key), self._fromheader(value)))
759 765 advparams = []
760 766 for key, value in advsizes:
761 767 advparams.append((self._fromheader(key), self._fromheader(value)))
762 768 self._initparams(manparams, advparams)
763 769 ## part payload
764 770 def payloadchunks():
765 771 payloadsize = self._unpack(_fpayloadsize)[0]
766 772 self.ui.debug('payload chunk size: %i\n' % payloadsize)
767 773 while payloadsize:
774 if payloadsize < 0:
775 msg = 'negative payload chunk size: %i' % payloadsize
776 raise error.BundleValueError(msg)
768 777 yield self._readexact(payloadsize)
769 778 payloadsize = self._unpack(_fpayloadsize)[0]
770 779 self.ui.debug('payload chunk size: %i\n' % payloadsize)
771 780 self._payloadstream = util.chunkbuffer(payloadchunks())
772 781 # we read the data, tell it
773 782 self._initialized = True
774 783
775 784 def read(self, size=None):
776 785 """read payload data"""
777 786 if not self._initialized:
778 787 self._readheader()
779 788 if size is None:
780 789 data = self._payloadstream.read()
781 790 else:
782 791 data = self._payloadstream.read(size)
783 792 if size is None or len(data) < size:
784 793 self.consumed = True
785 794 return data
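The check this changeset adds to `payloadchunks` can be exercised outside Mercurial. Below is a minimal Python 3 sketch, assuming a file-like object as input; `write_payload`, `read_exactly` and `iter_payload` are hypothetical stand-ins for the module's `getchunks`, `changegroup.readexactly` and `payloadchunks`, and `ValueError` replaces `error.BundleValueError`. Sizes are read with the signed `'>i'` format, which is why a corrupted or hostile stream can yield a negative value at all.

```python
import io
import struct

def write_payload(chunks):
    """Frame byte chunks as <chunksize><chunkdata>, ending with size 0."""
    out = []
    for chunk in chunks:
        if chunk:  # a zero size chunk is reserved as the terminator
            out.append(struct.pack('>i', len(chunk)))
            out.append(chunk)
    out.append(struct.pack('>i', 0))
    return b''.join(out)

def read_exactly(fp, n):
    """Read exactly n bytes or fail (hypothetical stand-in helper)."""
    data = fp.read(n)
    if len(data) < n:
        raise EOFError('stream ended: got %d bytes, expected %d'
                       % (len(data), n))
    return data

def iter_payload(fp):
    """Yield payload chunks, rejecting negative chunk sizes."""
    (size,) = struct.unpack('>i', read_exactly(fp, 4))
    while size:
        if size < 0:
            raise ValueError('negative payload chunk size: %i' % size)
        yield read_exactly(fp, size)
        (size,) = struct.unpack('>i', read_exactly(fp, 4))
```

Without the `size < 0` test, the negative value would be handed straight to the read call, whose behavior for negative sizes depends on the underlying stream.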
786 795
787 796 capabilities = {'HG2Y': (),
788 797 'b2x:listkeys': (),
789 798 'b2x:pushkey': (),
790 799 'b2x:changegroup': (),
791 800 }
792 801
793 802 def getrepocaps(repo):
794 803 """return the bundle2 capabilities for a given repo
795 804
796 805 Exists to allow extensions (like evolution) to mutate the capabilities.
797 806 """
798 807 caps = capabilities.copy()
799 808 if obsolete.isenabled(repo, obsolete.exchangeopt):
800 809 supportedformat = tuple('V%i' % v for v in obsolete.formats)
801 810 caps['b2x:obsmarkers'] = supportedformat
802 811 return caps
803 812
804 813 def bundle2caps(remote):
805 814 """return the bundlecapabilities of a peer as dict"""
806 815 raw = remote.capable('bundle2-exp')
807 816 if not raw and raw != '':
808 817 return {}
809 818 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
810 819 return decodecaps(capsblob)
811 820
812 821 def obsmarkersversion(caps):
813 822 """extract the list of supported obsmarkers versions from a bundle2caps dict
814 823 """
815 824 obscaps = caps.get('b2x:obsmarkers', ())
816 825 return [int(c[1:]) for c in obscaps if c.startswith('V')]
817 826
818 827 @parthandler('b2x:changegroup')
819 828 def handlechangegroup(op, inpart):
820 829 """apply a changegroup part on the repo
821 830
822 831 This is a very early implementation that will undergo massive rework before
823 832 being inflicted on any end-user.
824 833 """
825 834 # Make sure we trigger a transaction creation
826 835 #
827 836 # The addchangegroup function will get a transaction object by itself, but
828 837 # we need to make sure we trigger the creation of a transaction object used
829 838 # for the whole processing scope.
830 839 op.gettransaction()
831 840 cg = changegroup.cg1unpacker(inpart, 'UN')
832 841 # the source and url passed here are overwritten by the one contained in
833 842 # the transaction.hookargs argument. So 'bundle2' is a placeholder
834 843 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
835 844 op.records.add('changegroup', {'return': ret})
836 845 if op.reply is not None:
837 846 # This is definitely not the final form of this
838 847 # return. But one needs to start somewhere.
839 848 part = op.reply.newpart('b2x:reply:changegroup')
840 849 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
841 850 part.addparam('return', '%i' % ret, mandatory=False)
842 851 assert not inpart.read()
843 852
844 853 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
845 854 def handlereplychangegroup(op, inpart):
846 855 ret = int(inpart.params['return'])
847 856 replyto = int(inpart.params['in-reply-to'])
848 857 op.records.add('changegroup', {'return': ret}, replyto)
849 858
850 859 @parthandler('b2x:check:heads')
851 860 def handlecheckheads(op, inpart):
852 861 """check that the heads of the repo did not change
853 862
854 863 This is used to detect a push race when using unbundle.
855 864 This replaces the "heads" argument of unbundle."""
856 865 h = inpart.read(20)
857 866 heads = []
858 867 while len(h) == 20:
859 868 heads.append(h)
860 869 h = inpart.read(20)
861 870 assert not h
862 871 if heads != op.repo.heads():
863 872 raise error.PushRaced('repository changed while pushing - '
864 873 'please try again')
865 874
866 875 @parthandler('b2x:output')
867 876 def handleoutput(op, inpart):
868 877 """forward output captured on the server to the client"""
869 878 for line in inpart.read().splitlines():
870 879 op.ui.write(('remote: %s\n' % line))
871 880
872 881 @parthandler('b2x:replycaps')
873 882 def handlereplycaps(op, inpart):
874 883 """Notify that a reply bundle should be created
875 884
876 885 The payload contains the capabilities information for the reply"""
877 886 caps = decodecaps(inpart.read())
878 887 if op.reply is None:
879 888 op.reply = bundle20(op.ui, caps)
880 889
881 890 @parthandler('b2x:error:abort', ('message', 'hint'))
882 891 def handleerrorabort(op, inpart):
883 892 """Used to transmit abort error over the wire"""
884 893 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
885 894
886 895 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
887 896 def handleerrorunsupportedcontent(op, inpart):
888 897 """Used to transmit unknown content error over the wire"""
889 898 kwargs = {}
890 899 parttype = inpart.params.get('parttype')
891 900 if parttype is not None:
892 901 kwargs['parttype'] = parttype
893 902 params = inpart.params.get('params')
894 903 if params is not None:
895 904 kwargs['params'] = params.split('\0')
896 905
897 906 raise error.UnsupportedPartError(**kwargs)
898 907
899 908 @parthandler('b2x:error:pushraced', ('message',))
900 909 def handleerrorpushraced(op, inpart):
901 910 """Used to transmit push race error over the wire"""
902 911 raise error.ResponseError(_('push failed:'), inpart.params['message'])
903 912
904 913 @parthandler('b2x:listkeys', ('namespace',))
905 914 def handlelistkeys(op, inpart):
906 915 """retrieve pushkey namespace content stored in a bundle2"""
907 916 namespace = inpart.params['namespace']
908 917 r = pushkey.decodekeys(inpart.read())
909 918 op.records.add('listkeys', (namespace, r))
910 919
911 920 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
912 921 def handlepushkey(op, inpart):
913 922 """process a pushkey request"""
914 923 dec = pushkey.decode
915 924 namespace = dec(inpart.params['namespace'])
916 925 key = dec(inpart.params['key'])
917 926 old = dec(inpart.params['old'])
918 927 new = dec(inpart.params['new'])
919 928 ret = op.repo.pushkey(namespace, key, old, new)
920 929 record = {'namespace': namespace,
921 930 'key': key,
922 931 'old': old,
923 932 'new': new}
924 933 op.records.add('pushkey', record)
925 934 if op.reply is not None:
926 935 rpart = op.reply.newpart('b2x:reply:pushkey')
927 936 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
928 937 rpart.addparam('return', '%i' % ret, mandatory=False)
929 938
930 939 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
931 940 def handlepushkeyreply(op, inpart):
932 941 """retrieve the result of a pushkey request"""
933 942 ret = int(inpart.params['return'])
934 943 partid = int(inpart.params['in-reply-to'])
935 944 op.records.add('pushkey', {'return': ret}, partid)
936 945
937 946 @parthandler('b2x:obsmarkers')
938 947 def handleobsmarker(op, inpart):
939 948 """add a stream of obsmarkers to the repo"""
940 949 tr = op.gettransaction()
941 950 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
942 951 if new:
943 952 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
944 953 op.records.add('obsmarkers', {'new': new})
945 954 if op.reply is not None:
946 955 rpart = op.reply.newpart('b2x:reply:obsmarkers')
947 956 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
948 957 rpart.addparam('new', '%i' % new, mandatory=False)
949 958
950 959
951 960 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
952 961 def handleobsmarkerreply(op, inpart):
953 962 """retrieve the result of an obsmarkers request"""
954 963 ret = int(inpart.params['new'])
955 964 partid = int(inpart.params['in-reply-to'])
956 965 op.records.add('obsmarkers', {'new': ret}, partid)
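
The b2x:check:heads handler above reads its payload as a flat run of 20-byte node ids and compares the result with the current repository heads. The sketch below reproduces only that parsing loop outside of Mercurial, as an illustration rather than the project's code. It assumes Python 3, uses io.BytesIO as a stand-in for inpart, and raises ValueError where the handler uses a bare assert. The names NODE_LEN, read_heads and heads_unchanged are hypothetical and do not exist in bundle2.py.

```python
import io

NODE_LEN = 20  # size in bytes of one binary node id (hypothetical constant)


def read_heads(stream):
    """Read consecutive 20-byte node ids until the stream is exhausted."""
    heads = []
    chunk = stream.read(NODE_LEN)
    while len(chunk) == NODE_LEN:
        heads.append(chunk)
        chunk = stream.read(NODE_LEN)
    if chunk:
        # A short, non-empty final read means the payload is not a whole
        # number of node ids; the handler above asserts on this case.
        raise ValueError('%d trailing byte(s) after last head' % len(chunk))
    return heads


def heads_unchanged(stream, current_heads):
    """Return True when the transmitted heads equal the current ones.

    Like the handler above, this is an ordered list comparison.
    """
    return read_heads(stream) == list(current_heads)
```

A caller would pass the part payload and the current heads, for example heads_unchanged(io.BytesIO(payload), repo_heads), and treat a False result as a push race. Because the comparison is ordered, the sender has to list the heads in the same order the receiver reports them.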