##// END OF EJS Templates
bundle2: declare supported parameters for all handlers...
Pierre-Yves David -
r21624:d61066d7 default
parent child Browse files
Show More
@@ -1,846 +1,846
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handler are registered
132 132 for a certain part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know if a part is mandatory or advisory. If the Part type
136 136 contains any uppercase char it is considered mandatory. When no handler is
137 137 known for a Mandatory part, the process is aborted and an exception is raised.
138 138 If the part is advisory and no handler is known, the part is ignored. When the
139 139 process is aborted, the full bundle is still read from the stream to keep the
140 140 channel usable. But none of the part read from an abort are processed. In the
141 141 future, dropping the stream may become an option for channel we do not care to
142 142 preserve.
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 149
150 150 import changegroup, error
151 151 from i18n import _
152 152
153 153 _pack = struct.pack
154 154 _unpack = struct.unpack
155 155
156 156 _magicstring = 'HG2X'
157 157
158 158 _fstreamparamsize = '>H'
159 159 _fpartheadersize = '>H'
160 160 _fparttypesize = '>B'
161 161 _fpartid = '>I'
162 162 _fpayloadsize = '>I'
163 163 _fpartparamcount = '>BB'
164 164
165 165 preferedchunksize = 4096
166 166
167 167 def _makefpartparamsizes(nbparams):
168 168 """return a struct format to read part parameter sizes
169 169
170 170 The number parameters is variable so we need to build that format
171 171 dynamically.
172 172 """
173 173 return '>'+('BB'*nbparams)
174 174
175 175 parthandlermapping = {}
176 176
177 177 def parthandler(parttype, params=()):
178 178 """decorator that register a function as a bundle2 part handler
179 179
180 180 eg::
181 181
182 @parthandler('myparttype')
182 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
183 183 def myparttypehandler(...):
184 184 '''process a part of type "my part".'''
185 185 ...
186 186 """
187 187 def _decorator(func):
188 188 lparttype = parttype.lower() # enforce lower case matching.
189 189 assert lparttype not in parthandlermapping
190 190 parthandlermapping[lparttype] = func
191 191 func.params = frozenset(params)
192 192 return func
193 193 return _decorator
194 194
195 195 class unbundlerecords(object):
196 196 """keep record of what happens during and unbundle
197 197
198 198 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 199 category of record and obj is an arbitrary object.
200 200
201 201 `records['cat']` will return all entries of this category 'cat'.
202 202
203 203 Iterating on the object itself will yield `('category', obj)` tuples
204 204 for all entries.
205 205
206 206 All iterations happens in chronological order.
207 207 """
208 208
209 209 def __init__(self):
210 210 self._categories = {}
211 211 self._sequences = []
212 212 self._replies = {}
213 213
214 214 def add(self, category, entry, inreplyto=None):
215 215 """add a new record of a given category.
216 216
217 217 The entry can then be retrieved in the list returned by
218 218 self['category']."""
219 219 self._categories.setdefault(category, []).append(entry)
220 220 self._sequences.append((category, entry))
221 221 if inreplyto is not None:
222 222 self.getreplies(inreplyto).add(category, entry)
223 223
224 224 def getreplies(self, partid):
225 225 """get the subrecords that replies to a specific part"""
226 226 return self._replies.setdefault(partid, unbundlerecords())
227 227
228 228 def __getitem__(self, cat):
229 229 return tuple(self._categories.get(cat, ()))
230 230
231 231 def __iter__(self):
232 232 return iter(self._sequences)
233 233
234 234 def __len__(self):
235 235 return len(self._sequences)
236 236
237 237 def __nonzero__(self):
238 238 return bool(self._sequences)
239 239
240 240 class bundleoperation(object):
241 241 """an object that represents a single bundling process
242 242
243 243 Its purpose is to carry unbundle-related objects and states.
244 244
245 245 A new object should be created at the beginning of each bundle processing.
246 246 The object is to be returned by the processing function.
247 247
248 248 The object has very little content now it will ultimately contain:
249 249 * an access to the repo the bundle is applied to,
250 250 * a ui object,
251 251 * a way to retrieve a transaction to add changes to the repo,
252 252 * a way to record the result of processing each part,
253 253 * a way to construct a bundle response when applicable.
254 254 """
255 255
256 256 def __init__(self, repo, transactiongetter):
257 257 self.repo = repo
258 258 self.ui = repo.ui
259 259 self.records = unbundlerecords()
260 260 self.gettransaction = transactiongetter
261 261 self.reply = None
262 262
263 263 class TransactionUnavailable(RuntimeError):
264 264 pass
265 265
266 266 def _notransaction():
267 267 """default method to get a transaction while processing a bundle
268 268
269 269 Raise an exception to highlight the fact that no transaction was expected
270 270 to be created"""
271 271 raise TransactionUnavailable()
272 272
273 273 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 274 """This function process a bundle, apply effect to/from a repo
275 275
276 276 It iterates over each part then searches for and uses the proper handling
277 277 code to process the part. Parts are processed in order.
278 278
279 279 This is very early version of this function that will be strongly reworked
280 280 before final usage.
281 281
282 282 Unknown Mandatory part will abort the process.
283 283 """
284 284 op = bundleoperation(repo, transactiongetter)
285 285 # todo:
286 286 # - replace this is a init function soon.
287 287 # - exception catching
288 288 unbundler.params
289 289 iterparts = unbundler.iterparts()
290 290 part = None
291 291 try:
292 292 for part in iterparts:
293 293 parttype = part.type
294 294 # part key are matched lower case
295 295 key = parttype.lower()
296 296 try:
297 297 handler = parthandlermapping[key]
298 298 op.ui.debug('found a handler for part %r\n' % parttype)
299 299 except KeyError:
300 300 if key != parttype: # mandatory parts
301 301 # todo:
302 302 # - use a more precise exception
303 303 raise error.BundleValueError(parttype=key)
304 304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 305 # consuming the part
306 306 part.read()
307 307 continue
308 308
309 309 # handler is called outside the above try block so that we don't
310 310 # risk catching KeyErrors from anything other than the
311 311 # parthandlermapping lookup (any KeyError raised by handler()
312 312 # itself represents a defect of a different variety).
313 313 output = None
314 314 if op.reply is not None:
315 315 op.ui.pushbuffer(error=True)
316 316 output = ''
317 317 try:
318 318 handler(op, part)
319 319 finally:
320 320 if output is not None:
321 321 output = op.ui.popbuffer()
322 322 if output:
323 323 outpart = op.reply.newpart('b2x:output', data=output)
324 324 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
325 325 part.read()
326 326 except Exception, exc:
327 327 if part is not None:
328 328 # consume the bundle content
329 329 part.read()
330 330 for part in iterparts:
331 331 # consume the bundle content
332 332 part.read()
333 333 # Small hack to let caller code distinguish exceptions from bundle2
334 334 # processing fron the ones from bundle1 processing. This is mostly
335 335 # needed to handle different return codes to unbundle according to the
336 336 # type of bundle. We should probably clean up or drop this return code
337 337 # craziness in a future version.
338 338 exc.duringunbundle2 = True
339 339 raise
340 340 return op
341 341
342 342 def decodecaps(blob):
343 343 """decode a bundle2 caps bytes blob into a dictionnary
344 344
345 345 The blob is a list of capabilities (one per line)
346 346 Capabilities may have values using a line of the form::
347 347
348 348 capability=value1,value2,value3
349 349
350 350 The values are always a list."""
351 351 caps = {}
352 352 for line in blob.splitlines():
353 353 if not line:
354 354 continue
355 355 if '=' not in line:
356 356 key, vals = line, ()
357 357 else:
358 358 key, vals = line.split('=', 1)
359 359 vals = vals.split(',')
360 360 key = urllib.unquote(key)
361 361 vals = [urllib.unquote(v) for v in vals]
362 362 caps[key] = vals
363 363 return caps
364 364
365 365 def encodecaps(caps):
366 366 """encode a bundle2 caps dictionary into a bytes blob"""
367 367 chunks = []
368 368 for ca in sorted(caps):
369 369 vals = caps[ca]
370 370 ca = urllib.quote(ca)
371 371 vals = [urllib.quote(v) for v in vals]
372 372 if vals:
373 373 ca = "%s=%s" % (ca, ','.join(vals))
374 374 chunks.append(ca)
375 375 return '\n'.join(chunks)
376 376
377 377 class bundle20(object):
378 378 """represent an outgoing bundle2 container
379 379
380 380 Use the `addparam` method to add stream level parameter. and `newpart` to
381 381 populate it. Then call `getchunks` to retrieve all the binary chunks of
382 382 data that compose the bundle2 container."""
383 383
384 384 def __init__(self, ui, capabilities=()):
385 385 self.ui = ui
386 386 self._params = []
387 387 self._parts = []
388 388 self.capabilities = dict(capabilities)
389 389
390 390 # methods used to defines the bundle2 content
391 391 def addparam(self, name, value=None):
392 392 """add a stream level parameter"""
393 393 if not name:
394 394 raise ValueError('empty parameter name')
395 395 if name[0] not in string.letters:
396 396 raise ValueError('non letter first character: %r' % name)
397 397 self._params.append((name, value))
398 398
399 399 def addpart(self, part):
400 400 """add a new part to the bundle2 container
401 401
402 402 Parts contains the actual applicative payload."""
403 403 assert part.id is None
404 404 part.id = len(self._parts) # very cheap counter
405 405 self._parts.append(part)
406 406
407 407 def newpart(self, typeid, *args, **kwargs):
408 408 """create a new part and add it to the containers
409 409
410 410 As the part is directly added to the containers. For now, this means
411 411 that any failure to properly initialize the part after calling
412 412 ``newpart`` should result in a failure of the whole bundling process.
413 413
414 414 You can still fall back to manually create and add if you need better
415 415 control."""
416 416 part = bundlepart(typeid, *args, **kwargs)
417 417 self.addpart(part)
418 418 return part
419 419
420 420 # methods used to generate the bundle2 stream
421 421 def getchunks(self):
422 422 self.ui.debug('start emission of %s stream\n' % _magicstring)
423 423 yield _magicstring
424 424 param = self._paramchunk()
425 425 self.ui.debug('bundle parameter: %s\n' % param)
426 426 yield _pack(_fstreamparamsize, len(param))
427 427 if param:
428 428 yield param
429 429
430 430 self.ui.debug('start of parts\n')
431 431 for part in self._parts:
432 432 self.ui.debug('bundle part: "%s"\n' % part.type)
433 433 for chunk in part.getchunks():
434 434 yield chunk
435 435 self.ui.debug('end of bundle\n')
436 436 yield '\0\0'
437 437
438 438 def _paramchunk(self):
439 439 """return a encoded version of all stream parameters"""
440 440 blocks = []
441 441 for par, value in self._params:
442 442 par = urllib.quote(par)
443 443 if value is not None:
444 444 value = urllib.quote(value)
445 445 par = '%s=%s' % (par, value)
446 446 blocks.append(par)
447 447 return ' '.join(blocks)
448 448
449 449 class unpackermixin(object):
450 450 """A mixin to extract bytes and struct data from a stream"""
451 451
452 452 def __init__(self, fp):
453 453 self._fp = fp
454 454
455 455 def _unpack(self, format):
456 456 """unpack this struct format from the stream"""
457 457 data = self._readexact(struct.calcsize(format))
458 458 return _unpack(format, data)
459 459
460 460 def _readexact(self, size):
461 461 """read exactly <size> bytes from the stream"""
462 462 return changegroup.readexactly(self._fp, size)
463 463
464 464
465 465 class unbundle20(unpackermixin):
466 466 """interpret a bundle2 stream
467 467
468 468 This class is fed with a binary stream and yields parts through its
469 469 `iterparts` methods."""
470 470
471 471 def __init__(self, ui, fp, header=None):
472 472 """If header is specified, we do not read it out of the stream."""
473 473 self.ui = ui
474 474 super(unbundle20, self).__init__(fp)
475 475 if header is None:
476 476 header = self._readexact(4)
477 477 magic, version = header[0:2], header[2:4]
478 478 if magic != 'HG':
479 479 raise util.Abort(_('not a Mercurial bundle'))
480 480 if version != '2X':
481 481 raise util.Abort(_('unknown bundle version %s') % version)
482 482 self.ui.debug('start processing of %s stream\n' % header)
483 483
484 484 @util.propertycache
485 485 def params(self):
486 486 """dictionary of stream level parameters"""
487 487 self.ui.debug('reading bundle2 stream parameters\n')
488 488 params = {}
489 489 paramssize = self._unpack(_fstreamparamsize)[0]
490 490 if paramssize:
491 491 for p in self._readexact(paramssize).split(' '):
492 492 p = p.split('=', 1)
493 493 p = [urllib.unquote(i) for i in p]
494 494 if len(p) < 2:
495 495 p.append(None)
496 496 self._processparam(*p)
497 497 params[p[0]] = p[1]
498 498 return params
499 499
500 500 def _processparam(self, name, value):
501 501 """process a parameter, applying its effect if needed
502 502
503 503 Parameter starting with a lower case letter are advisory and will be
504 504 ignored when unknown. Those starting with an upper case letter are
505 505 mandatory and will this function will raise a KeyError when unknown.
506 506
507 507 Note: no option are currently supported. Any input will be either
508 508 ignored or failing.
509 509 """
510 510 if not name:
511 511 raise ValueError('empty parameter name')
512 512 if name[0] not in string.letters:
513 513 raise ValueError('non letter first character: %r' % name)
514 514 # Some logic will be later added here to try to process the option for
515 515 # a dict of known parameter.
516 516 if name[0].islower():
517 517 self.ui.debug("ignoring unknown parameter %r\n" % name)
518 518 else:
519 519 raise KeyError(name)
520 520
521 521
522 522 def iterparts(self):
523 523 """yield all parts contained in the stream"""
524 524 # make sure param have been loaded
525 525 self.params
526 526 self.ui.debug('start extraction of bundle2 parts\n')
527 527 headerblock = self._readpartheader()
528 528 while headerblock is not None:
529 529 part = unbundlepart(self.ui, headerblock, self._fp)
530 530 yield part
531 531 headerblock = self._readpartheader()
532 532 self.ui.debug('end of bundle2 stream\n')
533 533
534 534 def _readpartheader(self):
535 535 """reads a part header size and return the bytes blob
536 536
537 537 returns None if empty"""
538 538 headersize = self._unpack(_fpartheadersize)[0]
539 539 self.ui.debug('part header size: %i\n' % headersize)
540 540 if headersize:
541 541 return self._readexact(headersize)
542 542 return None
543 543
544 544
545 545 class bundlepart(object):
546 546 """A bundle2 part contains application level payload
547 547
548 548 The part `type` is used to route the part to the application level
549 549 handler.
550 550
551 551 The part payload is contained in ``part.data``. It could be raw bytes or a
552 552 generator of byte chunks.
553 553
554 554 You can add parameters to the part using the ``addparam`` method.
555 555 Parameters can be either mandatory (default) or advisory. Remote side
556 556 should be able to safely ignore the advisory ones.
557 557
558 558 Both data and parameters cannot be modified after the generation has begun.
559 559 """
560 560
561 561 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
562 562 data=''):
563 563 self.id = None
564 564 self.type = parttype
565 565 self._data = data
566 566 self._mandatoryparams = list(mandatoryparams)
567 567 self._advisoryparams = list(advisoryparams)
568 568 # checking for duplicated entries
569 569 self._seenparams = set()
570 570 for pname, __ in self._mandatoryparams + self._advisoryparams:
571 571 if pname in self._seenparams:
572 572 raise RuntimeError('duplicated params: %s' % pname)
573 573 self._seenparams.add(pname)
574 574 # status of the part's generation:
575 575 # - None: not started,
576 576 # - False: currently generated,
577 577 # - True: generation done.
578 578 self._generated = None
579 579
580 580 # methods used to defines the part content
581 581 def __setdata(self, data):
582 582 if self._generated is not None:
583 583 raise error.ReadOnlyPartError('part is being generated')
584 584 self._data = data
585 585 def __getdata(self):
586 586 return self._data
587 587 data = property(__getdata, __setdata)
588 588
589 589 @property
590 590 def mandatoryparams(self):
591 591 # make it an immutable tuple to force people through ``addparam``
592 592 return tuple(self._mandatoryparams)
593 593
594 594 @property
595 595 def advisoryparams(self):
596 596 # make it an immutable tuple to force people through ``addparam``
597 597 return tuple(self._advisoryparams)
598 598
599 599 def addparam(self, name, value='', mandatory=True):
600 600 if self._generated is not None:
601 601 raise error.ReadOnlyPartError('part is being generated')
602 602 if name in self._seenparams:
603 603 raise ValueError('duplicated params: %s' % name)
604 604 self._seenparams.add(name)
605 605 params = self._advisoryparams
606 606 if mandatory:
607 607 params = self._mandatoryparams
608 608 params.append((name, value))
609 609
610 610 # methods used to generates the bundle2 stream
611 611 def getchunks(self):
612 612 if self._generated is not None:
613 613 raise RuntimeError('part can only be consumed once')
614 614 self._generated = False
615 615 #### header
616 616 ## parttype
617 617 header = [_pack(_fparttypesize, len(self.type)),
618 618 self.type, _pack(_fpartid, self.id),
619 619 ]
620 620 ## parameters
621 621 # count
622 622 manpar = self.mandatoryparams
623 623 advpar = self.advisoryparams
624 624 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
625 625 # size
626 626 parsizes = []
627 627 for key, value in manpar:
628 628 parsizes.append(len(key))
629 629 parsizes.append(len(value))
630 630 for key, value in advpar:
631 631 parsizes.append(len(key))
632 632 parsizes.append(len(value))
633 633 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
634 634 header.append(paramsizes)
635 635 # key, value
636 636 for key, value in manpar:
637 637 header.append(key)
638 638 header.append(value)
639 639 for key, value in advpar:
640 640 header.append(key)
641 641 header.append(value)
642 642 ## finalize header
643 643 headerchunk = ''.join(header)
644 644 yield _pack(_fpartheadersize, len(headerchunk))
645 645 yield headerchunk
646 646 ## payload
647 647 for chunk in self._payloadchunks():
648 648 yield _pack(_fpayloadsize, len(chunk))
649 649 yield chunk
650 650 # end of payload
651 651 yield _pack(_fpayloadsize, 0)
652 652 self._generated = True
653 653
654 654 def _payloadchunks(self):
655 655 """yield chunks of a the part payload
656 656
657 657 Exists to handle the different methods to provide data to a part."""
658 658 # we only support fixed size data now.
659 659 # This will be improved in the future.
660 660 if util.safehasattr(self.data, 'next'):
661 661 buff = util.chunkbuffer(self.data)
662 662 chunk = buff.read(preferedchunksize)
663 663 while chunk:
664 664 yield chunk
665 665 chunk = buff.read(preferedchunksize)
666 666 elif len(self.data):
667 667 yield self.data
668 668
669 669 class unbundlepart(unpackermixin):
670 670 """a bundle part read from a bundle"""
671 671
672 672 def __init__(self, ui, header, fp):
673 673 super(unbundlepart, self).__init__(fp)
674 674 self.ui = ui
675 675 # unbundle state attr
676 676 self._headerdata = header
677 677 self._headeroffset = 0
678 678 self._initialized = False
679 679 self.consumed = False
680 680 # part data
681 681 self.id = None
682 682 self.type = None
683 683 self.mandatoryparams = None
684 684 self.advisoryparams = None
685 685 self.params = None
686 686 self.mandatorykeys = ()
687 687 self._payloadstream = None
688 688 self._readheader()
689 689
690 690 def _fromheader(self, size):
691 691 """return the next <size> byte from the header"""
692 692 offset = self._headeroffset
693 693 data = self._headerdata[offset:(offset + size)]
694 694 self._headeroffset = offset + size
695 695 return data
696 696
697 697 def _unpackheader(self, format):
698 698 """read given format from header
699 699
700 700 This automatically compute the size of the format to read."""
701 701 data = self._fromheader(struct.calcsize(format))
702 702 return _unpack(format, data)
703 703
704 704 def _initparams(self, mandatoryparams, advisoryparams):
705 705 """internal function to setup all logic related parameters"""
706 706 # make it read only to prevent people touching it by mistake.
707 707 self.mandatoryparams = tuple(mandatoryparams)
708 708 self.advisoryparams = tuple(advisoryparams)
709 709 # user friendly UI
710 710 self.params = dict(self.mandatoryparams)
711 711 self.params.update(dict(self.advisoryparams))
712 712 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
713 713
714 714 def _readheader(self):
715 715 """read the header and setup the object"""
716 716 typesize = self._unpackheader(_fparttypesize)[0]
717 717 self.type = self._fromheader(typesize)
718 718 self.ui.debug('part type: "%s"\n' % self.type)
719 719 self.id = self._unpackheader(_fpartid)[0]
720 720 self.ui.debug('part id: "%s"\n' % self.id)
721 721 ## reading parameters
722 722 # param count
723 723 mancount, advcount = self._unpackheader(_fpartparamcount)
724 724 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
725 725 # param size
726 726 fparamsizes = _makefpartparamsizes(mancount + advcount)
727 727 paramsizes = self._unpackheader(fparamsizes)
728 728 # make it a list of couple again
729 729 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
730 730 # split mandatory from advisory
731 731 mansizes = paramsizes[:mancount]
732 732 advsizes = paramsizes[mancount:]
733 733 # retrive param value
734 734 manparams = []
735 735 for key, value in mansizes:
736 736 manparams.append((self._fromheader(key), self._fromheader(value)))
737 737 advparams = []
738 738 for key, value in advsizes:
739 739 advparams.append((self._fromheader(key), self._fromheader(value)))
740 740 self._initparams(manparams, advparams)
741 741 ## part payload
742 742 def payloadchunks():
743 743 payloadsize = self._unpack(_fpayloadsize)[0]
744 744 self.ui.debug('payload chunk size: %i\n' % payloadsize)
745 745 while payloadsize:
746 746 yield self._readexact(payloadsize)
747 747 payloadsize = self._unpack(_fpayloadsize)[0]
748 748 self.ui.debug('payload chunk size: %i\n' % payloadsize)
749 749 self._payloadstream = util.chunkbuffer(payloadchunks())
750 750 # we read the data, tell it
751 751 self._initialized = True
752 752
753 753 def read(self, size=None):
754 754 """read payload data"""
755 755 if not self._initialized:
756 756 self._readheader()
757 757 if size is None:
758 758 data = self._payloadstream.read()
759 759 else:
760 760 data = self._payloadstream.read(size)
761 761 if size is None or len(data) < size:
762 762 self.consumed = True
763 763 return data
764 764
765 765
766 766 @parthandler('b2x:changegroup')
767 767 def handlechangegroup(op, inpart):
768 768 """apply a changegroup part on the repo
769 769
770 770 This is a very early implementation that will massive rework before being
771 771 inflicted to any end-user.
772 772 """
773 773 # Make sure we trigger a transaction creation
774 774 #
775 775 # The addchangegroup function will get a transaction object by itself, but
776 776 # we need to make sure we trigger the creation of a transaction object used
777 777 # for the whole processing scope.
778 778 op.gettransaction()
779 779 cg = changegroup.unbundle10(inpart, 'UN')
780 780 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
781 781 op.records.add('changegroup', {'return': ret})
782 782 if op.reply is not None:
783 783 # This is definitly not the final form of this
784 784 # return. But one need to start somewhere.
785 785 part = op.reply.newpart('b2x:reply:changegroup')
786 786 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
787 787 part.addparam('return', '%i' % ret, mandatory=False)
788 788 assert not inpart.read()
789 789
790 @parthandler('b2x:reply:changegroup')
790 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
791 791 def handlechangegroup(op, inpart):
792 792 ret = int(inpart.params['return'])
793 793 replyto = int(inpart.params['in-reply-to'])
794 794 op.records.add('changegroup', {'return': ret}, replyto)
795 795
796 796 @parthandler('b2x:check:heads')
797 797 def handlechangegroup(op, inpart):
798 798 """check that head of the repo did not change
799 799
800 800 This is used to detect a push race when using unbundle.
801 801 This replaces the "heads" argument of unbundle."""
802 802 h = inpart.read(20)
803 803 heads = []
804 804 while len(h) == 20:
805 805 heads.append(h)
806 806 h = inpart.read(20)
807 807 assert not h
808 808 if heads != op.repo.heads():
809 809 raise error.PushRaced('repository changed while pushing - '
810 810 'please try again')
811 811
812 812 @parthandler('b2x:output')
813 813 def handleoutput(op, inpart):
814 814 """forward output captured on the server to the client"""
815 815 for line in inpart.read().splitlines():
816 816 op.ui.write(('remote: %s\n' % line))
817 817
818 818 @parthandler('b2x:replycaps')
819 819 def handlereplycaps(op, inpart):
820 820 """Notify that a reply bundle should be created
821 821
822 822 The payload contains the capabilities information for the reply"""
823 823 caps = decodecaps(inpart.read())
824 824 if op.reply is None:
825 825 op.reply = bundle20(op.ui, caps)
826 826
827 @parthandler('b2x:error:abort')
827 @parthandler('b2x:error:abort', ('message', 'hint'))
828 828 def handlereplycaps(op, inpart):
829 829 """Used to transmit abort error over the wire"""
830 830 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
831 831
832 @parthandler('b2x:error:unsupportedcontent')
832 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
833 833 def handlereplycaps(op, inpart):
834 834 """Used to transmit unknown content error over the wire"""
835 835 kwargs = {}
836 836 kwargs['parttype'] = inpart.params['parttype']
837 837 params = inpart.params.get('params')
838 838 if params is not None:
839 839 kwargs['params'] = params.split('\0')
840 840
841 841 raise error.BundleValueError(**kwargs)
842 842
843 843 @parthandler('b2x:error:pushraced', ('message',))
844 844 def handlereplycaps(op, inpart):
845 845 """Used to transmit push race error over the wire"""
846 846 raise error.ResponseError(_('push failed:'), inpart.params['message'])
General Comments 0
You need to be logged in to leave comments. Login now