bundle2: add reply awareness to unbundlerecords...
Pierre-Yves David -
r20996:ed3c5e18 default
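The change below lets a record be filed both in the main record list and in a per-part sub-record when it is a reply to a specific part. The following is a minimal sketch of that behaviour, restating the class from the diff in Python 3 form (`__nonzero__` is omitted, since `__len__` already provides truthiness there). The part id `3` and the `'output'` category are hypothetical values chosen for illustration only.

```python
class unbundlerecords(object):
    """Minimal re-statement of the class in the diff, for illustration."""

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            # Also file the entry in the sub-record of the part it answers.
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        # setdefault stores an empty sub-record on the first lookup.
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)


records = unbundlerecords()
records.add('changegroup', {'return': 1})               # not a reply
records.add('changegroup', {'return': 0}, inreplyto=3)  # reply to part 3 (hypothetical id)
records.add('output', 'remote: ok', inreplyto=3)        # hypothetical category

replies = records.getreplies(3)
```

Here `records` holds all three entries in chronological order, while `replies` holds only the two entries recorded with `inreplyto=3`. Because `getreplies` relies on `setdefault`, asking for the replies to an unknown part id returns an empty record and also stores it.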
@@ -1,568 +1,575 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of the bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: (16 bits integer)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler that
78 78 can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :typename: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameters may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as many as
121 121 `chunksize` says). The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimately be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handlers are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the parts read after an abort are processed. In the
139 139 future, dropping the stream may become an option for channels we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147 import StringIO
148 148
149 149 import changegroup
150 150 from i18n import _
151 151
152 152 _pack = struct.pack
153 153 _unpack = struct.unpack
154 154
155 155 _magicstring = 'HG20'
156 156
157 157 _fstreamparamsize = '>H'
158 158 _fpartheadersize = '>H'
159 159 _fparttypesize = '>B'
160 160 _fpartid = '>I'
161 161 _fpayloadsize = '>I'
162 162 _fpartparamcount = '>BB'
163 163
164 164 def _makefpartparamsizes(nbparams):
165 165 """return a struct format to read part parameter sizes
166 166
167 167 The number of parameters is variable so we need to build that format
168 168 dynamically.
169 169 """
170 170 return '>'+('BB'*nbparams)
171 171
172 172 parthandlermapping = {}
173 173
174 174 def parthandler(parttype):
175 175 """decorator that registers a function as a bundle2 part handler
176 176
177 177 eg::
178 178
179 179 @parthandler('myparttype')
180 180 def myparttypehandler(...):
181 181 '''process a part of type "my part".'''
182 182 ...
183 183 """
184 184 def _decorator(func):
185 185 lparttype = parttype.lower() # enforce lower case matching.
186 186 assert lparttype not in parthandlermapping
187 187 parthandlermapping[lparttype] = func
188 188 return func
189 189 return _decorator
190 190
191 191 class unbundlerecords(object):
192 192 """keep record of what happens during an unbundle
193 193
194 194 New records are added using `records.add('cat', obj)`, where 'cat' is a
195 195 category of record and obj is an arbitrary object.
196 196
197 197 `records['cat']` will return all entries of this category 'cat'.
198 198
199 199 Iterating on the object itself will yield `('category', obj)` tuples
200 200 for all entries.
201 201
202 202 All iterations happen in chronological order.
203 203 """
204 204
205 205 def __init__(self):
206 206 self._categories = {}
207 207 self._sequences = []
208 self._replies = {}
208 209
209 def add(self, category, entry):
210 def add(self, category, entry, inreplyto=None):
210 211 """add a new record of a given category.
211 212
212 213 The entry can then be retrieved in the list returned by
213 214 self['category']."""
214 215 self._categories.setdefault(category, []).append(entry)
215 216 self._sequences.append((category, entry))
217 if inreplyto is not None:
218 self.getreplies(inreplyto).add(category, entry)
219
220 def getreplies(self, partid):
221 """get the subrecords that reply to a specific part"""
222 return self._replies.setdefault(partid, unbundlerecords())
216 223
217 224 def __getitem__(self, cat):
218 225 return tuple(self._categories.get(cat, ()))
219 226
220 227 def __iter__(self):
221 228 return iter(self._sequences)
222 229
223 230 def __len__(self):
224 231 return len(self._sequences)
225 232
226 233 def __nonzero__(self):
227 234 return bool(self._sequences)
228 235
229 236 class bundleoperation(object):
230 237 """an object that represents a single bundling process
231 238
232 239 Its purpose is to carry unbundle-related objects and states.
233 240
234 241 A new object should be created at the beginning of each bundle processing.
235 242 The object is to be returned by the processing function.
236 243
237 244 The object has very little content now it will ultimately contain:
238 245 * an access to the repo the bundle is applied to,
239 246 * a ui object,
240 247 * a way to retrieve a transaction to add changes to the repo,
241 248 * a way to record the result of processing each part,
242 249 * a way to construct a bundle response when applicable.
243 250 """
244 251
245 252 def __init__(self, repo, transactiongetter):
246 253 self.repo = repo
247 254 self.ui = repo.ui
248 255 self.records = unbundlerecords()
249 256 self.gettransaction = transactiongetter
250 257
251 258 class TransactionUnavailable(RuntimeError):
252 259 pass
253 260
254 261 def _notransaction():
255 262 """default method to get a transaction while processing a bundle
256 263
257 264 Raise an exception to highlight the fact that no transaction was expected
258 265 to be created"""
259 266 raise TransactionUnavailable()
260 267
261 268 def processbundle(repo, unbundler, transactiongetter=_notransaction):
262 269 """This function processes a bundle and applies its effects to/from a repo
263 270
264 271 It iterates over each part then searches for and uses the proper handling
265 272 code to process the part. Parts are processed in order.
266 273
267 274 This is a very early version of this function that will be strongly reworked
268 275 before final usage.
269 276
270 277 An unknown mandatory part will abort the process.
271 278 """
272 279 op = bundleoperation(repo, transactiongetter)
273 280 # todo:
274 281 # - replace this with an init function soon.
275 282 # - exception catching
276 283 unbundler.params
277 284 iterparts = iter(unbundler)
278 285 try:
279 286 for part in iterparts:
280 287 parttype = part.type
281 288 # part key are matched lower case
282 289 key = parttype.lower()
283 290 try:
284 291 handler = parthandlermapping[key]
285 292 op.ui.debug('found a handler for part %r\n' % parttype)
286 293 except KeyError:
287 294 if key != parttype: # mandatory parts
288 295 # todo:
289 296 # - use a more precise exception
290 297 raise
291 298 op.ui.debug('ignoring unknown advisory part %r\n' % key)
292 299 # todo:
293 300 # - consume the part once we use streaming
294 301 continue
295 302 handler(op, part)
296 303 except Exception:
297 304 for part in iterparts:
298 305 pass # consume the bundle content
299 306 raise
300 307 return op
301 308
302 309 class bundle20(object):
303 310 """represent an outgoing bundle2 container
304 311
305 312 Use the `addparam` method to add stream level parameters, and `addpart` to
306 313 populate it. Then call `getchunks` to retrieve all the binary chunks of
307 314 data that compose the bundle2 container."""
308 315
309 316 def __init__(self, ui):
310 317 self.ui = ui
311 318 self._params = []
312 319 self._parts = []
313 320
314 321 def addparam(self, name, value=None):
315 322 """add a stream level parameter"""
316 323 if not name:
317 324 raise ValueError('empty parameter name')
318 325 if name[0] not in string.letters:
319 326 raise ValueError('non letter first character: %r' % name)
320 327 self._params.append((name, value))
321 328
322 329 def addpart(self, part):
323 330 """add a new part to the bundle2 container
324 331
325 332 Parts contain the actual application payload."""
326 333 assert part.id is None
327 334 part.id = len(self._parts) # very cheap counter
328 335 self._parts.append(part)
329 336
330 337 def getchunks(self):
331 338 self.ui.debug('start emission of %s stream\n' % _magicstring)
332 339 yield _magicstring
333 340 param = self._paramchunk()
334 341 self.ui.debug('bundle parameter: %s\n' % param)
335 342 yield _pack(_fstreamparamsize, len(param))
336 343 if param:
337 344 yield param
338 345
339 346 self.ui.debug('start of parts\n')
340 347 for part in self._parts:
341 348 self.ui.debug('bundle part: "%s"\n' % part.type)
342 349 for chunk in part.getchunks():
343 350 yield chunk
344 351 self.ui.debug('end of bundle\n')
345 352 yield '\0\0'
346 353
347 354 def _paramchunk(self):
348 355 """return an encoded version of all stream parameters"""
349 356 blocks = []
350 357 for par, value in self._params:
351 358 par = urllib.quote(par)
352 359 if value is not None:
353 360 value = urllib.quote(value)
354 361 par = '%s=%s' % (par, value)
355 362 blocks.append(par)
356 363 return ' '.join(blocks)
357 364
358 365 class unbundle20(object):
359 366 """interpret a bundle2 stream
360 367
361 368 (this will eventually yield parts)"""
362 369
363 370 def __init__(self, ui, fp):
364 371 self.ui = ui
365 372 self._fp = fp
366 373 header = self._readexact(4)
367 374 magic, version = header[0:2], header[2:4]
368 375 if magic != 'HG':
369 376 raise util.Abort(_('not a Mercurial bundle'))
370 377 if version != '20':
371 378 raise util.Abort(_('unknown bundle version %s') % version)
372 379 self.ui.debug('start processing of %s stream\n' % header)
373 380
374 381 def _unpack(self, format):
375 382 """unpack this struct format from the stream"""
376 383 data = self._readexact(struct.calcsize(format))
377 384 return _unpack(format, data)
378 385
379 386 def _readexact(self, size):
380 387 """read exactly <size> bytes from the stream"""
381 388 return changegroup.readexactly(self._fp, size)
382 389
383 390 @util.propertycache
384 391 def params(self):
385 392 """dictionary of stream level parameters"""
386 393 self.ui.debug('reading bundle2 stream parameters\n')
387 394 params = {}
388 395 paramssize = self._unpack(_fstreamparamsize)[0]
389 396 if paramssize:
390 397 for p in self._readexact(paramssize).split(' '):
391 398 p = p.split('=', 1)
392 399 p = [urllib.unquote(i) for i in p]
393 400 if len(p) < 2:
394 401 p.append(None)
395 402 self._processparam(*p)
396 403 params[p[0]] = p[1]
397 404 return params
398 405
399 406 def _processparam(self, name, value):
400 407 """process a parameter, applying its effect if needed
401 408
402 409 Parameters starting with a lower case letter are advisory and will be
403 410 ignored when unknown. Those starting with an upper case letter are
404 411 mandatory and this function will raise a KeyError when they are unknown.
405 412
406 413 Note: no options are currently supported. Any input will be either
407 414 ignored or rejected.
408 415 """
409 416 if not name:
410 417 raise ValueError('empty parameter name')
411 418 if name[0] not in string.letters:
412 419 raise ValueError('non letter first character: %r' % name)
413 420 # Some logic will be later added here to try to process the option for
414 421 # a dict of known parameter.
415 422 if name[0].islower():
416 423 self.ui.debug("ignoring unknown parameter %r\n" % name)
417 424 else:
418 425 raise KeyError(name)
419 426
420 427
421 428 def __iter__(self):
422 429 """yield all parts contained in the stream"""
423 430 # make sure param have been loaded
424 431 self.params
425 432 self.ui.debug('start extraction of bundle2 parts\n')
426 433 part = self._readpart()
427 434 while part is not None:
428 435 yield part
429 436 part = self._readpart()
430 437 self.ui.debug('end of bundle2 stream\n')
431 438
432 439 def _readpart(self):
433 440 """return None when an end of stream marker is reached"""
434 441
435 442 headersize = self._unpack(_fpartheadersize)[0]
436 443 self.ui.debug('part header size: %i\n' % headersize)
437 444 if not headersize:
438 445 return None
439 446 headerblock = self._readexact(headersize)
440 447 # some utility to help reading from the header block
441 448 self._offset = 0 # layer violation to have something easy to understand
442 449 def fromheader(size):
443 450 """return the next <size> bytes from the header"""
444 451 offset = self._offset
445 452 data = headerblock[offset:(offset + size)]
446 453 self._offset = offset + size
447 454 return data
448 455 def unpackheader(format):
449 456 """read given format from header
450 457
451 458 This automatically computes the size of the format to read."""
452 459 data = fromheader(struct.calcsize(format))
453 460 return _unpack(format, data)
454 461
455 462 typesize = unpackheader(_fparttypesize)[0]
456 463 parttype = fromheader(typesize)
457 464 self.ui.debug('part type: "%s"\n' % parttype)
458 465 partid = unpackheader(_fpartid)[0]
459 466 self.ui.debug('part id: "%s"\n' % partid)
460 467 ## reading parameters
461 468 # param count
462 469 mancount, advcount = unpackheader(_fpartparamcount)
463 470 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
464 471 # param size
465 472 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
466 473 # make it a list of couple again
467 474 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
468 475 # split mandatory from advisory
469 476 mansizes = paramsizes[:mancount]
470 477 advsizes = paramsizes[mancount:]
471 478 # retrieve param value
472 479 manparams = []
473 480 for key, value in mansizes:
474 481 manparams.append((fromheader(key), fromheader(value)))
475 482 advparams = []
476 483 for key, value in advsizes:
477 484 advparams.append((fromheader(key), fromheader(value)))
478 485 del self._offset # clean up layer, nobody saw anything.
479 486 ## part payload
480 487 payload = []
481 488 payloadsize = self._unpack(_fpayloadsize)[0]
482 489 self.ui.debug('payload chunk size: %i\n' % payloadsize)
483 490 while payloadsize:
484 491 payload.append(self._readexact(payloadsize))
485 492 payloadsize = self._unpack(_fpayloadsize)[0]
486 493 self.ui.debug('payload chunk size: %i\n' % payloadsize)
487 494 payload = ''.join(payload)
488 495 current = part(parttype, manparams, advparams, data=payload)
489 496 current.id = partid
490 497 return current
491 498
492 499
493 500 class part(object):
494 501 """A bundle2 part contains application level payload
495 502
496 503 The part `type` is used to route the part to the application level
497 504 handler.
498 505 """
499 506
500 507 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
501 508 data=''):
502 509 self.id = None
503 510 self.type = parttype
504 511 self.data = data
505 512 self.mandatoryparams = mandatoryparams
506 513 self.advisoryparams = advisoryparams
507 514
508 515 def getchunks(self):
509 516 #### header
510 517 ## parttype
511 518 header = [_pack(_fparttypesize, len(self.type)),
512 519 self.type, _pack(_fpartid, self.id),
513 520 ]
514 521 ## parameters
515 522 # count
516 523 manpar = self.mandatoryparams
517 524 advpar = self.advisoryparams
518 525 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
519 526 # size
520 527 parsizes = []
521 528 for key, value in manpar:
522 529 parsizes.append(len(key))
523 530 parsizes.append(len(value))
524 531 for key, value in advpar:
525 532 parsizes.append(len(key))
526 533 parsizes.append(len(value))
527 534 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
528 535 header.append(paramsizes)
529 536 # key, value
530 537 for key, value in manpar:
531 538 header.append(key)
532 539 header.append(value)
533 540 for key, value in advpar:
534 541 header.append(key)
535 542 header.append(value)
536 543 ## finalize header
537 544 headerchunk = ''.join(header)
538 545 yield _pack(_fpartheadersize, len(headerchunk))
539 546 yield headerchunk
540 547 ## payload
541 548 # we only support fixed size data now.
542 549 # This will be improved in the future.
543 550 if len(self.data):
544 551 yield _pack(_fpayloadsize, len(self.data))
545 552 yield self.data
546 553 # end of payload
547 554 yield _pack(_fpayloadsize, 0)
548 555
549 556 @parthandler('changegroup')
550 557 def handlechangegroup(op, part):
551 558 """apply a changegroup part on the repo
552 559
553 560 This is a very early implementation that will need massive rework before being
554 561 inflicted on any end-user.
555 562 """
556 563 # Make sure we trigger a transaction creation
557 564 #
558 565 # The addchangegroup function will get a transaction object by itself, but
559 566 # we need to make sure we trigger the creation of a transaction object used
560 567 # for the whole processing scope.
561 568 op.gettransaction()
562 569 data = StringIO.StringIO(part.data)
563 570 data.seek(0)
564 571 cg = changegroup.readbundle(data, 'bundle2part')
565 572 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
566 573 op.records.add('changegroup', {'return': ret})
567 574
568 575
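The part header layout described in the module docstring (`typesize`, `typename`, `partid`, parameter counts, parameter sizes, parameter data) can be illustrated with a small round trip. This is only a sketch in Python 3 using `bytes`, not the module's own code, which targets Python 2 strings. The helper names `packheader` and `unpackheader`, as well as the parameter names `version` and `hint`, are hypothetical.

```python
import struct


def packheader(parttype, partid, manparams=(), advparams=()):
    """Serialize a part header; mandatory parameters come first."""
    params = list(manparams) + list(advparams)
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(manparams), len(advparams))]
    sizes = []
    for key, value in params:
        sizes.extend((len(key), len(value)))
    # One (key size, value size) couple of bytes per parameter.
    header.append(struct.pack('>' + 'BB' * len(params), *sizes))
    for key, value in params:
        header.extend((key, value))
    return b''.join(header)


def unpackheader(block):
    """Parse a header block back into (type, id, mandatory, advisory)."""
    offset = 0

    def take(size):
        nonlocal offset
        data = block[offset:offset + size]
        offset += size
        return data

    (typesize,) = struct.unpack('>B', take(1))
    parttype = take(typesize)
    (partid,) = struct.unpack('>I', take(4))
    mancount, advcount = struct.unpack('>BB', take(2))
    total = mancount + advcount
    flat = struct.unpack('>' + 'BB' * total, take(2 * total))
    sizes = list(zip(flat[::2], flat[1::2]))
    params = [(take(ksize), take(vsize)) for ksize, vsize in sizes]
    return parttype, partid, params[:mancount], params[mancount:]


header = packheader(b'changegroup', 0,
                    manparams=[(b'version', b'01')],  # hypothetical parameter
                    advparams=[(b'hint', b'')])       # hypothetical parameter
# On the wire the header is preceded by its 16 bits size.
framed = struct.pack('>H', len(header)) + header
decoded = unpackheader(header)
```

Since every size field is a single byte, a part type, key, or value longer than 255 bytes cannot be represented in this layout; `struct.pack` raises `struct.error` in that case.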