##// END OF EJS Templates
bundle2: warn about error during initialization in ``newpart`` docstring...
Pierre-Yves David -
r21602:cc33ae50 default
parent child Browse files
Show More
@@ -1,784 +1,791 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimately be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handler are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the part read from an abort are processed. In the
139 139 future, dropping the stream may become an option for channel we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147
148 148 import changegroup, error
149 149 from i18n import _
150 150
151 151 _pack = struct.pack
152 152 _unpack = struct.unpack
153 153
154 154 _magicstring = 'HG2X'
155 155
156 156 _fstreamparamsize = '>H'
157 157 _fpartheadersize = '>H'
158 158 _fparttypesize = '>B'
159 159 _fpartid = '>I'
160 160 _fpayloadsize = '>I'
161 161 _fpartparamcount = '>BB'
162 162
163 163 preferedchunksize = 4096
164 164
165 165 def _makefpartparamsizes(nbparams):
166 166 """return a struct format to read part parameter sizes
167 167
168 168 The number parameters is variable so we need to build that format
169 169 dynamically.
170 170 """
171 171 return '>'+('BB'*nbparams)
172 172
173 173 class UnknownPartError(KeyError):
174 174 """error raised when no handler is found for a Mandatory part"""
175 175 pass
176 176
177 177 parthandlermapping = {}
178 178
179 179 def parthandler(parttype):
180 180 """decorator that register a function as a bundle2 part handler
181 181
182 182 eg::
183 183
184 184 @parthandler('myparttype')
185 185 def myparttypehandler(...):
186 186 '''process a part of type "my part".'''
187 187 ...
188 188 """
189 189 def _decorator(func):
190 190 lparttype = parttype.lower() # enforce lower case matching.
191 191 assert lparttype not in parthandlermapping
192 192 parthandlermapping[lparttype] = func
193 193 return func
194 194 return _decorator
195 195
196 196 class unbundlerecords(object):
197 197 """keep record of what happens during and unbundle
198 198
199 199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 200 category of record and obj is an arbitrary object.
201 201
202 202 `records['cat']` will return all entries of this category 'cat'.
203 203
204 204 Iterating on the object itself will yield `('category', obj)` tuples
205 205 for all entries.
206 206
207 207 All iterations happens in chronological order.
208 208 """
209 209
210 210 def __init__(self):
211 211 self._categories = {}
212 212 self._sequences = []
213 213 self._replies = {}
214 214
215 215 def add(self, category, entry, inreplyto=None):
216 216 """add a new record of a given category.
217 217
218 218 The entry can then be retrieved in the list returned by
219 219 self['category']."""
220 220 self._categories.setdefault(category, []).append(entry)
221 221 self._sequences.append((category, entry))
222 222 if inreplyto is not None:
223 223 self.getreplies(inreplyto).add(category, entry)
224 224
225 225 def getreplies(self, partid):
226 226 """get the subrecords that replies to a specific part"""
227 227 return self._replies.setdefault(partid, unbundlerecords())
228 228
229 229 def __getitem__(self, cat):
230 230 return tuple(self._categories.get(cat, ()))
231 231
232 232 def __iter__(self):
233 233 return iter(self._sequences)
234 234
235 235 def __len__(self):
236 236 return len(self._sequences)
237 237
238 238 def __nonzero__(self):
239 239 return bool(self._sequences)
240 240
241 241 class bundleoperation(object):
242 242 """an object that represents a single bundling process
243 243
244 244 Its purpose is to carry unbundle-related objects and states.
245 245
246 246 A new object should be created at the beginning of each bundle processing.
247 247 The object is to be returned by the processing function.
248 248
249 249 The object has very little content now it will ultimately contain:
250 250 * an access to the repo the bundle is applied to,
251 251 * a ui object,
252 252 * a way to retrieve a transaction to add changes to the repo,
253 253 * a way to record the result of processing each part,
254 254 * a way to construct a bundle response when applicable.
255 255 """
256 256
257 257 def __init__(self, repo, transactiongetter):
258 258 self.repo = repo
259 259 self.ui = repo.ui
260 260 self.records = unbundlerecords()
261 261 self.gettransaction = transactiongetter
262 262 self.reply = None
263 263
264 264 class TransactionUnavailable(RuntimeError):
265 265 pass
266 266
267 267 def _notransaction():
268 268 """default method to get a transaction while processing a bundle
269 269
270 270 Raise an exception to highlight the fact that no transaction was expected
271 271 to be created"""
272 272 raise TransactionUnavailable()
273 273
274 274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 275 """This function process a bundle, apply effect to/from a repo
276 276
277 277 It iterates over each part then searches for and uses the proper handling
278 278 code to process the part. Parts are processed in order.
279 279
280 280 This is very early version of this function that will be strongly reworked
281 281 before final usage.
282 282
283 283 Unknown Mandatory part will abort the process.
284 284 """
285 285 op = bundleoperation(repo, transactiongetter)
286 286 # todo:
287 287 # - replace this is a init function soon.
288 288 # - exception catching
289 289 unbundler.params
290 290 iterparts = unbundler.iterparts()
291 291 part = None
292 292 try:
293 293 for part in iterparts:
294 294 parttype = part.type
295 295 # part key are matched lower case
296 296 key = parttype.lower()
297 297 try:
298 298 handler = parthandlermapping[key]
299 299 op.ui.debug('found a handler for part %r\n' % parttype)
300 300 except KeyError:
301 301 if key != parttype: # mandatory parts
302 302 # todo:
303 303 # - use a more precise exception
304 304 raise UnknownPartError(key)
305 305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 306 # consuming the part
307 307 part.read()
308 308 continue
309 309
310 310 # handler is called outside the above try block so that we don't
311 311 # risk catching KeyErrors from anything other than the
312 312 # parthandlermapping lookup (any KeyError raised by handler()
313 313 # itself represents a defect of a different variety).
314 314 output = None
315 315 if op.reply is not None:
316 316 op.ui.pushbuffer(error=True)
317 317 output = ''
318 318 try:
319 319 handler(op, part)
320 320 finally:
321 321 if output is not None:
322 322 output = op.ui.popbuffer()
323 323 if output:
324 324 op.reply.newpart('b2x:output',
325 325 advisoryparams=[('in-reply-to',
326 326 str(part.id))],
327 327 data=output)
328 328 part.read()
329 329 except Exception, exc:
330 330 if part is not None:
331 331 # consume the bundle content
332 332 part.read()
333 333 for part in iterparts:
334 334 # consume the bundle content
335 335 part.read()
336 336 # Small hack to let caller code distinguish exceptions from bundle2
337 337 # processing fron the ones from bundle1 processing. This is mostly
338 338 # needed to handle different return codes to unbundle according to the
339 339 # type of bundle. We should probably clean up or drop this return code
340 340 # craziness in a future version.
341 341 exc.duringunbundle2 = True
342 342 raise
343 343 return op
344 344
345 345 def decodecaps(blob):
346 346 """decode a bundle2 caps bytes blob into a dictionnary
347 347
348 348 The blob is a list of capabilities (one per line)
349 349 Capabilities may have values using a line of the form::
350 350
351 351 capability=value1,value2,value3
352 352
353 353 The values are always a list."""
354 354 caps = {}
355 355 for line in blob.splitlines():
356 356 if not line:
357 357 continue
358 358 if '=' not in line:
359 359 key, vals = line, ()
360 360 else:
361 361 key, vals = line.split('=', 1)
362 362 vals = vals.split(',')
363 363 key = urllib.unquote(key)
364 364 vals = [urllib.unquote(v) for v in vals]
365 365 caps[key] = vals
366 366 return caps
367 367
368 368 def encodecaps(caps):
369 369 """encode a bundle2 caps dictionary into a bytes blob"""
370 370 chunks = []
371 371 for ca in sorted(caps):
372 372 vals = caps[ca]
373 373 ca = urllib.quote(ca)
374 374 vals = [urllib.quote(v) for v in vals]
375 375 if vals:
376 376 ca = "%s=%s" % (ca, ','.join(vals))
377 377 chunks.append(ca)
378 378 return '\n'.join(chunks)
379 379
380 380 class bundle20(object):
381 381 """represent an outgoing bundle2 container
382 382
383 383 Use the `addparam` method to add stream level parameter. and `newpart` to
384 384 populate it. Then call `getchunks` to retrieve all the binary chunks of
385 385 data that compose the bundle2 container."""
386 386
387 387 def __init__(self, ui, capabilities=()):
388 388 self.ui = ui
389 389 self._params = []
390 390 self._parts = []
391 391 self.capabilities = dict(capabilities)
392 392
393 393 # methods used to defines the bundle2 content
394 394 def addparam(self, name, value=None):
395 395 """add a stream level parameter"""
396 396 if not name:
397 397 raise ValueError('empty parameter name')
398 398 if name[0] not in string.letters:
399 399 raise ValueError('non letter first character: %r' % name)
400 400 self._params.append((name, value))
401 401
402 402 def addpart(self, part):
403 403 """add a new part to the bundle2 container
404 404
405 405 Parts contains the actual applicative payload."""
406 406 assert part.id is None
407 407 part.id = len(self._parts) # very cheap counter
408 408 self._parts.append(part)
409 409
410 410 def newpart(self, typeid, *args, **kwargs):
411 """create a new part and add it to the containers"""
411 """create a new part and add it to the containers
412
413 As the part is directly added to the containers. For now, this means
414 that any failure to properly initialize the part after calling
415 ``newpart`` should result in a failure of the whole bundling process.
416
417 You can still fall back to manually create and add if you need better
418 control."""
412 419 part = bundlepart(typeid, *args, **kwargs)
413 420 self.addpart(part)
414 421 return part
415 422
416 423 # methods used to generate the bundle2 stream
417 424 def getchunks(self):
418 425 self.ui.debug('start emission of %s stream\n' % _magicstring)
419 426 yield _magicstring
420 427 param = self._paramchunk()
421 428 self.ui.debug('bundle parameter: %s\n' % param)
422 429 yield _pack(_fstreamparamsize, len(param))
423 430 if param:
424 431 yield param
425 432
426 433 self.ui.debug('start of parts\n')
427 434 for part in self._parts:
428 435 self.ui.debug('bundle part: "%s"\n' % part.type)
429 436 for chunk in part.getchunks():
430 437 yield chunk
431 438 self.ui.debug('end of bundle\n')
432 439 yield '\0\0'
433 440
434 441 def _paramchunk(self):
435 442 """return a encoded version of all stream parameters"""
436 443 blocks = []
437 444 for par, value in self._params:
438 445 par = urllib.quote(par)
439 446 if value is not None:
440 447 value = urllib.quote(value)
441 448 par = '%s=%s' % (par, value)
442 449 blocks.append(par)
443 450 return ' '.join(blocks)
444 451
445 452 class unpackermixin(object):
446 453 """A mixin to extract bytes and struct data from a stream"""
447 454
448 455 def __init__(self, fp):
449 456 self._fp = fp
450 457
451 458 def _unpack(self, format):
452 459 """unpack this struct format from the stream"""
453 460 data = self._readexact(struct.calcsize(format))
454 461 return _unpack(format, data)
455 462
456 463 def _readexact(self, size):
457 464 """read exactly <size> bytes from the stream"""
458 465 return changegroup.readexactly(self._fp, size)
459 466
460 467
461 468 class unbundle20(unpackermixin):
462 469 """interpret a bundle2 stream
463 470
464 471 This class is fed with a binary stream and yields parts through its
465 472 `iterparts` methods."""
466 473
467 474 def __init__(self, ui, fp, header=None):
468 475 """If header is specified, we do not read it out of the stream."""
469 476 self.ui = ui
470 477 super(unbundle20, self).__init__(fp)
471 478 if header is None:
472 479 header = self._readexact(4)
473 480 magic, version = header[0:2], header[2:4]
474 481 if magic != 'HG':
475 482 raise util.Abort(_('not a Mercurial bundle'))
476 483 if version != '2X':
477 484 raise util.Abort(_('unknown bundle version %s') % version)
478 485 self.ui.debug('start processing of %s stream\n' % header)
479 486
480 487 @util.propertycache
481 488 def params(self):
482 489 """dictionary of stream level parameters"""
483 490 self.ui.debug('reading bundle2 stream parameters\n')
484 491 params = {}
485 492 paramssize = self._unpack(_fstreamparamsize)[0]
486 493 if paramssize:
487 494 for p in self._readexact(paramssize).split(' '):
488 495 p = p.split('=', 1)
489 496 p = [urllib.unquote(i) for i in p]
490 497 if len(p) < 2:
491 498 p.append(None)
492 499 self._processparam(*p)
493 500 params[p[0]] = p[1]
494 501 return params
495 502
496 503 def _processparam(self, name, value):
497 504 """process a parameter, applying its effect if needed
498 505
499 506 Parameter starting with a lower case letter are advisory and will be
500 507 ignored when unknown. Those starting with an upper case letter are
501 508 mandatory and will this function will raise a KeyError when unknown.
502 509
503 510 Note: no option are currently supported. Any input will be either
504 511 ignored or failing.
505 512 """
506 513 if not name:
507 514 raise ValueError('empty parameter name')
508 515 if name[0] not in string.letters:
509 516 raise ValueError('non letter first character: %r' % name)
510 517 # Some logic will be later added here to try to process the option for
511 518 # a dict of known parameter.
512 519 if name[0].islower():
513 520 self.ui.debug("ignoring unknown parameter %r\n" % name)
514 521 else:
515 522 raise KeyError(name)
516 523
517 524
518 525 def iterparts(self):
519 526 """yield all parts contained in the stream"""
520 527 # make sure param have been loaded
521 528 self.params
522 529 self.ui.debug('start extraction of bundle2 parts\n')
523 530 headerblock = self._readpartheader()
524 531 while headerblock is not None:
525 532 part = unbundlepart(self.ui, headerblock, self._fp)
526 533 yield part
527 534 headerblock = self._readpartheader()
528 535 self.ui.debug('end of bundle2 stream\n')
529 536
530 537 def _readpartheader(self):
531 538 """reads a part header size and return the bytes blob
532 539
533 540 returns None if empty"""
534 541 headersize = self._unpack(_fpartheadersize)[0]
535 542 self.ui.debug('part header size: %i\n' % headersize)
536 543 if headersize:
537 544 return self._readexact(headersize)
538 545 return None
539 546
540 547
541 548 class bundlepart(object):
542 549 """A bundle2 part contains application level payload
543 550
544 551 The part `type` is used to route the part to the application level
545 552 handler.
546 553 """
547 554
548 555 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
549 556 data=''):
550 557 self.id = None
551 558 self.type = parttype
552 559 self.data = data
553 560 self.mandatoryparams = mandatoryparams
554 561 self.advisoryparams = advisoryparams
555 562 # status of the part's generation:
556 563 # - None: not started,
557 564 # - False: currently generated,
558 565 # - True: generation done.
559 566 self._generated = None
560 567
561 568 # methods used to generates the bundle2 stream
562 569 def getchunks(self):
563 570 if self._generated is not None:
564 571 raise RuntimeError('part can only be consumed once')
565 572 self._generated = False
566 573 #### header
567 574 ## parttype
568 575 header = [_pack(_fparttypesize, len(self.type)),
569 576 self.type, _pack(_fpartid, self.id),
570 577 ]
571 578 ## parameters
572 579 # count
573 580 manpar = self.mandatoryparams
574 581 advpar = self.advisoryparams
575 582 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
576 583 # size
577 584 parsizes = []
578 585 for key, value in manpar:
579 586 parsizes.append(len(key))
580 587 parsizes.append(len(value))
581 588 for key, value in advpar:
582 589 parsizes.append(len(key))
583 590 parsizes.append(len(value))
584 591 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
585 592 header.append(paramsizes)
586 593 # key, value
587 594 for key, value in manpar:
588 595 header.append(key)
589 596 header.append(value)
590 597 for key, value in advpar:
591 598 header.append(key)
592 599 header.append(value)
593 600 ## finalize header
594 601 headerchunk = ''.join(header)
595 602 yield _pack(_fpartheadersize, len(headerchunk))
596 603 yield headerchunk
597 604 ## payload
598 605 for chunk in self._payloadchunks():
599 606 yield _pack(_fpayloadsize, len(chunk))
600 607 yield chunk
601 608 # end of payload
602 609 yield _pack(_fpayloadsize, 0)
603 610 self._generated = True
604 611
605 612 def _payloadchunks(self):
606 613 """yield chunks of a the part payload
607 614
608 615 Exists to handle the different methods to provide data to a part."""
609 616 # we only support fixed size data now.
610 617 # This will be improved in the future.
611 618 if util.safehasattr(self.data, 'next'):
612 619 buff = util.chunkbuffer(self.data)
613 620 chunk = buff.read(preferedchunksize)
614 621 while chunk:
615 622 yield chunk
616 623 chunk = buff.read(preferedchunksize)
617 624 elif len(self.data):
618 625 yield self.data
619 626
620 627 class unbundlepart(unpackermixin):
621 628 """a bundle part read from a bundle"""
622 629
623 630 def __init__(self, ui, header, fp):
624 631 super(unbundlepart, self).__init__(fp)
625 632 self.ui = ui
626 633 # unbundle state attr
627 634 self._headerdata = header
628 635 self._headeroffset = 0
629 636 self._initialized = False
630 637 self.consumed = False
631 638 # part data
632 639 self.id = None
633 640 self.type = None
634 641 self.mandatoryparams = None
635 642 self.advisoryparams = None
636 643 self._payloadstream = None
637 644 self._readheader()
638 645
639 646 def _fromheader(self, size):
640 647 """return the next <size> byte from the header"""
641 648 offset = self._headeroffset
642 649 data = self._headerdata[offset:(offset + size)]
643 650 self._headeroffset = offset + size
644 651 return data
645 652
646 653 def _unpackheader(self, format):
647 654 """read given format from header
648 655
649 656 This automatically compute the size of the format to read."""
650 657 data = self._fromheader(struct.calcsize(format))
651 658 return _unpack(format, data)
652 659
653 660 def _readheader(self):
654 661 """read the header and setup the object"""
655 662 typesize = self._unpackheader(_fparttypesize)[0]
656 663 self.type = self._fromheader(typesize)
657 664 self.ui.debug('part type: "%s"\n' % self.type)
658 665 self.id = self._unpackheader(_fpartid)[0]
659 666 self.ui.debug('part id: "%s"\n' % self.id)
660 667 ## reading parameters
661 668 # param count
662 669 mancount, advcount = self._unpackheader(_fpartparamcount)
663 670 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
664 671 # param size
665 672 fparamsizes = _makefpartparamsizes(mancount + advcount)
666 673 paramsizes = self._unpackheader(fparamsizes)
667 674 # make it a list of couple again
668 675 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
669 676 # split mandatory from advisory
670 677 mansizes = paramsizes[:mancount]
671 678 advsizes = paramsizes[mancount:]
672 679 # retrive param value
673 680 manparams = []
674 681 for key, value in mansizes:
675 682 manparams.append((self._fromheader(key), self._fromheader(value)))
676 683 advparams = []
677 684 for key, value in advsizes:
678 685 advparams.append((self._fromheader(key), self._fromheader(value)))
679 686 self.mandatoryparams = manparams
680 687 self.advisoryparams = advparams
681 688 ## part payload
682 689 def payloadchunks():
683 690 payloadsize = self._unpack(_fpayloadsize)[0]
684 691 self.ui.debug('payload chunk size: %i\n' % payloadsize)
685 692 while payloadsize:
686 693 yield self._readexact(payloadsize)
687 694 payloadsize = self._unpack(_fpayloadsize)[0]
688 695 self.ui.debug('payload chunk size: %i\n' % payloadsize)
689 696 self._payloadstream = util.chunkbuffer(payloadchunks())
690 697 # we read the data, tell it
691 698 self._initialized = True
692 699
693 700 def read(self, size=None):
694 701 """read payload data"""
695 702 if not self._initialized:
696 703 self._readheader()
697 704 if size is None:
698 705 data = self._payloadstream.read()
699 706 else:
700 707 data = self._payloadstream.read(size)
701 708 if size is None or len(data) < size:
702 709 self.consumed = True
703 710 return data
704 711
705 712
706 713 @parthandler('b2x:changegroup')
707 714 def handlechangegroup(op, inpart):
708 715 """apply a changegroup part on the repo
709 716
710 717 This is a very early implementation that will massive rework before being
711 718 inflicted to any end-user.
712 719 """
713 720 # Make sure we trigger a transaction creation
714 721 #
715 722 # The addchangegroup function will get a transaction object by itself, but
716 723 # we need to make sure we trigger the creation of a transaction object used
717 724 # for the whole processing scope.
718 725 op.gettransaction()
719 726 cg = changegroup.unbundle10(inpart, 'UN')
720 727 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
721 728 op.records.add('changegroup', {'return': ret})
722 729 if op.reply is not None:
723 730 # This is definitly not the final form of this
724 731 # return. But one need to start somewhere.
725 732 op.reply.newpart('b2x:reply:changegroup', (),
726 733 [('in-reply-to', str(inpart.id)),
727 734 ('return', '%i' % ret)])
728 735 assert not inpart.read()
729 736
730 737 @parthandler('b2x:reply:changegroup')
731 738 def handlechangegroup(op, inpart):
732 739 p = dict(inpart.advisoryparams)
733 740 ret = int(p['return'])
734 741 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
735 742
736 743 @parthandler('b2x:check:heads')
737 744 def handlechangegroup(op, inpart):
738 745 """check that head of the repo did not change
739 746
740 747 This is used to detect a push race when using unbundle.
741 748 This replaces the "heads" argument of unbundle."""
742 749 h = inpart.read(20)
743 750 heads = []
744 751 while len(h) == 20:
745 752 heads.append(h)
746 753 h = inpart.read(20)
747 754 assert not h
748 755 if heads != op.repo.heads():
749 756 raise error.PushRaced('repository changed while pushing - '
750 757 'please try again')
751 758
752 759 @parthandler('b2x:output')
753 760 def handleoutput(op, inpart):
754 761 """forward output captured on the server to the client"""
755 762 for line in inpart.read().splitlines():
756 763 op.ui.write(('remote: %s\n' % line))
757 764
758 765 @parthandler('b2x:replycaps')
759 766 def handlereplycaps(op, inpart):
760 767 """Notify that a reply bundle should be created
761 768
762 769 The payload contains the capabilities information for the reply"""
763 770 caps = decodecaps(inpart.read())
764 771 if op.reply is None:
765 772 op.reply = bundle20(op.ui, caps)
766 773
767 774 @parthandler('b2x:error:abort')
768 775 def handlereplycaps(op, inpart):
769 776 """Used to transmit abort error over the wire"""
770 777 manargs = dict(inpart.mandatoryparams)
771 778 advargs = dict(inpart.advisoryparams)
772 779 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
773 780
774 781 @parthandler('b2x:error:unknownpart')
775 782 def handlereplycaps(op, inpart):
776 783 """Used to transmit unknown part error over the wire"""
777 784 manargs = dict(inpart.mandatoryparams)
778 785 raise UnknownPartError(manargs['parttype'])
779 786
780 787 @parthandler('b2x:error:pushraced')
781 788 def handlereplycaps(op, inpart):
782 789 """Used to transmit push race error over the wire"""
783 790 manargs = dict(inpart.mandatoryparams)
784 791 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now