##// END OF EJS Templates
bundle2: make sure unbundled part param are read-only...
Pierre-Yves David -
r21609:63cc2594 default
parent child Browse files
Show More
@@ -1,844 +1,845 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handler are registered
132 132 for a certain part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know if a part is mandatory or advisory. If the Part type
136 136 contains any uppercase char it is considered mandatory. When no handler is
137 137 known for a Mandatory part, the process is aborted and an exception is raised.
138 138 If the part is advisory and no handler is known, the part is ignored. When the
139 139 process is aborted, the full bundle is still read from the stream to keep the
140 140 channel usable. But none of the part read from an abort are processed. In the
141 141 future, dropping the stream may become an option for channel we do not care to
142 142 preserve.
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 149
150 150 import changegroup, error
151 151 from i18n import _
152 152
153 153 _pack = struct.pack
154 154 _unpack = struct.unpack
155 155
156 156 _magicstring = 'HG2X'
157 157
158 158 _fstreamparamsize = '>H'
159 159 _fpartheadersize = '>H'
160 160 _fparttypesize = '>B'
161 161 _fpartid = '>I'
162 162 _fpayloadsize = '>I'
163 163 _fpartparamcount = '>BB'
164 164
165 165 preferedchunksize = 4096
166 166
167 167 def _makefpartparamsizes(nbparams):
168 168 """return a struct format to read part parameter sizes
169 169
170 170 The number parameters is variable so we need to build that format
171 171 dynamically.
172 172 """
173 173 return '>'+('BB'*nbparams)
174 174
175 175 class UnknownPartError(KeyError):
176 176 """error raised when no handler is found for a Mandatory part"""
177 177 pass
178 178
179 179 class ReadOnlyPartError(RuntimeError):
180 180 """error raised when code tries to alter a part being generated"""
181 181 pass
182 182
183 183 parthandlermapping = {}
184 184
185 185 def parthandler(parttype):
186 186 """decorator that register a function as a bundle2 part handler
187 187
188 188 eg::
189 189
190 190 @parthandler('myparttype')
191 191 def myparttypehandler(...):
192 192 '''process a part of type "my part".'''
193 193 ...
194 194 """
195 195 def _decorator(func):
196 196 lparttype = parttype.lower() # enforce lower case matching.
197 197 assert lparttype not in parthandlermapping
198 198 parthandlermapping[lparttype] = func
199 199 return func
200 200 return _decorator
201 201
202 202 class unbundlerecords(object):
203 203 """keep record of what happens during and unbundle
204 204
205 205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 206 category of record and obj is an arbitrary object.
207 207
208 208 `records['cat']` will return all entries of this category 'cat'.
209 209
210 210 Iterating on the object itself will yield `('category', obj)` tuples
211 211 for all entries.
212 212
213 213 All iterations happens in chronological order.
214 214 """
215 215
216 216 def __init__(self):
217 217 self._categories = {}
218 218 self._sequences = []
219 219 self._replies = {}
220 220
221 221 def add(self, category, entry, inreplyto=None):
222 222 """add a new record of a given category.
223 223
224 224 The entry can then be retrieved in the list returned by
225 225 self['category']."""
226 226 self._categories.setdefault(category, []).append(entry)
227 227 self._sequences.append((category, entry))
228 228 if inreplyto is not None:
229 229 self.getreplies(inreplyto).add(category, entry)
230 230
231 231 def getreplies(self, partid):
232 232 """get the subrecords that replies to a specific part"""
233 233 return self._replies.setdefault(partid, unbundlerecords())
234 234
235 235 def __getitem__(self, cat):
236 236 return tuple(self._categories.get(cat, ()))
237 237
238 238 def __iter__(self):
239 239 return iter(self._sequences)
240 240
241 241 def __len__(self):
242 242 return len(self._sequences)
243 243
244 244 def __nonzero__(self):
245 245 return bool(self._sequences)
246 246
247 247 class bundleoperation(object):
248 248 """an object that represents a single bundling process
249 249
250 250 Its purpose is to carry unbundle-related objects and states.
251 251
252 252 A new object should be created at the beginning of each bundle processing.
253 253 The object is to be returned by the processing function.
254 254
255 255 The object has very little content now it will ultimately contain:
256 256 * an access to the repo the bundle is applied to,
257 257 * a ui object,
258 258 * a way to retrieve a transaction to add changes to the repo,
259 259 * a way to record the result of processing each part,
260 260 * a way to construct a bundle response when applicable.
261 261 """
262 262
263 263 def __init__(self, repo, transactiongetter):
264 264 self.repo = repo
265 265 self.ui = repo.ui
266 266 self.records = unbundlerecords()
267 267 self.gettransaction = transactiongetter
268 268 self.reply = None
269 269
270 270 class TransactionUnavailable(RuntimeError):
271 271 pass
272 272
273 273 def _notransaction():
274 274 """default method to get a transaction while processing a bundle
275 275
276 276 Raise an exception to highlight the fact that no transaction was expected
277 277 to be created"""
278 278 raise TransactionUnavailable()
279 279
280 280 def processbundle(repo, unbundler, transactiongetter=_notransaction):
281 281 """This function process a bundle, apply effect to/from a repo
282 282
283 283 It iterates over each part then searches for and uses the proper handling
284 284 code to process the part. Parts are processed in order.
285 285
286 286 This is very early version of this function that will be strongly reworked
287 287 before final usage.
288 288
289 289 Unknown Mandatory part will abort the process.
290 290 """
291 291 op = bundleoperation(repo, transactiongetter)
292 292 # todo:
293 293 # - replace this is a init function soon.
294 294 # - exception catching
295 295 unbundler.params
296 296 iterparts = unbundler.iterparts()
297 297 part = None
298 298 try:
299 299 for part in iterparts:
300 300 parttype = part.type
301 301 # part key are matched lower case
302 302 key = parttype.lower()
303 303 try:
304 304 handler = parthandlermapping[key]
305 305 op.ui.debug('found a handler for part %r\n' % parttype)
306 306 except KeyError:
307 307 if key != parttype: # mandatory parts
308 308 # todo:
309 309 # - use a more precise exception
310 310 raise UnknownPartError(key)
311 311 op.ui.debug('ignoring unknown advisory part %r\n' % key)
312 312 # consuming the part
313 313 part.read()
314 314 continue
315 315
316 316 # handler is called outside the above try block so that we don't
317 317 # risk catching KeyErrors from anything other than the
318 318 # parthandlermapping lookup (any KeyError raised by handler()
319 319 # itself represents a defect of a different variety).
320 320 output = None
321 321 if op.reply is not None:
322 322 op.ui.pushbuffer(error=True)
323 323 output = ''
324 324 try:
325 325 handler(op, part)
326 326 finally:
327 327 if output is not None:
328 328 output = op.ui.popbuffer()
329 329 if output:
330 330 outpart = op.reply.newpart('b2x:output', data=output)
331 331 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 332 part.read()
333 333 except Exception, exc:
334 334 if part is not None:
335 335 # consume the bundle content
336 336 part.read()
337 337 for part in iterparts:
338 338 # consume the bundle content
339 339 part.read()
340 340 # Small hack to let caller code distinguish exceptions from bundle2
341 341 # processing fron the ones from bundle1 processing. This is mostly
342 342 # needed to handle different return codes to unbundle according to the
343 343 # type of bundle. We should probably clean up or drop this return code
344 344 # craziness in a future version.
345 345 exc.duringunbundle2 = True
346 346 raise
347 347 return op
348 348
349 349 def decodecaps(blob):
350 350 """decode a bundle2 caps bytes blob into a dictionnary
351 351
352 352 The blob is a list of capabilities (one per line)
353 353 Capabilities may have values using a line of the form::
354 354
355 355 capability=value1,value2,value3
356 356
357 357 The values are always a list."""
358 358 caps = {}
359 359 for line in blob.splitlines():
360 360 if not line:
361 361 continue
362 362 if '=' not in line:
363 363 key, vals = line, ()
364 364 else:
365 365 key, vals = line.split('=', 1)
366 366 vals = vals.split(',')
367 367 key = urllib.unquote(key)
368 368 vals = [urllib.unquote(v) for v in vals]
369 369 caps[key] = vals
370 370 return caps
371 371
372 372 def encodecaps(caps):
373 373 """encode a bundle2 caps dictionary into a bytes blob"""
374 374 chunks = []
375 375 for ca in sorted(caps):
376 376 vals = caps[ca]
377 377 ca = urllib.quote(ca)
378 378 vals = [urllib.quote(v) for v in vals]
379 379 if vals:
380 380 ca = "%s=%s" % (ca, ','.join(vals))
381 381 chunks.append(ca)
382 382 return '\n'.join(chunks)
383 383
384 384 class bundle20(object):
385 385 """represent an outgoing bundle2 container
386 386
387 387 Use the `addparam` method to add stream level parameter. and `newpart` to
388 388 populate it. Then call `getchunks` to retrieve all the binary chunks of
389 389 data that compose the bundle2 container."""
390 390
391 391 def __init__(self, ui, capabilities=()):
392 392 self.ui = ui
393 393 self._params = []
394 394 self._parts = []
395 395 self.capabilities = dict(capabilities)
396 396
397 397 # methods used to defines the bundle2 content
398 398 def addparam(self, name, value=None):
399 399 """add a stream level parameter"""
400 400 if not name:
401 401 raise ValueError('empty parameter name')
402 402 if name[0] not in string.letters:
403 403 raise ValueError('non letter first character: %r' % name)
404 404 self._params.append((name, value))
405 405
406 406 def addpart(self, part):
407 407 """add a new part to the bundle2 container
408 408
409 409 Parts contains the actual applicative payload."""
410 410 assert part.id is None
411 411 part.id = len(self._parts) # very cheap counter
412 412 self._parts.append(part)
413 413
414 414 def newpart(self, typeid, *args, **kwargs):
415 415 """create a new part and add it to the containers
416 416
417 417 As the part is directly added to the containers. For now, this means
418 418 that any failure to properly initialize the part after calling
419 419 ``newpart`` should result in a failure of the whole bundling process.
420 420
421 421 You can still fall back to manually create and add if you need better
422 422 control."""
423 423 part = bundlepart(typeid, *args, **kwargs)
424 424 self.addpart(part)
425 425 return part
426 426
427 427 # methods used to generate the bundle2 stream
428 428 def getchunks(self):
429 429 self.ui.debug('start emission of %s stream\n' % _magicstring)
430 430 yield _magicstring
431 431 param = self._paramchunk()
432 432 self.ui.debug('bundle parameter: %s\n' % param)
433 433 yield _pack(_fstreamparamsize, len(param))
434 434 if param:
435 435 yield param
436 436
437 437 self.ui.debug('start of parts\n')
438 438 for part in self._parts:
439 439 self.ui.debug('bundle part: "%s"\n' % part.type)
440 440 for chunk in part.getchunks():
441 441 yield chunk
442 442 self.ui.debug('end of bundle\n')
443 443 yield '\0\0'
444 444
445 445 def _paramchunk(self):
446 446 """return a encoded version of all stream parameters"""
447 447 blocks = []
448 448 for par, value in self._params:
449 449 par = urllib.quote(par)
450 450 if value is not None:
451 451 value = urllib.quote(value)
452 452 par = '%s=%s' % (par, value)
453 453 blocks.append(par)
454 454 return ' '.join(blocks)
455 455
456 456 class unpackermixin(object):
457 457 """A mixin to extract bytes and struct data from a stream"""
458 458
459 459 def __init__(self, fp):
460 460 self._fp = fp
461 461
462 462 def _unpack(self, format):
463 463 """unpack this struct format from the stream"""
464 464 data = self._readexact(struct.calcsize(format))
465 465 return _unpack(format, data)
466 466
467 467 def _readexact(self, size):
468 468 """read exactly <size> bytes from the stream"""
469 469 return changegroup.readexactly(self._fp, size)
470 470
471 471
472 472 class unbundle20(unpackermixin):
473 473 """interpret a bundle2 stream
474 474
475 475 This class is fed with a binary stream and yields parts through its
476 476 `iterparts` methods."""
477 477
478 478 def __init__(self, ui, fp, header=None):
479 479 """If header is specified, we do not read it out of the stream."""
480 480 self.ui = ui
481 481 super(unbundle20, self).__init__(fp)
482 482 if header is None:
483 483 header = self._readexact(4)
484 484 magic, version = header[0:2], header[2:4]
485 485 if magic != 'HG':
486 486 raise util.Abort(_('not a Mercurial bundle'))
487 487 if version != '2X':
488 488 raise util.Abort(_('unknown bundle version %s') % version)
489 489 self.ui.debug('start processing of %s stream\n' % header)
490 490
491 491 @util.propertycache
492 492 def params(self):
493 493 """dictionary of stream level parameters"""
494 494 self.ui.debug('reading bundle2 stream parameters\n')
495 495 params = {}
496 496 paramssize = self._unpack(_fstreamparamsize)[0]
497 497 if paramssize:
498 498 for p in self._readexact(paramssize).split(' '):
499 499 p = p.split('=', 1)
500 500 p = [urllib.unquote(i) for i in p]
501 501 if len(p) < 2:
502 502 p.append(None)
503 503 self._processparam(*p)
504 504 params[p[0]] = p[1]
505 505 return params
506 506
507 507 def _processparam(self, name, value):
508 508 """process a parameter, applying its effect if needed
509 509
510 510 Parameter starting with a lower case letter are advisory and will be
511 511 ignored when unknown. Those starting with an upper case letter are
512 512 mandatory and will this function will raise a KeyError when unknown.
513 513
514 514 Note: no option are currently supported. Any input will be either
515 515 ignored or failing.
516 516 """
517 517 if not name:
518 518 raise ValueError('empty parameter name')
519 519 if name[0] not in string.letters:
520 520 raise ValueError('non letter first character: %r' % name)
521 521 # Some logic will be later added here to try to process the option for
522 522 # a dict of known parameter.
523 523 if name[0].islower():
524 524 self.ui.debug("ignoring unknown parameter %r\n" % name)
525 525 else:
526 526 raise KeyError(name)
527 527
528 528
529 529 def iterparts(self):
530 530 """yield all parts contained in the stream"""
531 531 # make sure param have been loaded
532 532 self.params
533 533 self.ui.debug('start extraction of bundle2 parts\n')
534 534 headerblock = self._readpartheader()
535 535 while headerblock is not None:
536 536 part = unbundlepart(self.ui, headerblock, self._fp)
537 537 yield part
538 538 headerblock = self._readpartheader()
539 539 self.ui.debug('end of bundle2 stream\n')
540 540
541 541 def _readpartheader(self):
542 542 """reads a part header size and return the bytes blob
543 543
544 544 returns None if empty"""
545 545 headersize = self._unpack(_fpartheadersize)[0]
546 546 self.ui.debug('part header size: %i\n' % headersize)
547 547 if headersize:
548 548 return self._readexact(headersize)
549 549 return None
550 550
551 551
552 552 class bundlepart(object):
553 553 """A bundle2 part contains application level payload
554 554
555 555 The part `type` is used to route the part to the application level
556 556 handler.
557 557
558 558 The part payload is contained in ``part.data``. It could be raw bytes or a
559 559 generator of byte chunks.
560 560
561 561 You can add parameters to the part using the ``addparam`` method.
562 562 Parameters can be either mandatory (default) or advisory. Remote side
563 563 should be able to safely ignore the advisory ones.
564 564
565 565 Both data and parameters cannot be modified after the generation has begun.
566 566 """
567 567
568 568 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
569 569 data=''):
570 570 self.id = None
571 571 self.type = parttype
572 572 self._data = data
573 573 self._mandatoryparams = list(mandatoryparams)
574 574 self._advisoryparams = list(advisoryparams)
575 575 # checking for duplicated entries
576 576 self._seenparams = set()
577 577 for pname, __ in self._mandatoryparams + self._advisoryparams:
578 578 if pname in self._seenparams:
579 579 raise RuntimeError('duplicated params: %s' % pname)
580 580 self._seenparams.add(pname)
581 581 # status of the part's generation:
582 582 # - None: not started,
583 583 # - False: currently generated,
584 584 # - True: generation done.
585 585 self._generated = None
586 586
587 587 # methods used to defines the part content
588 588 def __setdata(self, data):
589 589 if self._generated is not None:
590 590 raise ReadOnlyPartError('part is being generated')
591 591 self._data = data
592 592 def __getdata(self):
593 593 return self._data
594 594 data = property(__getdata, __setdata)
595 595
596 596 @property
597 597 def mandatoryparams(self):
598 598 # make it an immutable tuple to force people through ``addparam``
599 599 return tuple(self._mandatoryparams)
600 600
601 601 @property
602 602 def advisoryparams(self):
603 603 # make it an immutable tuple to force people through ``addparam``
604 604 return tuple(self._advisoryparams)
605 605
606 606 def addparam(self, name, value='', mandatory=True):
607 607 if self._generated is not None:
608 608 raise ReadOnlyPartError('part is being generated')
609 609 if name in self._seenparams:
610 610 raise ValueError('duplicated params: %s' % name)
611 611 self._seenparams.add(name)
612 612 params = self._advisoryparams
613 613 if mandatory:
614 614 params = self._mandatoryparams
615 615 params.append((name, value))
616 616
617 617 # methods used to generates the bundle2 stream
618 618 def getchunks(self):
619 619 if self._generated is not None:
620 620 raise RuntimeError('part can only be consumed once')
621 621 self._generated = False
622 622 #### header
623 623 ## parttype
624 624 header = [_pack(_fparttypesize, len(self.type)),
625 625 self.type, _pack(_fpartid, self.id),
626 626 ]
627 627 ## parameters
628 628 # count
629 629 manpar = self.mandatoryparams
630 630 advpar = self.advisoryparams
631 631 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
632 632 # size
633 633 parsizes = []
634 634 for key, value in manpar:
635 635 parsizes.append(len(key))
636 636 parsizes.append(len(value))
637 637 for key, value in advpar:
638 638 parsizes.append(len(key))
639 639 parsizes.append(len(value))
640 640 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
641 641 header.append(paramsizes)
642 642 # key, value
643 643 for key, value in manpar:
644 644 header.append(key)
645 645 header.append(value)
646 646 for key, value in advpar:
647 647 header.append(key)
648 648 header.append(value)
649 649 ## finalize header
650 650 headerchunk = ''.join(header)
651 651 yield _pack(_fpartheadersize, len(headerchunk))
652 652 yield headerchunk
653 653 ## payload
654 654 for chunk in self._payloadchunks():
655 655 yield _pack(_fpayloadsize, len(chunk))
656 656 yield chunk
657 657 # end of payload
658 658 yield _pack(_fpayloadsize, 0)
659 659 self._generated = True
660 660
661 661 def _payloadchunks(self):
662 662 """yield chunks of a the part payload
663 663
664 664 Exists to handle the different methods to provide data to a part."""
665 665 # we only support fixed size data now.
666 666 # This will be improved in the future.
667 667 if util.safehasattr(self.data, 'next'):
668 668 buff = util.chunkbuffer(self.data)
669 669 chunk = buff.read(preferedchunksize)
670 670 while chunk:
671 671 yield chunk
672 672 chunk = buff.read(preferedchunksize)
673 673 elif len(self.data):
674 674 yield self.data
675 675
676 676 class unbundlepart(unpackermixin):
677 677 """a bundle part read from a bundle"""
678 678
679 679 def __init__(self, ui, header, fp):
680 680 super(unbundlepart, self).__init__(fp)
681 681 self.ui = ui
682 682 # unbundle state attr
683 683 self._headerdata = header
684 684 self._headeroffset = 0
685 685 self._initialized = False
686 686 self.consumed = False
687 687 # part data
688 688 self.id = None
689 689 self.type = None
690 690 self.mandatoryparams = None
691 691 self.advisoryparams = None
692 692 self._payloadstream = None
693 693 self._readheader()
694 694
695 695 def _fromheader(self, size):
696 696 """return the next <size> byte from the header"""
697 697 offset = self._headeroffset
698 698 data = self._headerdata[offset:(offset + size)]
699 699 self._headeroffset = offset + size
700 700 return data
701 701
702 702 def _unpackheader(self, format):
703 703 """read given format from header
704 704
705 705 This automatically compute the size of the format to read."""
706 706 data = self._fromheader(struct.calcsize(format))
707 707 return _unpack(format, data)
708 708
709 709 def _initparams(self, mandatoryparams, advisoryparams):
710 710 """internal function to setup all logic related parameters"""
711 self.mandatoryparams = mandatoryparams
712 self.advisoryparams = advisoryparams
711 # make it read only to prevent people touching it by mistake.
712 self.mandatoryparams = tuple(mandatoryparams)
713 self.advisoryparams = tuple(advisoryparams)
713 714
714 715 def _readheader(self):
715 716 """read the header and setup the object"""
716 717 typesize = self._unpackheader(_fparttypesize)[0]
717 718 self.type = self._fromheader(typesize)
718 719 self.ui.debug('part type: "%s"\n' % self.type)
719 720 self.id = self._unpackheader(_fpartid)[0]
720 721 self.ui.debug('part id: "%s"\n' % self.id)
721 722 ## reading parameters
722 723 # param count
723 724 mancount, advcount = self._unpackheader(_fpartparamcount)
724 725 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
725 726 # param size
726 727 fparamsizes = _makefpartparamsizes(mancount + advcount)
727 728 paramsizes = self._unpackheader(fparamsizes)
728 729 # make it a list of couple again
729 730 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
730 731 # split mandatory from advisory
731 732 mansizes = paramsizes[:mancount]
732 733 advsizes = paramsizes[mancount:]
733 734 # retrive param value
734 735 manparams = []
735 736 for key, value in mansizes:
736 737 manparams.append((self._fromheader(key), self._fromheader(value)))
737 738 advparams = []
738 739 for key, value in advsizes:
739 740 advparams.append((self._fromheader(key), self._fromheader(value)))
740 741 self._initparams(manparams, advparams)
741 742 ## part payload
742 743 def payloadchunks():
743 744 payloadsize = self._unpack(_fpayloadsize)[0]
744 745 self.ui.debug('payload chunk size: %i\n' % payloadsize)
745 746 while payloadsize:
746 747 yield self._readexact(payloadsize)
747 748 payloadsize = self._unpack(_fpayloadsize)[0]
748 749 self.ui.debug('payload chunk size: %i\n' % payloadsize)
749 750 self._payloadstream = util.chunkbuffer(payloadchunks())
750 751 # we read the data, tell it
751 752 self._initialized = True
752 753
753 754 def read(self, size=None):
754 755 """read payload data"""
755 756 if not self._initialized:
756 757 self._readheader()
757 758 if size is None:
758 759 data = self._payloadstream.read()
759 760 else:
760 761 data = self._payloadstream.read(size)
761 762 if size is None or len(data) < size:
762 763 self.consumed = True
763 764 return data
764 765
765 766
766 767 @parthandler('b2x:changegroup')
767 768 def handlechangegroup(op, inpart):
768 769 """apply a changegroup part on the repo
769 770
770 771 This is a very early implementation that will massive rework before being
771 772 inflicted to any end-user.
772 773 """
773 774 # Make sure we trigger a transaction creation
774 775 #
775 776 # The addchangegroup function will get a transaction object by itself, but
776 777 # we need to make sure we trigger the creation of a transaction object used
777 778 # for the whole processing scope.
778 779 op.gettransaction()
779 780 cg = changegroup.unbundle10(inpart, 'UN')
780 781 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
781 782 op.records.add('changegroup', {'return': ret})
782 783 if op.reply is not None:
783 784 # This is definitly not the final form of this
784 785 # return. But one need to start somewhere.
785 786 part = op.reply.newpart('b2x:reply:changegroup')
786 787 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
787 788 part.addparam('return', '%i' % ret, mandatory=False)
788 789 assert not inpart.read()
789 790
790 791 @parthandler('b2x:reply:changegroup')
791 792 def handlechangegroup(op, inpart):
792 793 p = dict(inpart.advisoryparams)
793 794 ret = int(p['return'])
794 795 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
795 796
796 797 @parthandler('b2x:check:heads')
797 798 def handlechangegroup(op, inpart):
798 799 """check that head of the repo did not change
799 800
800 801 This is used to detect a push race when using unbundle.
801 802 This replaces the "heads" argument of unbundle."""
802 803 h = inpart.read(20)
803 804 heads = []
804 805 while len(h) == 20:
805 806 heads.append(h)
806 807 h = inpart.read(20)
807 808 assert not h
808 809 if heads != op.repo.heads():
809 810 raise error.PushRaced('repository changed while pushing - '
810 811 'please try again')
811 812
812 813 @parthandler('b2x:output')
813 814 def handleoutput(op, inpart):
814 815 """forward output captured on the server to the client"""
815 816 for line in inpart.read().splitlines():
816 817 op.ui.write(('remote: %s\n' % line))
817 818
818 819 @parthandler('b2x:replycaps')
819 820 def handlereplycaps(op, inpart):
820 821 """Notify that a reply bundle should be created
821 822
822 823 The payload contains the capabilities information for the reply"""
823 824 caps = decodecaps(inpart.read())
824 825 if op.reply is None:
825 826 op.reply = bundle20(op.ui, caps)
826 827
827 828 @parthandler('b2x:error:abort')
828 829 def handlereplycaps(op, inpart):
829 830 """Used to transmit abort error over the wire"""
830 831 manargs = dict(inpart.mandatoryparams)
831 832 advargs = dict(inpart.advisoryparams)
832 833 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
833 834
834 835 @parthandler('b2x:error:unknownpart')
835 836 def handlereplycaps(op, inpart):
836 837 """Used to transmit unknown part error over the wire"""
837 838 manargs = dict(inpart.mandatoryparams)
838 839 raise UnknownPartError(manargs['parttype'])
839 840
840 841 @parthandler('b2x:error:pushraced')
841 842 def handlereplycaps(op, inpart):
842 843 """Used to transmit push race error over the wire"""
843 844 manargs = dict(inpart.mandatoryparams)
844 845 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now