bundle2: make header reading optional...
Pierre-Yves David
r21066:5ecfe76d default
@@ -1,673 +1,675
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: (16-bit integer)
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are URL-quoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 A name MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. If the first letter is
50 50 upper case, the parameter is mandatory and the bundling process MUST stop
51 51 if it is unable to process the parameter.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
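
The stream level parameter encoding described above can be sketched as follows. This is a standalone illustration written for Python 3 (the module itself targets Python 2), not code from this changeset. The helper names `encode_stream_params` and `decode_stream_params` are hypothetical.

```python
import string
import struct
from urllib.parse import quote, unquote

def encode_stream_params(params):
    # params: iterable of (name, value) pairs; value may be None (hypothetical helper)
    blocks = []
    for name, value in params:
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.ascii_letters:
            raise ValueError('non letter first character: %r' % name)
        item = quote(name)
        if value is not None:
            item = '%s=%s' % (item, quote(value))
        blocks.append(item)
    blob = ' '.join(blocks).encode('ascii')
    # 16-bit unsigned big-endian size, followed by the blob itself
    return struct.pack('>H', len(blob)) + blob

def decode_stream_params(data):
    # returns (dict of parameters, number of bytes consumed)
    (size,) = struct.unpack('>H', data[:2])
    params = {}
    if size:
        for item in data[2:2 + size].decode('ascii').split(' '):
            pieces = [unquote(p) for p in item.split('=', 1)]
            if len(pieces) < 2:
                pieces.append(None)  # parameter without a value
            params[pieces[0]] = pieces[1]
    return params, 2 + size

blob = encode_stream_params([('caution', None), ('Elephants', 'e v')])
params, used = decode_stream_params(blob)
```

Because names and values are URL-quoted, neither a space nor an equals sign can appear unescaped inside an item, which is what makes the plain `split` calls safe in this sketch.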
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: (16-bit integer)
68 68
69 69 The total number of bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler that
78 78 can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content. The binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 The payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32-bit integer and `chunkdata` is plain bytes (as many as
121 121 `chunksize` says). The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimately be lifted.
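
To make the part layout concrete, here is a minimal standalone sketch of one part being framed and parsed back, following the field order given above. It is written for Python 3 and is not the module's implementation. `encode_part` and `decode_part` are hypothetical names, and the whole part is assumed to be held in memory as a single bytes object.

```python
import struct

def encode_part(parttype, partid, mandatory=(), advisory=(), payload=b''):
    # Mandatory parameters come first, then the advisory ones.
    params = list(mandatory) + list(advisory)
    sizes = []
    for key, value in params:
        sizes.extend((len(key), len(value)))
    header = b''.join(
        [struct.pack('>B', len(parttype)), parttype,
         struct.pack('>I', partid),
         struct.pack('>BB', len(mandatory), len(advisory)),
         struct.pack('>' + 'BB' * len(params), *sizes)]
        + [blob for pair in params for blob in pair])
    out = [struct.pack('>H', len(header)), header]
    if payload:
        # a single chunk, matching the "zero or one chunk" note above
        out += [struct.pack('>I', len(payload)), payload]
    out.append(struct.pack('>I', 0))  # a zero size chunk concludes the payload
    return b''.join(out)

def decode_part(data):
    (headersize,) = struct.unpack_from('>H', data, 0)
    if not headersize:
        return None  # an empty header is the end of stream marker
    pos = 2
    (typesize,) = struct.unpack_from('>B', data, pos)
    pos += 1
    parttype = data[pos:pos + typesize]
    pos += typesize
    (partid,) = struct.unpack_from('>I', data, pos)
    pos += 4
    mancount, advcount = struct.unpack_from('>BB', data, pos)
    pos += 2
    total = mancount + advcount
    flat = struct.unpack_from('>' + 'BB' * total, data, pos)
    pos += 2 * total
    params = []
    for keysize, valuesize in zip(flat[::2], flat[1::2]):
        key = data[pos:pos + keysize]
        value = data[pos + keysize:pos + keysize + valuesize]
        pos += keysize + valuesize
        params.append((key, value))
    # payload: <chunksize><chunkdata> pairs until a zero size chunk
    chunks = []
    (chunksize,) = struct.unpack_from('>I', data, pos)
    pos += 4
    while chunksize:
        chunks.append(data[pos:pos + chunksize])
        pos += chunksize
        (chunksize,) = struct.unpack_from('>I', data, pos)
        pos += 4
    return {'type': parttype, 'id': partid,
            'mandatory': params[:mancount], 'advisory': params[mancount:],
            'payload': b''.join(chunks), 'consumed': pos}

raw = encode_part(b'test:Song', 7, [(b'k', b'vv')], [(b'adv', b'')], b'hello')
part = decode_part(raw)
```

Since every size field is a single byte, a key or value longer than 255 bytes cannot be represented in this layout; `struct.pack` raises an error in that case.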
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handlers are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the part type
134 134 contains any uppercase character it is considered mandatory. When no handler is
135 135 known for a mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable, but none of the parts read after the abort are processed. In the
139 139 future, dropping the stream may become an option for channels we do not care to
140 140 preserve.
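
The dispatch rules above can be illustrated with a small standalone sketch (Python 3, not the module's code). Parts are modelled as plain `(type, payload)` tuples, and `handlers`, `process` and `UnknownMandatoryPart` are hypothetical names chosen for this example.

```python
handlers = {}

def parthandler(parttype):
    # register a handler under the lower-cased part type
    def decorator(func):
        handlers[parttype.lower()] = func
        return func
    return decorator

class UnknownMandatoryPart(Exception):
    pass

@parthandler('test:echo')
def echo(payload):
    return payload.upper()

def process(parts):
    results = []
    pending = iter(parts)
    try:
        for parttype, payload in pending:
            key = parttype.lower()  # matching is case insensitive
            handler = handlers.get(key)
            if handler is None:
                if key != parttype:
                    # an uppercase character marks the part as mandatory
                    raise UnknownMandatoryPart(parttype)
                continue  # unknown advisory part: ignored
            results.append(handler(payload))
    except Exception:
        # keep the channel usable: read the remaining parts, process none
        for _ in pending:
            pass
        raise
    return results

results = process([('test:echo', 'a'), ('test:unknown', 'b'), ('test:ECHO', 'c')])
```

Draining the iterator in the `except` branch mirrors the requirement that the full bundle is still read after an abort.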
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147
148 148 import changegroup
149 149 from i18n import _
150 150
151 151 _pack = struct.pack
152 152 _unpack = struct.unpack
153 153
154 154 _magicstring = 'HG20'
155 155
156 156 _fstreamparamsize = '>H'
157 157 _fpartheadersize = '>H'
158 158 _fparttypesize = '>B'
159 159 _fpartid = '>I'
160 160 _fpayloadsize = '>I'
161 161 _fpartparamcount = '>BB'
162 162
163 163 preferedchunksize = 4096
164 164
165 165 def _makefpartparamsizes(nbparams):
166 166 """return a struct format to read part parameter sizes
167 167
168 168 The number of parameters is variable so we need to build that format
169 169 dynamically.
170 170 """
171 171 return '>'+('BB'*nbparams)
172 172
173 173 parthandlermapping = {}
174 174
175 175 def parthandler(parttype):
176 176 """decorator that registers a function as a bundle2 part handler
177 177
178 178 eg::
179 179
180 180 @parthandler('myparttype')
181 181 def myparttypehandler(...):
182 182 '''process a part of type "my part".'''
183 183 ...
184 184 """
185 185 def _decorator(func):
186 186 lparttype = parttype.lower() # enforce lower case matching.
187 187 assert lparttype not in parthandlermapping
188 188 parthandlermapping[lparttype] = func
189 189 return func
190 190 return _decorator
191 191
192 192 class unbundlerecords(object):
193 193 """keep record of what happens during an unbundle
194 194
195 195 New records are added using `records.add('cat', obj)`, where 'cat' is a
196 196 category of record and obj is an arbitrary object.
197 197
198 198 `records['cat']` will return all entries of this category 'cat'.
199 199
200 200 Iterating on the object itself will yield `('category', obj)` tuples
201 201 for all entries.
202 202
203 203 All iterations happen in chronological order.
204 204 """
205 205
206 206 def __init__(self):
207 207 self._categories = {}
208 208 self._sequences = []
209 209 self._replies = {}
210 210
211 211 def add(self, category, entry, inreplyto=None):
212 212 """add a new record of a given category.
213 213
214 214 The entry can then be retrieved in the list returned by
215 215 self['category']."""
216 216 self._categories.setdefault(category, []).append(entry)
217 217 self._sequences.append((category, entry))
218 218 if inreplyto is not None:
219 219 self.getreplies(inreplyto).add(category, entry)
220 220
221 221 def getreplies(self, partid):
222 222 """get the subrecords that reply to a specific part"""
223 223 return self._replies.setdefault(partid, unbundlerecords())
224 224
225 225 def __getitem__(self, cat):
226 226 return tuple(self._categories.get(cat, ()))
227 227
228 228 def __iter__(self):
229 229 return iter(self._sequences)
230 230
231 231 def __len__(self):
232 232 return len(self._sequences)
233 233
234 234 def __nonzero__(self):
235 235 return bool(self._sequences)
236 236
237 237 class bundleoperation(object):
238 238 """an object that represents a single bundling process
239 239
240 240 Its purpose is to carry unbundle-related objects and states.
241 241
242 242 A new object should be created at the beginning of each bundle processing.
243 243 The object is to be returned by the processing function.
244 244
245 245 The object has very little content now. It will ultimately contain:
246 246 * an access to the repo the bundle is applied to,
247 247 * a ui object,
248 248 * a way to retrieve a transaction to add changes to the repo,
249 249 * a way to record the result of processing each part,
250 250 * a way to construct a bundle response when applicable.
251 251 """
252 252
253 253 def __init__(self, repo, transactiongetter):
254 254 self.repo = repo
255 255 self.ui = repo.ui
256 256 self.records = unbundlerecords()
257 257 self.gettransaction = transactiongetter
258 258 self.reply = None
259 259
260 260 class TransactionUnavailable(RuntimeError):
261 261 pass
262 262
263 263 def _notransaction():
264 264 """default method to get a transaction while processing a bundle
265 265
266 266 Raise an exception to highlight the fact that no transaction was expected
267 267 to be created"""
268 268 raise TransactionUnavailable()
269 269
270 270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
271 271 """This function processes a bundle and applies its effects to/from a repo
272 272
273 273 It iterates over each part then searches for and uses the proper handling
274 274 code to process the part. Parts are processed in order.
275 275
276 276 This is a very early version of this function that will be strongly reworked
277 277 before final usage.
278 278
279 279 An unknown mandatory part will abort the process.
280 280 """
281 281 op = bundleoperation(repo, transactiongetter)
282 282 # todo:
283 283 # - only create reply bundle if requested.
284 284 op.reply = bundle20(op.ui)
285 285 # todo:
286 286 # - replace this with an init function soon.
287 287 # - exception catching
288 288 unbundler.params
289 289 iterparts = iter(unbundler)
290 290 part = None
291 291 try:
292 292 for part in iterparts:
293 293 parttype = part.type
294 294 # part keys are matched in lower case
295 295 key = parttype.lower()
296 296 try:
297 297 handler = parthandlermapping[key]
298 298 op.ui.debug('found a handler for part %r\n' % parttype)
299 299 except KeyError:
300 300 if key != parttype: # mandatory parts
301 301 # todo:
302 302 # - use a more precise exception
303 303 raise
304 304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 305 # consuming the part
306 306 part.read()
307 307 continue
308 308
309 309 # handler is called outside the above try block so that we don't
310 310 # risk catching KeyErrors from anything other than the
311 311 # parthandlermapping lookup (any KeyError raised by handler()
312 312 # itself represents a defect of a different variety).
313 313 handler(op, part)
314 314 part.read()
315 315 except Exception:
316 316 if part is not None:
317 317 # consume the bundle content
318 318 part.read()
319 319 for part in iterparts:
320 320 # consume the bundle content
321 321 part.read()
322 322 raise
323 323 return op
324 324
325 325 class bundle20(object):
326 326 """represent an outgoing bundle2 container
327 327
328 328 Use the `addparam` method to add stream level parameters and `addpart` to
329 329 populate it. Then call `getchunks` to retrieve all the binary chunks of
330 330 data that compose the bundle2 container."""
331 331
332 332 def __init__(self, ui):
333 333 self.ui = ui
334 334 self._params = []
335 335 self._parts = []
336 336
337 337 def addparam(self, name, value=None):
338 338 """add a stream level parameter"""
339 339 if not name:
340 340 raise ValueError('empty parameter name')
341 341 if name[0] not in string.letters:
342 342 raise ValueError('non letter first character: %r' % name)
343 343 self._params.append((name, value))
344 344
345 345 def addpart(self, part):
346 346 """add a new part to the bundle2 container
347 347
348 348 Parts contain the actual application payload."""
349 349 assert part.id is None
350 350 part.id = len(self._parts) # very cheap counter
351 351 self._parts.append(part)
352 352
353 353 def getchunks(self):
354 354 self.ui.debug('start emission of %s stream\n' % _magicstring)
355 355 yield _magicstring
356 356 param = self._paramchunk()
357 357 self.ui.debug('bundle parameter: %s\n' % param)
358 358 yield _pack(_fstreamparamsize, len(param))
359 359 if param:
360 360 yield param
361 361
362 362 self.ui.debug('start of parts\n')
363 363 for part in self._parts:
364 364 self.ui.debug('bundle part: "%s"\n' % part.type)
365 365 for chunk in part.getchunks():
366 366 yield chunk
367 367 self.ui.debug('end of bundle\n')
368 368 yield '\0\0'
369 369
370 370 def _paramchunk(self):
371 371 """return an encoded version of all stream parameters"""
372 372 blocks = []
373 373 for par, value in self._params:
374 374 par = urllib.quote(par)
375 375 if value is not None:
376 376 value = urllib.quote(value)
377 377 par = '%s=%s' % (par, value)
378 378 blocks.append(par)
379 379 return ' '.join(blocks)
380 380
381 381 class unpackermixin(object):
382 382 """A mixin to extract bytes and struct data from a stream"""
383 383
384 384 def __init__(self, fp):
385 385 self._fp = fp
386 386
387 387 def _unpack(self, format):
388 388 """unpack this struct format from the stream"""
389 389 data = self._readexact(struct.calcsize(format))
390 390 return _unpack(format, data)
391 391
392 392 def _readexact(self, size):
393 393 """read exactly <size> bytes from the stream"""
394 394 return changegroup.readexactly(self._fp, size)
395 395
396 396
397 397 class unbundle20(unpackermixin):
398 398 """interpret a bundle2 stream
399 399
400 400 (this will eventually yield parts)"""
401 401
402 - def __init__(self, ui, fp):
402 + def __init__(self, ui, fp, header=None):
403 + """If header is specified, we do not read it out of the stream."""
403 404 self.ui = ui
404 405 super(unbundle20, self).__init__(fp)
406 + if header is None:
405 407 header = self._readexact(4)
406 408 magic, version = header[0:2], header[2:4]
407 409 if magic != 'HG':
408 410 raise util.Abort(_('not a Mercurial bundle'))
409 411 if version != '20':
410 412 raise util.Abort(_('unknown bundle version %s') % version)
411 413 self.ui.debug('start processing of %s stream\n' % header)
412 414
413 415 @util.propertycache
414 416 def params(self):
415 417 """dictionary of stream level parameters"""
416 418 self.ui.debug('reading bundle2 stream parameters\n')
417 419 params = {}
418 420 paramssize = self._unpack(_fstreamparamsize)[0]
419 421 if paramssize:
420 422 for p in self._readexact(paramssize).split(' '):
421 423 p = p.split('=', 1)
422 424 p = [urllib.unquote(i) for i in p]
423 425 if len(p) < 2:
424 426 p.append(None)
425 427 self._processparam(*p)
426 428 params[p[0]] = p[1]
427 429 return params
428 430
429 431 def _processparam(self, name, value):
430 432 """process a parameter, applying its effect if needed
431 433
432 434 Parameters starting with a lower case letter are advisory and will be
433 435 ignored when unknown. Those starting with an upper case letter are
434 436 mandatory and this function will raise a KeyError when they are unknown.
435 437
436 438 Note: no options are currently supported. Any input will either be
437 439 ignored or fail.
438 440 """
439 441 if not name:
440 442 raise ValueError('empty parameter name')
441 443 if name[0] not in string.letters:
442 444 raise ValueError('non letter first character: %r' % name)
443 445 # Some logic will later be added here to try to process the option from
444 446 # a dict of known parameters.
445 447 if name[0].islower():
446 448 self.ui.debug("ignoring unknown parameter %r\n" % name)
447 449 else:
448 450 raise KeyError(name)
449 451
450 452
451 453 def __iter__(self):
452 454 """yield all parts contained in the stream"""
453 455 # make sure params have been loaded
454 456 self.params
455 457 self.ui.debug('start extraction of bundle2 parts\n')
456 458 headerblock = self._readpartheader()
457 459 while headerblock is not None:
458 460 part = unbundlepart(self.ui, headerblock, self._fp)
459 461 yield part
460 462 headerblock = self._readpartheader()
461 463 self.ui.debug('end of bundle2 stream\n')
462 464
463 465 def _readpartheader(self):
464 466 """read a part header size and return the header as a bytes blob
465 467
466 468 returns None if empty"""
467 469 headersize = self._unpack(_fpartheadersize)[0]
468 470 self.ui.debug('part header size: %i\n' % headersize)
469 471 if headersize:
470 472 return self._readexact(headersize)
471 473 return None
472 474
473 475
474 476 class bundlepart(object):
475 477 """A bundle2 part contains application level payload
476 478
477 479 The part `type` is used to route the part to the application level
478 480 handler.
479 481 """
480 482
481 483 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
482 484 data=''):
483 485 self.id = None
484 486 self.type = parttype
485 487 self.data = data
486 488 self.mandatoryparams = mandatoryparams
487 489 self.advisoryparams = advisoryparams
488 490
489 491 def getchunks(self):
490 492 #### header
491 493 ## parttype
492 494 header = [_pack(_fparttypesize, len(self.type)),
493 495 self.type, _pack(_fpartid, self.id),
494 496 ]
495 497 ## parameters
496 498 # count
497 499 manpar = self.mandatoryparams
498 500 advpar = self.advisoryparams
499 501 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
500 502 # size
501 503 parsizes = []
502 504 for key, value in manpar:
503 505 parsizes.append(len(key))
504 506 parsizes.append(len(value))
505 507 for key, value in advpar:
506 508 parsizes.append(len(key))
507 509 parsizes.append(len(value))
508 510 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
509 511 header.append(paramsizes)
510 512 # key, value
511 513 for key, value in manpar:
512 514 header.append(key)
513 515 header.append(value)
514 516 for key, value in advpar:
515 517 header.append(key)
516 518 header.append(value)
517 519 ## finalize header
518 520 headerchunk = ''.join(header)
519 521 yield _pack(_fpartheadersize, len(headerchunk))
520 522 yield headerchunk
521 523 ## payload
522 524 for chunk in self._payloadchunks():
523 525 yield _pack(_fpayloadsize, len(chunk))
524 526 yield chunk
525 527 # end of payload
526 528 yield _pack(_fpayloadsize, 0)
527 529
528 530 def _payloadchunks(self):
529 531 """yield chunks of the part payload
530 532
531 533 Exists to handle the different methods to provide data to a part."""
532 534 # we only support fixed size data now.
533 535 # This will be improved in the future.
534 536 if util.safehasattr(self.data, 'next'):
535 537 buff = util.chunkbuffer(self.data)
536 538 chunk = buff.read(preferedchunksize)
537 539 while chunk:
538 540 yield chunk
539 541 chunk = buff.read(preferedchunksize)
540 542 elif len(self.data):
541 543 yield self.data
542 544
543 545 class unbundlepart(unpackermixin):
544 546 """a bundle part read from a bundle"""
545 547
546 548 def __init__(self, ui, header, fp):
547 549 super(unbundlepart, self).__init__(fp)
548 550 self.ui = ui
549 551 # unbundle state attr
550 552 self._headerdata = header
551 553 self._headeroffset = 0
552 554 self._initialized = False
553 555 self.consumed = False
554 556 # part data
555 557 self.id = None
556 558 self.type = None
557 559 self.mandatoryparams = None
558 560 self.advisoryparams = None
559 561 self._payloadstream = None
560 562 self._readheader()
561 563
562 564 def _fromheader(self, size):
563 565 """return the next <size> bytes from the header"""
564 566 offset = self._headeroffset
565 567 data = self._headerdata[offset:(offset + size)]
566 568 self._headeroffset = offset + size
567 569 return data
568 570
569 571 def _unpackheader(self, format):
570 572 """read given format from header
571 573
572 574 This automatically computes the size of the format to read."""
573 575 data = self._fromheader(struct.calcsize(format))
574 576 return _unpack(format, data)
575 577
576 578 def _readheader(self):
577 579 """read the header and setup the object"""
578 580 typesize = self._unpackheader(_fparttypesize)[0]
579 581 self.type = self._fromheader(typesize)
580 582 self.ui.debug('part type: "%s"\n' % self.type)
581 583 self.id = self._unpackheader(_fpartid)[0]
582 584 self.ui.debug('part id: "%s"\n' % self.id)
583 585 ## reading parameters
584 586 # param count
585 587 mancount, advcount = self._unpackheader(_fpartparamcount)
586 588 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
587 589 # param size
588 590 fparamsizes = _makefpartparamsizes(mancount + advcount)
589 591 paramsizes = self._unpackheader(fparamsizes)
590 592 # make it a list of pairs again
591 593 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
592 594 # split mandatory from advisory
593 595 mansizes = paramsizes[:mancount]
594 596 advsizes = paramsizes[mancount:]
595 597 # retrieve param values
596 598 manparams = []
597 599 for key, value in mansizes:
598 600 manparams.append((self._fromheader(key), self._fromheader(value)))
599 601 advparams = []
600 602 for key, value in advsizes:
601 603 advparams.append((self._fromheader(key), self._fromheader(value)))
602 604 self.mandatoryparams = manparams
603 605 self.advisoryparams = advparams
604 606 ## part payload
605 607 def payloadchunks():
606 608 payloadsize = self._unpack(_fpayloadsize)[0]
607 609 self.ui.debug('payload chunk size: %i\n' % payloadsize)
608 610 while payloadsize:
609 611 yield self._readexact(payloadsize)
610 612 payloadsize = self._unpack(_fpayloadsize)[0]
611 613 self.ui.debug('payload chunk size: %i\n' % payloadsize)
612 614 self._payloadstream = util.chunkbuffer(payloadchunks())
613 615 # we read the data, tell it
614 616 self._initialized = True
615 617
616 618 def read(self, size=None):
617 619 """read payload data"""
618 620 if not self._initialized:
619 621 self._readheader()
620 622 if size is None:
621 623 data = self._payloadstream.read()
622 624 else:
623 625 data = self._payloadstream.read(size)
624 626 if size is None or len(data) < size:
625 627 self.consumed = True
626 628 return data
627 629
628 630
629 631 @parthandler('changegroup')
630 632 def handlechangegroup(op, inpart):
631 633 """apply a changegroup part on the repo
632 634
633 635 This is a very early implementation that will need massive rework before being
634 636 inflicted on any end-user.
635 637 """
636 638 # Make sure we trigger a transaction creation
637 639 #
638 640 # The addchangegroup function will get a transaction object by itself, but
639 641 # we need to make sure we trigger the creation of a transaction object used
640 642 # for the whole processing scope.
641 643 op.gettransaction()
642 644 cg = changegroup.unbundle10(inpart, 'UN')
643 645 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
644 646 op.records.add('changegroup', {'return': ret})
645 647 if op.reply is not None:
646 648 # This is definitely not the final form of this
647 649 # return. But one needs to start somewhere.
648 650 part = bundlepart('reply:changegroup', (),
649 651 [('in-reply-to', str(inpart.id)),
650 652 ('return', '%i' % ret)])
651 653 op.reply.addpart(part)
652 654 assert not inpart.read()
653 655
654 656 @parthandler('reply:changegroup')
655 657 def handlereplychangegroup(op, inpart):
656 658 p = dict(inpart.advisoryparams)
657 659 ret = int(p['return'])
658 660 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
659 661
660 662 @parthandler('check:heads')
661 663 def handlecheckheads(op, inpart):
662 664 """check that the heads of the repo did not change
663 665
664 666 This is used to detect a push race when using unbundle.
665 667 This replaces the "heads" argument of unbundle."""
666 668 h = inpart.read(20)
667 669 heads = []
668 670 while len(h) == 20:
669 671 heads.append(h)
670 672 h = inpart.read(20)
671 673 assert not h
672 674 if heads != op.repo.heads():
673 675 raise exchange.PushRaced()
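
The effect of the new `header` argument can be sketched with a caller that has already consumed the four magic bytes in order to decide which reader to use. This is a hedged standalone illustration for Python 3. `BundleReader` is a simplified stand-in for `unbundle20`, and `open_bundle` is a hypothetical dispatcher that does not appear in this changeset.

```python
import io

class BundleReader(object):
    # simplified stand-in for unbundle20; only the header handling is modelled
    def __init__(self, fp, header=None):
        self._fp = fp
        if header is None:
            # nobody has consumed the magic string yet: read it ourselves
            header = fp.read(4)
        magic, version = header[0:2], header[2:4]
        if magic != b'HG':
            raise ValueError('not a Mercurial bundle')
        if version != b'20':
            raise ValueError('unknown bundle version %r' % version)
        self.header = header

def open_bundle(fp):
    # hypothetical dispatcher: sniff the header once, then hand it over
    header = fp.read(4)
    if header == b'HG20':
        return BundleReader(fp, header=header)
    raise ValueError('unsupported bundle header %r' % header)

stream = io.BytesIO(b'HG20\x00\x00\x00\x00')
reader = open_bundle(stream)
```

Passing the header along avoids having to seek backwards, which is not possible on a network stream.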