##// END OF EJS Templates
bundle2: directly feed part to readbundle...
Pierre-Yves David -
r21020:5041163e default
parent child Browse files
Show More
@@ -1,662 +1,659 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. parameter with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safefly ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remains simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a the bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :typename: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitraty content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimatly be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handler are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the part read from an abort are processed. In the
139 139 future, dropping the stream may become an option for channel we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 import StringIO
148 147
149 148 import changegroup
150 149 from i18n import _
151 150
152 151 _pack = struct.pack
153 152 _unpack = struct.unpack
154 153
155 154 _magicstring = 'HG20'
156 155
157 156 _fstreamparamsize = '>H'
158 157 _fpartheadersize = '>H'
159 158 _fparttypesize = '>B'
160 159 _fpartid = '>I'
161 160 _fpayloadsize = '>I'
162 161 _fpartparamcount = '>BB'
163 162
164 163 preferedchunksize = 4096
165 164
166 165 def _makefpartparamsizes(nbparams):
167 166 """return a struct format to read part parameter sizes
168 167
169 168 The number parameters is variable so we need to build that format
170 169 dynamically.
171 170 """
172 171 return '>'+('BB'*nbparams)
173 172
174 173 parthandlermapping = {}
175 174
176 175 def parthandler(parttype):
177 176 """decorator that register a function as a bundle2 part handler
178 177
179 178 eg::
180 179
181 180 @parthandler('myparttype')
182 181 def myparttypehandler(...):
183 182 '''process a part of type "my part".'''
184 183 ...
185 184 """
186 185 def _decorator(func):
187 186 lparttype = parttype.lower() # enforce lower case matching.
188 187 assert lparttype not in parthandlermapping
189 188 parthandlermapping[lparttype] = func
190 189 return func
191 190 return _decorator
192 191
193 192 class unbundlerecords(object):
194 193 """keep record of what happens during and unbundle
195 194
196 195 New records are added using `records.add('cat', obj)`. Where 'cat' is a
197 196 category of record and obj is an arbitraty object.
198 197
199 198 `records['cat']` will return all entries of this category 'cat'.
200 199
201 200 Iterating on the object itself will yield `('category', obj)` tuples
202 201 for all entries.
203 202
204 203 All iterations happens in chronological order.
205 204 """
206 205
207 206 def __init__(self):
208 207 self._categories = {}
209 208 self._sequences = []
210 209 self._replies = {}
211 210
212 211 def add(self, category, entry, inreplyto=None):
213 212 """add a new record of a given category.
214 213
215 214 The entry can then be retrieved in the list returned by
216 215 self['category']."""
217 216 self._categories.setdefault(category, []).append(entry)
218 217 self._sequences.append((category, entry))
219 218 if inreplyto is not None:
220 219 self.getreplies(inreplyto).add(category, entry)
221 220
222 221 def getreplies(self, partid):
223 222 """get the subrecords that replies to a specific part"""
224 223 return self._replies.setdefault(partid, unbundlerecords())
225 224
226 225 def __getitem__(self, cat):
227 226 return tuple(self._categories.get(cat, ()))
228 227
229 228 def __iter__(self):
230 229 return iter(self._sequences)
231 230
232 231 def __len__(self):
233 232 return len(self._sequences)
234 233
235 234 def __nonzero__(self):
236 235 return bool(self._sequences)
237 236
238 237 class bundleoperation(object):
239 238 """an object that represents a single bundling process
240 239
241 240 Its purpose is to carry unbundle-related objects and states.
242 241
243 242 A new object should be created at the beginning of each bundle processing.
244 243 The object is to be returned by the processing function.
245 244
246 245 The object has very little content now it will ultimately contain:
247 246 * an access to the repo the bundle is applied to,
248 247 * a ui object,
249 248 * a way to retrieve a transaction to add changes to the repo,
250 249 * a way to record the result of processing each part,
251 250 * a way to construct a bundle response when applicable.
252 251 """
253 252
254 253 def __init__(self, repo, transactiongetter):
255 254 self.repo = repo
256 255 self.ui = repo.ui
257 256 self.records = unbundlerecords()
258 257 self.gettransaction = transactiongetter
259 258 self.reply = None
260 259
261 260 class TransactionUnavailable(RuntimeError):
262 261 pass
263 262
264 263 def _notransaction():
265 264 """default method to get a transaction while processing a bundle
266 265
267 266 Raise an exception to highlight the fact that no transaction was expected
268 267 to be created"""
269 268 raise TransactionUnavailable()
270 269
271 270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
272 271 """This function process a bundle, apply effect to/from a repo
273 272
274 273 It iterates over each part then searches for and uses the proper handling
275 274 code to process the part. Parts are processed in order.
276 275
277 276 This is very early version of this function that will be strongly reworked
278 277 before final usage.
279 278
280 279 Unknown Mandatory part will abort the process.
281 280 """
282 281 op = bundleoperation(repo, transactiongetter)
283 282 # todo:
284 283 # - only create reply bundle if requested.
285 284 op.reply = bundle20(op.ui)
286 285 # todo:
287 286 # - replace this is a init function soon.
288 287 # - exception catching
289 288 unbundler.params
290 289 iterparts = iter(unbundler)
291 290 part = None
292 291 try:
293 292 for part in iterparts:
294 293 parttype = part.type
295 294 # part key are matched lower case
296 295 key = parttype.lower()
297 296 try:
298 297 handler = parthandlermapping[key]
299 298 op.ui.debug('found a handler for part %r\n' % parttype)
300 299 except KeyError:
301 300 if key != parttype: # mandatory parts
302 301 # todo:
303 302 # - use a more precise exception
304 303 raise
305 304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 305 # consuming the part
307 306 part.read()
308 307 continue
309 308
310 309 # handler is called outside the above try block so that we don't
311 310 # risk catching KeyErrors from anything other than the
312 311 # parthandlermapping lookup (any KeyError raised by handler()
313 312 # itself represents a defect of a different variety).
314 313 handler(op, part)
315 314 part.read()
316 315 except Exception:
317 316 if part is not None:
318 317 # consume the bundle content
319 318 part.read()
320 319 for part in iterparts:
321 320 # consume the bundle content
322 321 part.read()
323 322 raise
324 323 return op
325 324
326 325 class bundle20(object):
327 326 """represent an outgoing bundle2 container
328 327
329 328 Use the `addparam` method to add stream level parameter. and `addpart` to
330 329 populate it. Then call `getchunks` to retrieve all the binary chunks of
331 330 datathat compose the bundle2 container."""
332 331
333 332 def __init__(self, ui):
334 333 self.ui = ui
335 334 self._params = []
336 335 self._parts = []
337 336
338 337 def addparam(self, name, value=None):
339 338 """add a stream level parameter"""
340 339 if not name:
341 340 raise ValueError('empty parameter name')
342 341 if name[0] not in string.letters:
343 342 raise ValueError('non letter first character: %r' % name)
344 343 self._params.append((name, value))
345 344
346 345 def addpart(self, part):
347 346 """add a new part to the bundle2 container
348 347
349 348 Parts contains the actuall applicative payload."""
350 349 assert part.id is None
351 350 part.id = len(self._parts) # very cheap counter
352 351 self._parts.append(part)
353 352
354 353 def getchunks(self):
355 354 self.ui.debug('start emission of %s stream\n' % _magicstring)
356 355 yield _magicstring
357 356 param = self._paramchunk()
358 357 self.ui.debug('bundle parameter: %s\n' % param)
359 358 yield _pack(_fstreamparamsize, len(param))
360 359 if param:
361 360 yield param
362 361
363 362 self.ui.debug('start of parts\n')
364 363 for part in self._parts:
365 364 self.ui.debug('bundle part: "%s"\n' % part.type)
366 365 for chunk in part.getchunks():
367 366 yield chunk
368 367 self.ui.debug('end of bundle\n')
369 368 yield '\0\0'
370 369
371 370 def _paramchunk(self):
372 371 """return a encoded version of all stream parameters"""
373 372 blocks = []
374 373 for par, value in self._params:
375 374 par = urllib.quote(par)
376 375 if value is not None:
377 376 value = urllib.quote(value)
378 377 par = '%s=%s' % (par, value)
379 378 blocks.append(par)
380 379 return ' '.join(blocks)
381 380
382 381 class unpackermixin(object):
383 382 """A mixin to extract bytes and struct data from a stream"""
384 383
385 384 def __init__(self, fp):
386 385 self._fp = fp
387 386
388 387 def _unpack(self, format):
389 388 """unpack this struct format from the stream"""
390 389 data = self._readexact(struct.calcsize(format))
391 390 return _unpack(format, data)
392 391
393 392 def _readexact(self, size):
394 393 """read exactly <size> bytes from the stream"""
395 394 return changegroup.readexactly(self._fp, size)
396 395
397 396
398 397 class unbundle20(unpackermixin):
399 398 """interpret a bundle2 stream
400 399
401 400 (this will eventually yield parts)"""
402 401
403 402 def __init__(self, ui, fp):
404 403 self.ui = ui
405 404 super(unbundle20, self).__init__(fp)
406 405 header = self._readexact(4)
407 406 magic, version = header[0:2], header[2:4]
408 407 if magic != 'HG':
409 408 raise util.Abort(_('not a Mercurial bundle'))
410 409 if version != '20':
411 410 raise util.Abort(_('unknown bundle version %s') % version)
412 411 self.ui.debug('start processing of %s stream\n' % header)
413 412
414 413 @util.propertycache
415 414 def params(self):
416 415 """dictionnary of stream level parameters"""
417 416 self.ui.debug('reading bundle2 stream parameters\n')
418 417 params = {}
419 418 paramssize = self._unpack(_fstreamparamsize)[0]
420 419 if paramssize:
421 420 for p in self._readexact(paramssize).split(' '):
422 421 p = p.split('=', 1)
423 422 p = [urllib.unquote(i) for i in p]
424 423 if len(p) < 2:
425 424 p.append(None)
426 425 self._processparam(*p)
427 426 params[p[0]] = p[1]
428 427 return params
429 428
430 429 def _processparam(self, name, value):
431 430 """process a parameter, applying its effect if needed
432 431
433 432 Parameter starting with a lower case letter are advisory and will be
434 433 ignored when unknown. Those starting with an upper case letter are
435 434 mandatory and will this function will raise a KeyError when unknown.
436 435
437 436 Note: no option are currently supported. Any input will be either
438 437 ignored or failing.
439 438 """
440 439 if not name:
441 440 raise ValueError('empty parameter name')
442 441 if name[0] not in string.letters:
443 442 raise ValueError('non letter first character: %r' % name)
444 443 # Some logic will be later added here to try to process the option for
445 444 # a dict of known parameter.
446 445 if name[0].islower():
447 446 self.ui.debug("ignoring unknown parameter %r\n" % name)
448 447 else:
449 448 raise KeyError(name)
450 449
451 450
452 451 def __iter__(self):
453 452 """yield all parts contained in the stream"""
454 453 # make sure param have been loaded
455 454 self.params
456 455 self.ui.debug('start extraction of bundle2 parts\n')
457 456 headerblock = self._readpartheader()
458 457 while headerblock is not None:
459 458 part = unbundlepart(self.ui, headerblock, self._fp)
460 459 yield part
461 460 headerblock = self._readpartheader()
462 461 self.ui.debug('end of bundle2 stream\n')
463 462
464 463 def _readpartheader(self):
465 464 """reads a part header size and return the bytes blob
466 465
467 466 returns None if empty"""
468 467 headersize = self._unpack(_fpartheadersize)[0]
469 468 self.ui.debug('part header size: %i\n' % headersize)
470 469 if headersize:
471 470 return self._readexact(headersize)
472 471 return None
473 472
474 473
475 474 class bundlepart(object):
476 475 """A bundle2 part contains application level payload
477 476
478 477 The part `type` is used to route the part to the application level
479 478 handler.
480 479 """
481 480
482 481 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
483 482 data=''):
484 483 self.id = None
485 484 self.type = parttype
486 485 self.data = data
487 486 self.mandatoryparams = mandatoryparams
488 487 self.advisoryparams = advisoryparams
489 488
490 489 def getchunks(self):
491 490 #### header
492 491 ## parttype
493 492 header = [_pack(_fparttypesize, len(self.type)),
494 493 self.type, _pack(_fpartid, self.id),
495 494 ]
496 495 ## parameters
497 496 # count
498 497 manpar = self.mandatoryparams
499 498 advpar = self.advisoryparams
500 499 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
501 500 # size
502 501 parsizes = []
503 502 for key, value in manpar:
504 503 parsizes.append(len(key))
505 504 parsizes.append(len(value))
506 505 for key, value in advpar:
507 506 parsizes.append(len(key))
508 507 parsizes.append(len(value))
509 508 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
510 509 header.append(paramsizes)
511 510 # key, value
512 511 for key, value in manpar:
513 512 header.append(key)
514 513 header.append(value)
515 514 for key, value in advpar:
516 515 header.append(key)
517 516 header.append(value)
518 517 ## finalize header
519 518 headerchunk = ''.join(header)
520 519 yield _pack(_fpartheadersize, len(headerchunk))
521 520 yield headerchunk
522 521 ## payload
523 522 for chunk in self._payloadchunks():
524 523 yield _pack(_fpayloadsize, len(chunk))
525 524 yield chunk
526 525 # end of payload
527 526 yield _pack(_fpayloadsize, 0)
528 527
529 528 def _payloadchunks(self):
530 529 """yield chunks of a the part payload
531 530
532 531 Exists to handle the different methods to provide data to a part."""
533 532 # we only support fixed size data now.
534 533 # This will be improved in the future.
535 534 if util.safehasattr(self.data, 'next'):
536 535 buff = util.chunkbuffer(self.data)
537 536 chunk = buff.read(preferedchunksize)
538 537 while chunk:
539 538 yield chunk
540 539 chunk = buff.read(preferedchunksize)
541 540 elif len(self.data):
542 541 yield self.data
543 542
544 543 class unbundlepart(unpackermixin):
545 544 """a bundle part read from a bundle"""
546 545
547 546 def __init__(self, ui, header, fp):
548 547 super(unbundlepart, self).__init__(fp)
549 548 self.ui = ui
550 549 # unbundle state attr
551 550 self._headerdata = header
552 551 self._headeroffset = 0
553 552 self._initialized = False
554 553 self.consumed = False
555 554 # part data
556 555 self.id = None
557 556 self.type = None
558 557 self.mandatoryparams = None
559 558 self.advisoryparams = None
560 559 self._payloadstream = None
561 560 self._readheader()
562 561
563 562 def _fromheader(self, size):
564 563 """return the next <size> byte from the header"""
565 564 offset = self._headeroffset
566 565 data = self._headerdata[offset:(offset + size)]
567 566 self._headeroffset = offset + size
568 567 return data
569 568
570 569 def _unpackheader(self, format):
571 570 """read given format from header
572 571
573 572 This automatically compute the size of the format to read."""
574 573 data = self._fromheader(struct.calcsize(format))
575 574 return _unpack(format, data)
576 575
577 576 def _readheader(self):
578 577 """read the header and setup the object"""
579 578 typesize = self._unpackheader(_fparttypesize)[0]
580 579 self.type = self._fromheader(typesize)
581 580 self.ui.debug('part type: "%s"\n' % self.type)
582 581 self.id = self._unpackheader(_fpartid)[0]
583 582 self.ui.debug('part id: "%s"\n' % self.id)
584 583 ## reading parameters
585 584 # param count
586 585 mancount, advcount = self._unpackheader(_fpartparamcount)
587 586 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
588 587 # param size
589 588 fparamsizes = _makefpartparamsizes(mancount + advcount)
590 589 paramsizes = self._unpackheader(fparamsizes)
591 590 # make it a list of couple again
592 591 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
593 592 # split mandatory from advisory
594 593 mansizes = paramsizes[:mancount]
595 594 advsizes = paramsizes[mancount:]
596 595 # retrive param value
597 596 manparams = []
598 597 for key, value in mansizes:
599 598 manparams.append((self._fromheader(key), self._fromheader(value)))
600 599 advparams = []
601 600 for key, value in advsizes:
602 601 advparams.append((self._fromheader(key), self._fromheader(value)))
603 602 self.mandatoryparams = manparams
604 603 self.advisoryparams = advparams
605 604 ## part payload
606 605 def payloadchunks():
607 606 payloadsize = self._unpack(_fpayloadsize)[0]
608 607 self.ui.debug('payload chunk size: %i\n' % payloadsize)
609 608 while payloadsize:
610 609 yield self._readexact(payloadsize)
611 610 payloadsize = self._unpack(_fpayloadsize)[0]
612 611 self.ui.debug('payload chunk size: %i\n' % payloadsize)
613 612 self._payloadstream = util.chunkbuffer(payloadchunks())
614 613 # we read the data, tell it
615 614 self._initialized = True
616 615
617 616 def read(self, size=None):
618 617 """read payload data"""
619 618 if not self._initialized:
620 619 self._readheader()
621 620 if size is None:
622 621 data = self._payloadstream.read()
623 622 else:
624 623 data = self._payloadstream.read(size)
625 624 if size is None or len(data) < size:
626 625 self.consumed = True
627 626 return data
628 627
629 628
630 629 @parthandler('changegroup')
631 630 def handlechangegroup(op, inpart):
632 631 """apply a changegroup part on the repo
633 632
634 633 This is a very early implementation that will massive rework before being
635 634 inflicted to any end-user.
636 635 """
637 636 # Make sure we trigger a transaction creation
638 637 #
639 638 # The addchangegroup function will get a transaction object by itself, but
640 639 # we need to make sure we trigger the creation of a transaction object used
641 640 # for the whole processing scope.
642 641 op.gettransaction()
643 data = StringIO.StringIO(inpart.read())
644 data.seek(0)
645 cg = changegroup.readbundle(data, 'bundle2part')
642 cg = changegroup.readbundle(inpart, 'bundle2part')
646 643 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
647 644 op.records.add('changegroup', {'return': ret})
648 645 if op.reply is not None:
649 646 # This is definitly not the final form of this
650 647 # return. But one need to start somewhere.
651 648 part = bundlepart('reply:changegroup', (),
652 649 [('in-reply-to', str(inpart.id)),
653 650 ('return', '%i' % ret)])
654 651 op.reply.addpart(part)
655 652 assert not inpart.read()
656 653
657 654 @parthandler('reply:changegroup')
658 655 def handlechangegroup(op, inpart):
659 656 p = dict(inpart.advisoryparams)
660 657 ret = int(p['return'])
661 658 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
662 659
General Comments 0
You need to be logged in to leave comments. Login now