##// END OF EJS Templates
bundle2: extract a _payloadchunks method for part...
Pierre-Yves David -
r21000:4cae06ae default
parent child Browse files
Show More
@@ -1,590 +1,597 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. parameter with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safefly ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remains simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a the bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :typename: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitraty content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimatly be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handler are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the part read from an abort are processed. In the
139 139 future, dropping the stream may become an option for channel we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147 import StringIO
148 148
149 149 import changegroup
150 150 from i18n import _
151 151
152 152 _pack = struct.pack
153 153 _unpack = struct.unpack
154 154
155 155 _magicstring = 'HG20'
156 156
157 157 _fstreamparamsize = '>H'
158 158 _fpartheadersize = '>H'
159 159 _fparttypesize = '>B'
160 160 _fpartid = '>I'
161 161 _fpayloadsize = '>I'
162 162 _fpartparamcount = '>BB'
163 163
164 164 def _makefpartparamsizes(nbparams):
165 165 """return a struct format to read part parameter sizes
166 166
167 167 The number parameters is variable so we need to build that format
168 168 dynamically.
169 169 """
170 170 return '>'+('BB'*nbparams)
171 171
172 172 parthandlermapping = {}
173 173
174 174 def parthandler(parttype):
175 175 """decorator that register a function as a bundle2 part handler
176 176
177 177 eg::
178 178
179 179 @parthandler('myparttype')
180 180 def myparttypehandler(...):
181 181 '''process a part of type "my part".'''
182 182 ...
183 183 """
184 184 def _decorator(func):
185 185 lparttype = parttype.lower() # enforce lower case matching.
186 186 assert lparttype not in parthandlermapping
187 187 parthandlermapping[lparttype] = func
188 188 return func
189 189 return _decorator
190 190
191 191 class unbundlerecords(object):
192 192 """keep record of what happens during and unbundle
193 193
194 194 New records are added using `records.add('cat', obj)`. Where 'cat' is a
195 195 category of record and obj is an arbitraty object.
196 196
197 197 `records['cat']` will return all entries of this category 'cat'.
198 198
199 199 Iterating on the object itself will yield `('category', obj)` tuples
200 200 for all entries.
201 201
202 202 All iterations happens in chronological order.
203 203 """
204 204
205 205 def __init__(self):
206 206 self._categories = {}
207 207 self._sequences = []
208 208 self._replies = {}
209 209
210 210 def add(self, category, entry, inreplyto=None):
211 211 """add a new record of a given category.
212 212
213 213 The entry can then be retrieved in the list returned by
214 214 self['category']."""
215 215 self._categories.setdefault(category, []).append(entry)
216 216 self._sequences.append((category, entry))
217 217 if inreplyto is not None:
218 218 self.getreplies(inreplyto).add(category, entry)
219 219
220 220 def getreplies(self, partid):
221 221 """get the subrecords that replies to a specific part"""
222 222 return self._replies.setdefault(partid, unbundlerecords())
223 223
224 224 def __getitem__(self, cat):
225 225 return tuple(self._categories.get(cat, ()))
226 226
227 227 def __iter__(self):
228 228 return iter(self._sequences)
229 229
230 230 def __len__(self):
231 231 return len(self._sequences)
232 232
233 233 def __nonzero__(self):
234 234 return bool(self._sequences)
235 235
236 236 class bundleoperation(object):
237 237 """an object that represents a single bundling process
238 238
239 239 Its purpose is to carry unbundle-related objects and states.
240 240
241 241 A new object should be created at the beginning of each bundle processing.
242 242 The object is to be returned by the processing function.
243 243
244 244 The object has very little content now it will ultimately contain:
245 245 * an access to the repo the bundle is applied to,
246 246 * a ui object,
247 247 * a way to retrieve a transaction to add changes to the repo,
248 248 * a way to record the result of processing each part,
249 249 * a way to construct a bundle response when applicable.
250 250 """
251 251
252 252 def __init__(self, repo, transactiongetter):
253 253 self.repo = repo
254 254 self.ui = repo.ui
255 255 self.records = unbundlerecords()
256 256 self.gettransaction = transactiongetter
257 257 self.reply = None
258 258
259 259 class TransactionUnavailable(RuntimeError):
260 260 pass
261 261
262 262 def _notransaction():
263 263 """default method to get a transaction while processing a bundle
264 264
265 265 Raise an exception to highlight the fact that no transaction was expected
266 266 to be created"""
267 267 raise TransactionUnavailable()
268 268
269 269 def processbundle(repo, unbundler, transactiongetter=_notransaction):
270 270 """This function process a bundle, apply effect to/from a repo
271 271
272 272 It iterates over each part then searches for and uses the proper handling
273 273 code to process the part. Parts are processed in order.
274 274
275 275 This is very early version of this function that will be strongly reworked
276 276 before final usage.
277 277
278 278 Unknown Mandatory part will abort the process.
279 279 """
280 280 op = bundleoperation(repo, transactiongetter)
281 281 # todo:
282 282 # - only create reply bundle if requested.
283 283 op.reply = bundle20(op.ui)
284 284 # todo:
285 285 # - replace this is a init function soon.
286 286 # - exception catching
287 287 unbundler.params
288 288 iterparts = iter(unbundler)
289 289 try:
290 290 for part in iterparts:
291 291 parttype = part.type
292 292 # part key are matched lower case
293 293 key = parttype.lower()
294 294 try:
295 295 handler = parthandlermapping[key]
296 296 op.ui.debug('found a handler for part %r\n' % parttype)
297 297 except KeyError:
298 298 if key != parttype: # mandatory parts
299 299 # todo:
300 300 # - use a more precise exception
301 301 raise
302 302 op.ui.debug('ignoring unknown advisory part %r\n' % key)
303 303 # todo:
304 304 # - consume the part once we use streaming
305 305 continue
306 306 handler(op, part)
307 307 except Exception:
308 308 for part in iterparts:
309 309 pass # consume the bundle content
310 310 raise
311 311 return op
312 312
313 313 class bundle20(object):
314 314 """represent an outgoing bundle2 container
315 315
316 316 Use the `addparam` method to add stream level parameter. and `addpart` to
317 317 populate it. Then call `getchunks` to retrieve all the binary chunks of
318 318 datathat compose the bundle2 container."""
319 319
320 320 def __init__(self, ui):
321 321 self.ui = ui
322 322 self._params = []
323 323 self._parts = []
324 324
325 325 def addparam(self, name, value=None):
326 326 """add a stream level parameter"""
327 327 if not name:
328 328 raise ValueError('empty parameter name')
329 329 if name[0] not in string.letters:
330 330 raise ValueError('non letter first character: %r' % name)
331 331 self._params.append((name, value))
332 332
333 333 def addpart(self, part):
334 334 """add a new part to the bundle2 container
335 335
336 336 Parts contains the actuall applicative payload."""
337 337 assert part.id is None
338 338 part.id = len(self._parts) # very cheap counter
339 339 self._parts.append(part)
340 340
341 341 def getchunks(self):
342 342 self.ui.debug('start emission of %s stream\n' % _magicstring)
343 343 yield _magicstring
344 344 param = self._paramchunk()
345 345 self.ui.debug('bundle parameter: %s\n' % param)
346 346 yield _pack(_fstreamparamsize, len(param))
347 347 if param:
348 348 yield param
349 349
350 350 self.ui.debug('start of parts\n')
351 351 for part in self._parts:
352 352 self.ui.debug('bundle part: "%s"\n' % part.type)
353 353 for chunk in part.getchunks():
354 354 yield chunk
355 355 self.ui.debug('end of bundle\n')
356 356 yield '\0\0'
357 357
358 358 def _paramchunk(self):
359 359 """return a encoded version of all stream parameters"""
360 360 blocks = []
361 361 for par, value in self._params:
362 362 par = urllib.quote(par)
363 363 if value is not None:
364 364 value = urllib.quote(value)
365 365 par = '%s=%s' % (par, value)
366 366 blocks.append(par)
367 367 return ' '.join(blocks)
368 368
369 369 class unbundle20(object):
370 370 """interpret a bundle2 stream
371 371
372 372 (this will eventually yield parts)"""
373 373
374 374 def __init__(self, ui, fp):
375 375 self.ui = ui
376 376 self._fp = fp
377 377 header = self._readexact(4)
378 378 magic, version = header[0:2], header[2:4]
379 379 if magic != 'HG':
380 380 raise util.Abort(_('not a Mercurial bundle'))
381 381 if version != '20':
382 382 raise util.Abort(_('unknown bundle version %s') % version)
383 383 self.ui.debug('start processing of %s stream\n' % header)
384 384
385 385 def _unpack(self, format):
386 386 """unpack this struct format from the stream"""
387 387 data = self._readexact(struct.calcsize(format))
388 388 return _unpack(format, data)
389 389
390 390 def _readexact(self, size):
391 391 """read exactly <size> bytes from the stream"""
392 392 return changegroup.readexactly(self._fp, size)
393 393
394 394 @util.propertycache
395 395 def params(self):
396 396 """dictionnary of stream level parameters"""
397 397 self.ui.debug('reading bundle2 stream parameters\n')
398 398 params = {}
399 399 paramssize = self._unpack(_fstreamparamsize)[0]
400 400 if paramssize:
401 401 for p in self._readexact(paramssize).split(' '):
402 402 p = p.split('=', 1)
403 403 p = [urllib.unquote(i) for i in p]
404 404 if len(p) < 2:
405 405 p.append(None)
406 406 self._processparam(*p)
407 407 params[p[0]] = p[1]
408 408 return params
409 409
410 410 def _processparam(self, name, value):
411 411 """process a parameter, applying its effect if needed
412 412
413 413 Parameter starting with a lower case letter are advisory and will be
414 414 ignored when unknown. Those starting with an upper case letter are
415 415 mandatory and will this function will raise a KeyError when unknown.
416 416
417 417 Note: no option are currently supported. Any input will be either
418 418 ignored or failing.
419 419 """
420 420 if not name:
421 421 raise ValueError('empty parameter name')
422 422 if name[0] not in string.letters:
423 423 raise ValueError('non letter first character: %r' % name)
424 424 # Some logic will be later added here to try to process the option for
425 425 # a dict of known parameter.
426 426 if name[0].islower():
427 427 self.ui.debug("ignoring unknown parameter %r\n" % name)
428 428 else:
429 429 raise KeyError(name)
430 430
431 431
432 432 def __iter__(self):
433 433 """yield all parts contained in the stream"""
434 434 # make sure param have been loaded
435 435 self.params
436 436 self.ui.debug('start extraction of bundle2 parts\n')
437 437 part = self._readpart()
438 438 while part is not None:
439 439 yield part
440 440 part = self._readpart()
441 441 self.ui.debug('end of bundle2 stream\n')
442 442
443 443 def _readpart(self):
444 444 """return None when an end of stream markers is reach"""
445 445
446 446 headersize = self._unpack(_fpartheadersize)[0]
447 447 self.ui.debug('part header size: %i\n' % headersize)
448 448 if not headersize:
449 449 return None
450 450 headerblock = self._readexact(headersize)
451 451 # some utility to help reading from the header block
452 452 self._offset = 0 # layer violation to have something easy to understand
453 453 def fromheader(size):
454 454 """return the next <size> byte from the header"""
455 455 offset = self._offset
456 456 data = headerblock[offset:(offset + size)]
457 457 self._offset = offset + size
458 458 return data
459 459 def unpackheader(format):
460 460 """read given format from header
461 461
462 462 This automatically compute the size of the format to read."""
463 463 data = fromheader(struct.calcsize(format))
464 464 return _unpack(format, data)
465 465
466 466 typesize = unpackheader(_fparttypesize)[0]
467 467 parttype = fromheader(typesize)
468 468 self.ui.debug('part type: "%s"\n' % parttype)
469 469 partid = unpackheader(_fpartid)[0]
470 470 self.ui.debug('part id: "%s"\n' % partid)
471 471 ## reading parameters
472 472 # param count
473 473 mancount, advcount = unpackheader(_fpartparamcount)
474 474 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
475 475 # param size
476 476 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
477 477 # make it a list of couple again
478 478 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
479 479 # split mandatory from advisory
480 480 mansizes = paramsizes[:mancount]
481 481 advsizes = paramsizes[mancount:]
482 482 # retrive param value
483 483 manparams = []
484 484 for key, value in mansizes:
485 485 manparams.append((fromheader(key), fromheader(value)))
486 486 advparams = []
487 487 for key, value in advsizes:
488 488 advparams.append((fromheader(key), fromheader(value)))
489 489 del self._offset # clean up layer, nobody saw anything.
490 490 ## part payload
491 491 payload = []
492 492 payloadsize = self._unpack(_fpayloadsize)[0]
493 493 self.ui.debug('payload chunk size: %i\n' % payloadsize)
494 494 while payloadsize:
495 495 payload.append(self._readexact(payloadsize))
496 496 payloadsize = self._unpack(_fpayloadsize)[0]
497 497 self.ui.debug('payload chunk size: %i\n' % payloadsize)
498 498 payload = ''.join(payload)
499 499 current = part(parttype, manparams, advparams, data=payload)
500 500 current.id = partid
501 501 return current
502 502
503 503
504 504 class part(object):
505 505 """A bundle2 part contains application level payload
506 506
507 507 The part `type` is used to route the part to the application level
508 508 handler.
509 509 """
510 510
511 511 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
512 512 data=''):
513 513 self.id = None
514 514 self.type = parttype
515 515 self.data = data
516 516 self.mandatoryparams = mandatoryparams
517 517 self.advisoryparams = advisoryparams
518 518
519 519 def getchunks(self):
520 520 #### header
521 521 ## parttype
522 522 header = [_pack(_fparttypesize, len(self.type)),
523 523 self.type, _pack(_fpartid, self.id),
524 524 ]
525 525 ## parameters
526 526 # count
527 527 manpar = self.mandatoryparams
528 528 advpar = self.advisoryparams
529 529 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
530 530 # size
531 531 parsizes = []
532 532 for key, value in manpar:
533 533 parsizes.append(len(key))
534 534 parsizes.append(len(value))
535 535 for key, value in advpar:
536 536 parsizes.append(len(key))
537 537 parsizes.append(len(value))
538 538 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
539 539 header.append(paramsizes)
540 540 # key, value
541 541 for key, value in manpar:
542 542 header.append(key)
543 543 header.append(value)
544 544 for key, value in advpar:
545 545 header.append(key)
546 546 header.append(value)
547 547 ## finalize header
548 548 headerchunk = ''.join(header)
549 549 yield _pack(_fpartheadersize, len(headerchunk))
550 550 yield headerchunk
551 551 ## payload
552 for chunk in self._payloadchunks():
553 yield _pack(_fpayloadsize, len(chunk))
554 yield chunk
555 # end of payload
556 yield _pack(_fpayloadsize, 0)
557
558 def _payloadchunks(self):
559 """yield chunks of a the part payload
560
561 Exists to handle the different methods to provide data to a part."""
552 562 # we only support fixed size data now.
553 563 # This will be improved in the future.
554 564 if len(self.data):
555 yield _pack(_fpayloadsize, len(self.data))
556 565 yield self.data
557 # end of payload
558 yield _pack(_fpayloadsize, 0)
559 566
560 567 @parthandler('changegroup')
561 568 def handlechangegroup(op, inpart):
562 569 """apply a changegroup part on the repo
563 570
564 571 This is a very early implementation that will massive rework before being
565 572 inflicted to any end-user.
566 573 """
567 574 # Make sure we trigger a transaction creation
568 575 #
569 576 # The addchangegroup function will get a transaction object by itself, but
570 577 # we need to make sure we trigger the creation of a transaction object used
571 578 # for the whole processing scope.
572 579 op.gettransaction()
573 580 data = StringIO.StringIO(inpart.data)
574 581 data.seek(0)
575 582 cg = changegroup.readbundle(data, 'bundle2part')
576 583 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
577 584 op.records.add('changegroup', {'return': ret})
578 585 if op.reply is not None:
579 586 # This is definitly not the final form of this
580 587 # return. But one need to start somewhere.
581 588 op.reply.addpart(part('reply:changegroup', (),
582 589 [('in-reply-to', str(inpart.id)),
583 590 ('return', '%i' % ret)]))
584 591
585 592 @parthandler('reply:changegroup')
586 593 def handlechangegroup(op, inpart):
587 594 p = dict(inpart.advisoryparams)
588 595 ret = int(p['return'])
589 596 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
590 597
General Comments 0
You need to be logged in to leave comments. Login now