##// END OF EJS Templates
bundle2: remove an explicit packing...
Pierre-Yves David -
r22661:9ea2913e default
parent child Browse files
Show More
@@ -1,946 +1,946
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handler are registered
132 132 for a certain part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know if a part is mandatory or advisory. If the Part type
136 136 contains any uppercase char it is considered mandatory. When no handler is
137 137 known for a Mandatory part, the process is aborted and an exception is raised.
138 138 If the part is advisory and no handler is known, the part is ignored. When the
139 139 process is aborted, the full bundle is still read from the stream to keep the
140 140 channel usable. But none of the part read from an abort are processed. In the
141 141 future, dropping the stream may become an option for channel we do not care to
142 142 preserve.
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 149 import obsolete
150 150 import pushkey
151 151
152 152 import changegroup, error
153 153 from i18n import _
154 154
155 155 _pack = struct.pack
156 156 _unpack = struct.unpack
157 157
158 158 _magicstring = 'HG2X'
159 159
160 160 _fstreamparamsize = '>H'
161 161 _fpartheadersize = '>H'
162 162 _fparttypesize = '>B'
163 163 _fpartid = '>I'
164 164 _fpayloadsize = '>I'
165 165 _fpartparamcount = '>BB'
166 166
167 167 preferedchunksize = 4096
168 168
169 169 def _makefpartparamsizes(nbparams):
170 170 """return a struct format to read part parameter sizes
171 171
172 172 The number parameters is variable so we need to build that format
173 173 dynamically.
174 174 """
175 175 return '>'+('BB'*nbparams)
176 176
177 177 parthandlermapping = {}
178 178
179 179 def parthandler(parttype, params=()):
180 180 """decorator that register a function as a bundle2 part handler
181 181
182 182 eg::
183 183
184 184 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
185 185 def myparttypehandler(...):
186 186 '''process a part of type "my part".'''
187 187 ...
188 188 """
189 189 def _decorator(func):
190 190 lparttype = parttype.lower() # enforce lower case matching.
191 191 assert lparttype not in parthandlermapping
192 192 parthandlermapping[lparttype] = func
193 193 func.params = frozenset(params)
194 194 return func
195 195 return _decorator
196 196
197 197 class unbundlerecords(object):
198 198 """keep record of what happens during and unbundle
199 199
200 200 New records are added using `records.add('cat', obj)`. Where 'cat' is a
201 201 category of record and obj is an arbitrary object.
202 202
203 203 `records['cat']` will return all entries of this category 'cat'.
204 204
205 205 Iterating on the object itself will yield `('category', obj)` tuples
206 206 for all entries.
207 207
208 208 All iterations happens in chronological order.
209 209 """
210 210
211 211 def __init__(self):
212 212 self._categories = {}
213 213 self._sequences = []
214 214 self._replies = {}
215 215
216 216 def add(self, category, entry, inreplyto=None):
217 217 """add a new record of a given category.
218 218
219 219 The entry can then be retrieved in the list returned by
220 220 self['category']."""
221 221 self._categories.setdefault(category, []).append(entry)
222 222 self._sequences.append((category, entry))
223 223 if inreplyto is not None:
224 224 self.getreplies(inreplyto).add(category, entry)
225 225
226 226 def getreplies(self, partid):
227 227 """get the subrecords that replies to a specific part"""
228 228 return self._replies.setdefault(partid, unbundlerecords())
229 229
230 230 def __getitem__(self, cat):
231 231 return tuple(self._categories.get(cat, ()))
232 232
233 233 def __iter__(self):
234 234 return iter(self._sequences)
235 235
236 236 def __len__(self):
237 237 return len(self._sequences)
238 238
239 239 def __nonzero__(self):
240 240 return bool(self._sequences)
241 241
242 242 class bundleoperation(object):
243 243 """an object that represents a single bundling process
244 244
245 245 Its purpose is to carry unbundle-related objects and states.
246 246
247 247 A new object should be created at the beginning of each bundle processing.
248 248 The object is to be returned by the processing function.
249 249
250 250 The object has very little content now it will ultimately contain:
251 251 * an access to the repo the bundle is applied to,
252 252 * a ui object,
253 253 * a way to retrieve a transaction to add changes to the repo,
254 254 * a way to record the result of processing each part,
255 255 * a way to construct a bundle response when applicable.
256 256 """
257 257
258 258 def __init__(self, repo, transactiongetter):
259 259 self.repo = repo
260 260 self.ui = repo.ui
261 261 self.records = unbundlerecords()
262 262 self.gettransaction = transactiongetter
263 263 self.reply = None
264 264
265 265 class TransactionUnavailable(RuntimeError):
266 266 pass
267 267
268 268 def _notransaction():
269 269 """default method to get a transaction while processing a bundle
270 270
271 271 Raise an exception to highlight the fact that no transaction was expected
272 272 to be created"""
273 273 raise TransactionUnavailable()
274 274
275 275 def processbundle(repo, unbundler, transactiongetter=_notransaction):
276 276 """This function process a bundle, apply effect to/from a repo
277 277
278 278 It iterates over each part then searches for and uses the proper handling
279 279 code to process the part. Parts are processed in order.
280 280
281 281 This is very early version of this function that will be strongly reworked
282 282 before final usage.
283 283
284 284 Unknown Mandatory part will abort the process.
285 285 """
286 286 op = bundleoperation(repo, transactiongetter)
287 287 # todo:
288 288 # - replace this is a init function soon.
289 289 # - exception catching
290 290 unbundler.params
291 291 iterparts = unbundler.iterparts()
292 292 part = None
293 293 try:
294 294 for part in iterparts:
295 295 parttype = part.type
296 296 # part key are matched lower case
297 297 key = parttype.lower()
298 298 try:
299 299 handler = parthandlermapping.get(key)
300 300 if handler is None:
301 301 raise error.BundleValueError(parttype=key)
302 302 op.ui.debug('found a handler for part %r\n' % parttype)
303 303 unknownparams = part.mandatorykeys - handler.params
304 304 if unknownparams:
305 305 unknownparams = list(unknownparams)
306 306 unknownparams.sort()
307 307 raise error.BundleValueError(parttype=key,
308 308 params=unknownparams)
309 309 except error.BundleValueError, exc:
310 310 if key != parttype: # mandatory parts
311 311 raise
312 312 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
313 313 # consuming the part
314 314 part.read()
315 315 continue
316 316
317 317
318 318 # handler is called outside the above try block so that we don't
319 319 # risk catching KeyErrors from anything other than the
320 320 # parthandlermapping lookup (any KeyError raised by handler()
321 321 # itself represents a defect of a different variety).
322 322 output = None
323 323 if op.reply is not None:
324 324 op.ui.pushbuffer(error=True)
325 325 output = ''
326 326 try:
327 327 handler(op, part)
328 328 finally:
329 329 if output is not None:
330 330 output = op.ui.popbuffer()
331 331 if output:
332 332 outpart = op.reply.newpart('b2x:output', data=output)
333 333 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
334 334 part.read()
335 335 except Exception, exc:
336 336 if part is not None:
337 337 # consume the bundle content
338 338 part.read()
339 339 for part in iterparts:
340 340 # consume the bundle content
341 341 part.read()
342 342 # Small hack to let caller code distinguish exceptions from bundle2
343 343 # processing fron the ones from bundle1 processing. This is mostly
344 344 # needed to handle different return codes to unbundle according to the
345 345 # type of bundle. We should probably clean up or drop this return code
346 346 # craziness in a future version.
347 347 exc.duringunbundle2 = True
348 348 raise
349 349 return op
350 350
351 351 def decodecaps(blob):
352 352 """decode a bundle2 caps bytes blob into a dictionnary
353 353
354 354 The blob is a list of capabilities (one per line)
355 355 Capabilities may have values using a line of the form::
356 356
357 357 capability=value1,value2,value3
358 358
359 359 The values are always a list."""
360 360 caps = {}
361 361 for line in blob.splitlines():
362 362 if not line:
363 363 continue
364 364 if '=' not in line:
365 365 key, vals = line, ()
366 366 else:
367 367 key, vals = line.split('=', 1)
368 368 vals = vals.split(',')
369 369 key = urllib.unquote(key)
370 370 vals = [urllib.unquote(v) for v in vals]
371 371 caps[key] = vals
372 372 return caps
373 373
374 374 def encodecaps(caps):
375 375 """encode a bundle2 caps dictionary into a bytes blob"""
376 376 chunks = []
377 377 for ca in sorted(caps):
378 378 vals = caps[ca]
379 379 ca = urllib.quote(ca)
380 380 vals = [urllib.quote(v) for v in vals]
381 381 if vals:
382 382 ca = "%s=%s" % (ca, ','.join(vals))
383 383 chunks.append(ca)
384 384 return '\n'.join(chunks)
385 385
386 386 class bundle20(object):
387 387 """represent an outgoing bundle2 container
388 388
389 389 Use the `addparam` method to add stream level parameter. and `newpart` to
390 390 populate it. Then call `getchunks` to retrieve all the binary chunks of
391 391 data that compose the bundle2 container."""
392 392
393 393 def __init__(self, ui, capabilities=()):
394 394 self.ui = ui
395 395 self._params = []
396 396 self._parts = []
397 397 self.capabilities = dict(capabilities)
398 398
399 399 @property
400 400 def nbparts(self):
401 401 """total number of parts added to the bundler"""
402 402 return len(self._parts)
403 403
404 404 # methods used to defines the bundle2 content
405 405 def addparam(self, name, value=None):
406 406 """add a stream level parameter"""
407 407 if not name:
408 408 raise ValueError('empty parameter name')
409 409 if name[0] not in string.letters:
410 410 raise ValueError('non letter first character: %r' % name)
411 411 self._params.append((name, value))
412 412
413 413 def addpart(self, part):
414 414 """add a new part to the bundle2 container
415 415
416 416 Parts contains the actual applicative payload."""
417 417 assert part.id is None
418 418 part.id = len(self._parts) # very cheap counter
419 419 self._parts.append(part)
420 420
421 421 def newpart(self, typeid, *args, **kwargs):
422 422 """create a new part and add it to the containers
423 423
424 424 As the part is directly added to the containers. For now, this means
425 425 that any failure to properly initialize the part after calling
426 426 ``newpart`` should result in a failure of the whole bundling process.
427 427
428 428 You can still fall back to manually create and add if you need better
429 429 control."""
430 430 part = bundlepart(typeid, *args, **kwargs)
431 431 self.addpart(part)
432 432 return part
433 433
434 434 # methods used to generate the bundle2 stream
435 435 def getchunks(self):
436 436 self.ui.debug('start emission of %s stream\n' % _magicstring)
437 437 yield _magicstring
438 438 param = self._paramchunk()
439 439 self.ui.debug('bundle parameter: %s\n' % param)
440 440 yield _pack(_fstreamparamsize, len(param))
441 441 if param:
442 442 yield param
443 443
444 444 self.ui.debug('start of parts\n')
445 445 for part in self._parts:
446 446 self.ui.debug('bundle part: "%s"\n' % part.type)
447 447 for chunk in part.getchunks():
448 448 yield chunk
449 449 self.ui.debug('end of bundle\n')
450 yield '\0\0'
450 yield _pack(_fpartheadersize, 0)
451 451
452 452 def _paramchunk(self):
453 453 """return a encoded version of all stream parameters"""
454 454 blocks = []
455 455 for par, value in self._params:
456 456 par = urllib.quote(par)
457 457 if value is not None:
458 458 value = urllib.quote(value)
459 459 par = '%s=%s' % (par, value)
460 460 blocks.append(par)
461 461 return ' '.join(blocks)
462 462
463 463 class unpackermixin(object):
464 464 """A mixin to extract bytes and struct data from a stream"""
465 465
466 466 def __init__(self, fp):
467 467 self._fp = fp
468 468
469 469 def _unpack(self, format):
470 470 """unpack this struct format from the stream"""
471 471 data = self._readexact(struct.calcsize(format))
472 472 return _unpack(format, data)
473 473
474 474 def _readexact(self, size):
475 475 """read exactly <size> bytes from the stream"""
476 476 return changegroup.readexactly(self._fp, size)
477 477
478 478
479 479 class unbundle20(unpackermixin):
480 480 """interpret a bundle2 stream
481 481
482 482 This class is fed with a binary stream and yields parts through its
483 483 `iterparts` methods."""
484 484
485 485 def __init__(self, ui, fp, header=None):
486 486 """If header is specified, we do not read it out of the stream."""
487 487 self.ui = ui
488 488 super(unbundle20, self).__init__(fp)
489 489 if header is None:
490 490 header = self._readexact(4)
491 491 magic, version = header[0:2], header[2:4]
492 492 if magic != 'HG':
493 493 raise util.Abort(_('not a Mercurial bundle'))
494 494 if version != '2X':
495 495 raise util.Abort(_('unknown bundle version %s') % version)
496 496 self.ui.debug('start processing of %s stream\n' % header)
497 497
498 498 @util.propertycache
499 499 def params(self):
500 500 """dictionary of stream level parameters"""
501 501 self.ui.debug('reading bundle2 stream parameters\n')
502 502 params = {}
503 503 paramssize = self._unpack(_fstreamparamsize)[0]
504 504 if paramssize:
505 505 for p in self._readexact(paramssize).split(' '):
506 506 p = p.split('=', 1)
507 507 p = [urllib.unquote(i) for i in p]
508 508 if len(p) < 2:
509 509 p.append(None)
510 510 self._processparam(*p)
511 511 params[p[0]] = p[1]
512 512 return params
513 513
514 514 def _processparam(self, name, value):
515 515 """process a parameter, applying its effect if needed
516 516
517 517 Parameter starting with a lower case letter are advisory and will be
518 518 ignored when unknown. Those starting with an upper case letter are
519 519 mandatory and will this function will raise a KeyError when unknown.
520 520
521 521 Note: no option are currently supported. Any input will be either
522 522 ignored or failing.
523 523 """
524 524 if not name:
525 525 raise ValueError('empty parameter name')
526 526 if name[0] not in string.letters:
527 527 raise ValueError('non letter first character: %r' % name)
528 528 # Some logic will be later added here to try to process the option for
529 529 # a dict of known parameter.
530 530 if name[0].islower():
531 531 self.ui.debug("ignoring unknown parameter %r\n" % name)
532 532 else:
533 533 raise error.BundleValueError(params=(name,))
534 534
535 535
536 536 def iterparts(self):
537 537 """yield all parts contained in the stream"""
538 538 # make sure param have been loaded
539 539 self.params
540 540 self.ui.debug('start extraction of bundle2 parts\n')
541 541 headerblock = self._readpartheader()
542 542 while headerblock is not None:
543 543 part = unbundlepart(self.ui, headerblock, self._fp)
544 544 yield part
545 545 headerblock = self._readpartheader()
546 546 self.ui.debug('end of bundle2 stream\n')
547 547
548 548 def _readpartheader(self):
549 549 """reads a part header size and return the bytes blob
550 550
551 551 returns None if empty"""
552 552 headersize = self._unpack(_fpartheadersize)[0]
553 553 self.ui.debug('part header size: %i\n' % headersize)
554 554 if headersize:
555 555 return self._readexact(headersize)
556 556 return None
557 557
558 558
559 559 class bundlepart(object):
560 560 """A bundle2 part contains application level payload
561 561
562 562 The part `type` is used to route the part to the application level
563 563 handler.
564 564
565 565 The part payload is contained in ``part.data``. It could be raw bytes or a
566 566 generator of byte chunks.
567 567
568 568 You can add parameters to the part using the ``addparam`` method.
569 569 Parameters can be either mandatory (default) or advisory. Remote side
570 570 should be able to safely ignore the advisory ones.
571 571
572 572 Both data and parameters cannot be modified after the generation has begun.
573 573 """
574 574
575 575 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
576 576 data=''):
577 577 self.id = None
578 578 self.type = parttype
579 579 self._data = data
580 580 self._mandatoryparams = list(mandatoryparams)
581 581 self._advisoryparams = list(advisoryparams)
582 582 # checking for duplicated entries
583 583 self._seenparams = set()
584 584 for pname, __ in self._mandatoryparams + self._advisoryparams:
585 585 if pname in self._seenparams:
586 586 raise RuntimeError('duplicated params: %s' % pname)
587 587 self._seenparams.add(pname)
588 588 # status of the part's generation:
589 589 # - None: not started,
590 590 # - False: currently generated,
591 591 # - True: generation done.
592 592 self._generated = None
593 593
594 594 # methods used to defines the part content
595 595 def __setdata(self, data):
596 596 if self._generated is not None:
597 597 raise error.ReadOnlyPartError('part is being generated')
598 598 self._data = data
599 599 def __getdata(self):
600 600 return self._data
601 601 data = property(__getdata, __setdata)
602 602
603 603 @property
604 604 def mandatoryparams(self):
605 605 # make it an immutable tuple to force people through ``addparam``
606 606 return tuple(self._mandatoryparams)
607 607
608 608 @property
609 609 def advisoryparams(self):
610 610 # make it an immutable tuple to force people through ``addparam``
611 611 return tuple(self._advisoryparams)
612 612
613 613 def addparam(self, name, value='', mandatory=True):
614 614 if self._generated is not None:
615 615 raise error.ReadOnlyPartError('part is being generated')
616 616 if name in self._seenparams:
617 617 raise ValueError('duplicated params: %s' % name)
618 618 self._seenparams.add(name)
619 619 params = self._advisoryparams
620 620 if mandatory:
621 621 params = self._mandatoryparams
622 622 params.append((name, value))
623 623
624 624 # methods used to generates the bundle2 stream
625 625 def getchunks(self):
626 626 if self._generated is not None:
627 627 raise RuntimeError('part can only be consumed once')
628 628 self._generated = False
629 629 #### header
630 630 ## parttype
631 631 header = [_pack(_fparttypesize, len(self.type)),
632 632 self.type, _pack(_fpartid, self.id),
633 633 ]
634 634 ## parameters
635 635 # count
636 636 manpar = self.mandatoryparams
637 637 advpar = self.advisoryparams
638 638 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
639 639 # size
640 640 parsizes = []
641 641 for key, value in manpar:
642 642 parsizes.append(len(key))
643 643 parsizes.append(len(value))
644 644 for key, value in advpar:
645 645 parsizes.append(len(key))
646 646 parsizes.append(len(value))
647 647 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
648 648 header.append(paramsizes)
649 649 # key, value
650 650 for key, value in manpar:
651 651 header.append(key)
652 652 header.append(value)
653 653 for key, value in advpar:
654 654 header.append(key)
655 655 header.append(value)
656 656 ## finalize header
657 657 headerchunk = ''.join(header)
658 658 yield _pack(_fpartheadersize, len(headerchunk))
659 659 yield headerchunk
660 660 ## payload
661 661 for chunk in self._payloadchunks():
662 662 yield _pack(_fpayloadsize, len(chunk))
663 663 yield chunk
664 664 # end of payload
665 665 yield _pack(_fpayloadsize, 0)
666 666 self._generated = True
667 667
668 668 def _payloadchunks(self):
669 669 """yield chunks of a the part payload
670 670
671 671 Exists to handle the different methods to provide data to a part."""
672 672 # we only support fixed size data now.
673 673 # This will be improved in the future.
674 674 if util.safehasattr(self.data, 'next'):
675 675 buff = util.chunkbuffer(self.data)
676 676 chunk = buff.read(preferedchunksize)
677 677 while chunk:
678 678 yield chunk
679 679 chunk = buff.read(preferedchunksize)
680 680 elif len(self.data):
681 681 yield self.data
682 682
683 683 class unbundlepart(unpackermixin):
684 684 """a bundle part read from a bundle"""
685 685
686 686 def __init__(self, ui, header, fp):
687 687 super(unbundlepart, self).__init__(fp)
688 688 self.ui = ui
689 689 # unbundle state attr
690 690 self._headerdata = header
691 691 self._headeroffset = 0
692 692 self._initialized = False
693 693 self.consumed = False
694 694 # part data
695 695 self.id = None
696 696 self.type = None
697 697 self.mandatoryparams = None
698 698 self.advisoryparams = None
699 699 self.params = None
700 700 self.mandatorykeys = ()
701 701 self._payloadstream = None
702 702 self._readheader()
703 703
704 704 def _fromheader(self, size):
705 705 """return the next <size> byte from the header"""
706 706 offset = self._headeroffset
707 707 data = self._headerdata[offset:(offset + size)]
708 708 self._headeroffset = offset + size
709 709 return data
710 710
711 711 def _unpackheader(self, format):
712 712 """read given format from header
713 713
714 714 This automatically compute the size of the format to read."""
715 715 data = self._fromheader(struct.calcsize(format))
716 716 return _unpack(format, data)
717 717
718 718 def _initparams(self, mandatoryparams, advisoryparams):
719 719 """internal function to setup all logic related parameters"""
720 720 # make it read only to prevent people touching it by mistake.
721 721 self.mandatoryparams = tuple(mandatoryparams)
722 722 self.advisoryparams = tuple(advisoryparams)
723 723 # user friendly UI
724 724 self.params = dict(self.mandatoryparams)
725 725 self.params.update(dict(self.advisoryparams))
726 726 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
727 727
728 728 def _readheader(self):
729 729 """read the header and setup the object"""
730 730 typesize = self._unpackheader(_fparttypesize)[0]
731 731 self.type = self._fromheader(typesize)
732 732 self.ui.debug('part type: "%s"\n' % self.type)
733 733 self.id = self._unpackheader(_fpartid)[0]
734 734 self.ui.debug('part id: "%s"\n' % self.id)
735 735 ## reading parameters
736 736 # param count
737 737 mancount, advcount = self._unpackheader(_fpartparamcount)
738 738 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
739 739 # param size
740 740 fparamsizes = _makefpartparamsizes(mancount + advcount)
741 741 paramsizes = self._unpackheader(fparamsizes)
742 742 # make it a list of couple again
743 743 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
744 744 # split mandatory from advisory
745 745 mansizes = paramsizes[:mancount]
746 746 advsizes = paramsizes[mancount:]
747 747 # retrive param value
748 748 manparams = []
749 749 for key, value in mansizes:
750 750 manparams.append((self._fromheader(key), self._fromheader(value)))
751 751 advparams = []
752 752 for key, value in advsizes:
753 753 advparams.append((self._fromheader(key), self._fromheader(value)))
754 754 self._initparams(manparams, advparams)
755 755 ## part payload
756 756 def payloadchunks():
757 757 payloadsize = self._unpack(_fpayloadsize)[0]
758 758 self.ui.debug('payload chunk size: %i\n' % payloadsize)
759 759 while payloadsize:
760 760 yield self._readexact(payloadsize)
761 761 payloadsize = self._unpack(_fpayloadsize)[0]
762 762 self.ui.debug('payload chunk size: %i\n' % payloadsize)
763 763 self._payloadstream = util.chunkbuffer(payloadchunks())
764 764 # we read the data, tell it
765 765 self._initialized = True
766 766
767 767 def read(self, size=None):
768 768 """read payload data"""
769 769 if not self._initialized:
770 770 self._readheader()
771 771 if size is None:
772 772 data = self._payloadstream.read()
773 773 else:
774 774 data = self._payloadstream.read(size)
775 775 if size is None or len(data) < size:
776 776 self.consumed = True
777 777 return data
778 778
779 779 capabilities = {'HG2X': (),
780 780 'b2x:listkeys': (),
781 781 'b2x:pushkey': (),
782 782 'b2x:changegroup': (),
783 783 }
784 784
785 785 def getrepocaps(repo):
786 786 """return the bundle2 capabilities for a given repo
787 787
788 788 Exists to allow extensions (like evolution) to mutate the capabilities.
789 789 """
790 790 caps = capabilities.copy()
791 791 if obsolete._enabled:
792 792 supportedformat = tuple('V%i' % v for v in obsolete.formats)
793 793 caps['b2x:obsmarkers'] = supportedformat
794 794 return caps
795 795
796 796 def bundle2caps(remote):
797 797 """return the bundlecapabilities of a peer as dict"""
798 798 raw = remote.capable('bundle2-exp')
799 799 if not raw and raw != '':
800 800 return {}
801 801 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
802 802 return decodecaps(capsblob)
803 803
804 804 def obsmarkersversion(caps):
805 805 """extract the list of supported obsmarkers versions from a bundle2caps dict
806 806 """
807 807 obscaps = caps.get('b2x:obsmarkers', ())
808 808 return [int(c[1:]) for c in obscaps if c.startswith('V')]
809 809
810 810 @parthandler('b2x:changegroup')
811 811 def handlechangegroup(op, inpart):
812 812 """apply a changegroup part on the repo
813 813
814 814 This is a very early implementation that will massive rework before being
815 815 inflicted to any end-user.
816 816 """
817 817 # Make sure we trigger a transaction creation
818 818 #
819 819 # The addchangegroup function will get a transaction object by itself, but
820 820 # we need to make sure we trigger the creation of a transaction object used
821 821 # for the whole processing scope.
822 822 op.gettransaction()
823 823 cg = changegroup.cg1unpacker(inpart, 'UN')
824 824 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
825 825 op.records.add('changegroup', {'return': ret})
826 826 if op.reply is not None:
827 827 # This is definitly not the final form of this
828 828 # return. But one need to start somewhere.
829 829 part = op.reply.newpart('b2x:reply:changegroup')
830 830 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
831 831 part.addparam('return', '%i' % ret, mandatory=False)
832 832 assert not inpart.read()
833 833
834 834 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
835 835 def handlereplychangegroup(op, inpart):
836 836 ret = int(inpart.params['return'])
837 837 replyto = int(inpart.params['in-reply-to'])
838 838 op.records.add('changegroup', {'return': ret}, replyto)
839 839
840 840 @parthandler('b2x:check:heads')
841 841 def handlecheckheads(op, inpart):
842 842 """check that head of the repo did not change
843 843
844 844 This is used to detect a push race when using unbundle.
845 845 This replaces the "heads" argument of unbundle."""
846 846 h = inpart.read(20)
847 847 heads = []
848 848 while len(h) == 20:
849 849 heads.append(h)
850 850 h = inpart.read(20)
851 851 assert not h
852 852 if heads != op.repo.heads():
853 853 raise error.PushRaced('repository changed while pushing - '
854 854 'please try again')
855 855
856 856 @parthandler('b2x:output')
857 857 def handleoutput(op, inpart):
858 858 """forward output captured on the server to the client"""
859 859 for line in inpart.read().splitlines():
860 860 op.ui.write(('remote: %s\n' % line))
861 861
862 862 @parthandler('b2x:replycaps')
863 863 def handlereplycaps(op, inpart):
864 864 """Notify that a reply bundle should be created
865 865
866 866 The payload contains the capabilities information for the reply"""
867 867 caps = decodecaps(inpart.read())
868 868 if op.reply is None:
869 869 op.reply = bundle20(op.ui, caps)
870 870
871 871 @parthandler('b2x:error:abort', ('message', 'hint'))
872 872 def handlereplycaps(op, inpart):
873 873 """Used to transmit abort error over the wire"""
874 874 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
875 875
876 876 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
877 877 def handlereplycaps(op, inpart):
878 878 """Used to transmit unknown content error over the wire"""
879 879 kwargs = {}
880 880 parttype = inpart.params.get('parttype')
881 881 if parttype is not None:
882 882 kwargs['parttype'] = parttype
883 883 params = inpart.params.get('params')
884 884 if params is not None:
885 885 kwargs['params'] = params.split('\0')
886 886
887 887 raise error.BundleValueError(**kwargs)
888 888
889 889 @parthandler('b2x:error:pushraced', ('message',))
890 890 def handlereplycaps(op, inpart):
891 891 """Used to transmit push race error over the wire"""
892 892 raise error.ResponseError(_('push failed:'), inpart.params['message'])
893 893
894 894 @parthandler('b2x:listkeys', ('namespace',))
895 895 def handlelistkeys(op, inpart):
896 896 """retrieve pushkey namespace content stored in a bundle2"""
897 897 namespace = inpart.params['namespace']
898 898 r = pushkey.decodekeys(inpart.read())
899 899 op.records.add('listkeys', (namespace, r))
900 900
901 901 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
902 902 def handlepushkey(op, inpart):
903 903 """process a pushkey request"""
904 904 dec = pushkey.decode
905 905 namespace = dec(inpart.params['namespace'])
906 906 key = dec(inpart.params['key'])
907 907 old = dec(inpart.params['old'])
908 908 new = dec(inpart.params['new'])
909 909 ret = op.repo.pushkey(namespace, key, old, new)
910 910 record = {'namespace': namespace,
911 911 'key': key,
912 912 'old': old,
913 913 'new': new}
914 914 op.records.add('pushkey', record)
915 915 if op.reply is not None:
916 916 rpart = op.reply.newpart('b2x:reply:pushkey')
917 917 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
918 918 rpart.addparam('return', '%i' % ret, mandatory=False)
919 919
920 920 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
921 921 def handlepushkeyreply(op, inpart):
922 922 """retrieve the result of a pushkey request"""
923 923 ret = int(inpart.params['return'])
924 924 partid = int(inpart.params['in-reply-to'])
925 925 op.records.add('pushkey', {'return': ret}, partid)
926 926
927 927 @parthandler('b2x:obsmarkers')
928 928 def handleobsmarker(op, inpart):
929 929 """add a stream of obsmarkers to the repo"""
930 930 tr = op.gettransaction()
931 931 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
932 932 if new:
933 933 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
934 934 op.records.add('obsmarkers', {'new': new})
935 935 if op.reply is not None:
936 936 rpart = op.reply.newpart('b2x:reply:obsmarkers')
937 937 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
938 938 rpart.addparam('new', '%i' % new, mandatory=False)
939 939
940 940
941 941 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
942 942 def handlepushkeyreply(op, inpart):
943 943 """retrieve the result of a pushkey request"""
944 944 ret = int(inpart.params['new'])
945 945 partid = int(inpart.params['in-reply-to'])
946 946 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now