##// END OF EJS Templates
bundle2: add an obsmarkers part handler...
Pierre-Yves David -
r22336:60786c8a default
parent child Browse files
Show More
@@ -1,901 +1,907
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handler are registered
132 132 for a certain part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know if a part is mandatory or advisory. If the Part type
136 136 contains any uppercase char it is considered mandatory. When no handler is
137 137 known for a Mandatory part, the process is aborted and an exception is raised.
138 138 If the part is advisory and no handler is known, the part is ignored. When the
139 139 process is aborted, the full bundle is still read from the stream to keep the
140 140 channel usable. But none of the part read from an abort are processed. In the
141 141 future, dropping the stream may become an option for channel we do not care to
142 142 preserve.
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 149 import pushkey
150 150
151 151 import changegroup, error
152 152 from i18n import _
153 153
154 154 _pack = struct.pack
155 155 _unpack = struct.unpack
156 156
157 157 _magicstring = 'HG2X'
158 158
159 159 _fstreamparamsize = '>H'
160 160 _fpartheadersize = '>H'
161 161 _fparttypesize = '>B'
162 162 _fpartid = '>I'
163 163 _fpayloadsize = '>I'
164 164 _fpartparamcount = '>BB'
165 165
166 166 preferedchunksize = 4096
167 167
168 168 def _makefpartparamsizes(nbparams):
169 169 """return a struct format to read part parameter sizes
170 170
171 171 The number parameters is variable so we need to build that format
172 172 dynamically.
173 173 """
174 174 return '>'+('BB'*nbparams)
175 175
176 176 parthandlermapping = {}
177 177
178 178 def parthandler(parttype, params=()):
179 179 """decorator that register a function as a bundle2 part handler
180 180
181 181 eg::
182 182
183 183 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
184 184 def myparttypehandler(...):
185 185 '''process a part of type "my part".'''
186 186 ...
187 187 """
188 188 def _decorator(func):
189 189 lparttype = parttype.lower() # enforce lower case matching.
190 190 assert lparttype not in parthandlermapping
191 191 parthandlermapping[lparttype] = func
192 192 func.params = frozenset(params)
193 193 return func
194 194 return _decorator
195 195
196 196 class unbundlerecords(object):
197 197 """keep record of what happens during and unbundle
198 198
199 199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 200 category of record and obj is an arbitrary object.
201 201
202 202 `records['cat']` will return all entries of this category 'cat'.
203 203
204 204 Iterating on the object itself will yield `('category', obj)` tuples
205 205 for all entries.
206 206
207 207 All iterations happens in chronological order.
208 208 """
209 209
210 210 def __init__(self):
211 211 self._categories = {}
212 212 self._sequences = []
213 213 self._replies = {}
214 214
215 215 def add(self, category, entry, inreplyto=None):
216 216 """add a new record of a given category.
217 217
218 218 The entry can then be retrieved in the list returned by
219 219 self['category']."""
220 220 self._categories.setdefault(category, []).append(entry)
221 221 self._sequences.append((category, entry))
222 222 if inreplyto is not None:
223 223 self.getreplies(inreplyto).add(category, entry)
224 224
225 225 def getreplies(self, partid):
226 226 """get the subrecords that replies to a specific part"""
227 227 return self._replies.setdefault(partid, unbundlerecords())
228 228
229 229 def __getitem__(self, cat):
230 230 return tuple(self._categories.get(cat, ()))
231 231
232 232 def __iter__(self):
233 233 return iter(self._sequences)
234 234
235 235 def __len__(self):
236 236 return len(self._sequences)
237 237
238 238 def __nonzero__(self):
239 239 return bool(self._sequences)
240 240
241 241 class bundleoperation(object):
242 242 """an object that represents a single bundling process
243 243
244 244 Its purpose is to carry unbundle-related objects and states.
245 245
246 246 A new object should be created at the beginning of each bundle processing.
247 247 The object is to be returned by the processing function.
248 248
249 249 The object has very little content now it will ultimately contain:
250 250 * an access to the repo the bundle is applied to,
251 251 * a ui object,
252 252 * a way to retrieve a transaction to add changes to the repo,
253 253 * a way to record the result of processing each part,
254 254 * a way to construct a bundle response when applicable.
255 255 """
256 256
257 257 def __init__(self, repo, transactiongetter):
258 258 self.repo = repo
259 259 self.ui = repo.ui
260 260 self.records = unbundlerecords()
261 261 self.gettransaction = transactiongetter
262 262 self.reply = None
263 263
264 264 class TransactionUnavailable(RuntimeError):
265 265 pass
266 266
267 267 def _notransaction():
268 268 """default method to get a transaction while processing a bundle
269 269
270 270 Raise an exception to highlight the fact that no transaction was expected
271 271 to be created"""
272 272 raise TransactionUnavailable()
273 273
274 274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 275 """This function process a bundle, apply effect to/from a repo
276 276
277 277 It iterates over each part then searches for and uses the proper handling
278 278 code to process the part. Parts are processed in order.
279 279
280 280 This is very early version of this function that will be strongly reworked
281 281 before final usage.
282 282
283 283 Unknown Mandatory part will abort the process.
284 284 """
285 285 op = bundleoperation(repo, transactiongetter)
286 286 # todo:
287 287 # - replace this is a init function soon.
288 288 # - exception catching
289 289 unbundler.params
290 290 iterparts = unbundler.iterparts()
291 291 part = None
292 292 try:
293 293 for part in iterparts:
294 294 parttype = part.type
295 295 # part key are matched lower case
296 296 key = parttype.lower()
297 297 try:
298 298 handler = parthandlermapping.get(key)
299 299 if handler is None:
300 300 raise error.BundleValueError(parttype=key)
301 301 op.ui.debug('found a handler for part %r\n' % parttype)
302 302 unknownparams = part.mandatorykeys - handler.params
303 303 if unknownparams:
304 304 unknownparams = list(unknownparams)
305 305 unknownparams.sort()
306 306 raise error.BundleValueError(parttype=key,
307 307 params=unknownparams)
308 308 except error.BundleValueError, exc:
309 309 if key != parttype: # mandatory parts
310 310 raise
311 311 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
312 312 # consuming the part
313 313 part.read()
314 314 continue
315 315
316 316
317 317 # handler is called outside the above try block so that we don't
318 318 # risk catching KeyErrors from anything other than the
319 319 # parthandlermapping lookup (any KeyError raised by handler()
320 320 # itself represents a defect of a different variety).
321 321 output = None
322 322 if op.reply is not None:
323 323 op.ui.pushbuffer(error=True)
324 324 output = ''
325 325 try:
326 326 handler(op, part)
327 327 finally:
328 328 if output is not None:
329 329 output = op.ui.popbuffer()
330 330 if output:
331 331 outpart = op.reply.newpart('b2x:output', data=output)
332 332 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
333 333 part.read()
334 334 except Exception, exc:
335 335 if part is not None:
336 336 # consume the bundle content
337 337 part.read()
338 338 for part in iterparts:
339 339 # consume the bundle content
340 340 part.read()
341 341 # Small hack to let caller code distinguish exceptions from bundle2
342 342 # processing fron the ones from bundle1 processing. This is mostly
343 343 # needed to handle different return codes to unbundle according to the
344 344 # type of bundle. We should probably clean up or drop this return code
345 345 # craziness in a future version.
346 346 exc.duringunbundle2 = True
347 347 raise
348 348 return op
349 349
350 350 def decodecaps(blob):
351 351 """decode a bundle2 caps bytes blob into a dictionnary
352 352
353 353 The blob is a list of capabilities (one per line)
354 354 Capabilities may have values using a line of the form::
355 355
356 356 capability=value1,value2,value3
357 357
358 358 The values are always a list."""
359 359 caps = {}
360 360 for line in blob.splitlines():
361 361 if not line:
362 362 continue
363 363 if '=' not in line:
364 364 key, vals = line, ()
365 365 else:
366 366 key, vals = line.split('=', 1)
367 367 vals = vals.split(',')
368 368 key = urllib.unquote(key)
369 369 vals = [urllib.unquote(v) for v in vals]
370 370 caps[key] = vals
371 371 return caps
372 372
373 373 def encodecaps(caps):
374 374 """encode a bundle2 caps dictionary into a bytes blob"""
375 375 chunks = []
376 376 for ca in sorted(caps):
377 377 vals = caps[ca]
378 378 ca = urllib.quote(ca)
379 379 vals = [urllib.quote(v) for v in vals]
380 380 if vals:
381 381 ca = "%s=%s" % (ca, ','.join(vals))
382 382 chunks.append(ca)
383 383 return '\n'.join(chunks)
384 384
385 385 class bundle20(object):
386 386 """represent an outgoing bundle2 container
387 387
388 388 Use the `addparam` method to add stream level parameter. and `newpart` to
389 389 populate it. Then call `getchunks` to retrieve all the binary chunks of
390 390 data that compose the bundle2 container."""
391 391
392 392 def __init__(self, ui, capabilities=()):
393 393 self.ui = ui
394 394 self._params = []
395 395 self._parts = []
396 396 self.capabilities = dict(capabilities)
397 397
398 398 @property
399 399 def nbparts(self):
400 400 """total number of parts added to the bundler"""
401 401 return len(self._parts)
402 402
403 403 # methods used to defines the bundle2 content
404 404 def addparam(self, name, value=None):
405 405 """add a stream level parameter"""
406 406 if not name:
407 407 raise ValueError('empty parameter name')
408 408 if name[0] not in string.letters:
409 409 raise ValueError('non letter first character: %r' % name)
410 410 self._params.append((name, value))
411 411
412 412 def addpart(self, part):
413 413 """add a new part to the bundle2 container
414 414
415 415 Parts contains the actual applicative payload."""
416 416 assert part.id is None
417 417 part.id = len(self._parts) # very cheap counter
418 418 self._parts.append(part)
419 419
420 420 def newpart(self, typeid, *args, **kwargs):
421 421 """create a new part and add it to the containers
422 422
423 423 As the part is directly added to the containers. For now, this means
424 424 that any failure to properly initialize the part after calling
425 425 ``newpart`` should result in a failure of the whole bundling process.
426 426
427 427 You can still fall back to manually create and add if you need better
428 428 control."""
429 429 part = bundlepart(typeid, *args, **kwargs)
430 430 self.addpart(part)
431 431 return part
432 432
433 433 # methods used to generate the bundle2 stream
434 434 def getchunks(self):
435 435 self.ui.debug('start emission of %s stream\n' % _magicstring)
436 436 yield _magicstring
437 437 param = self._paramchunk()
438 438 self.ui.debug('bundle parameter: %s\n' % param)
439 439 yield _pack(_fstreamparamsize, len(param))
440 440 if param:
441 441 yield param
442 442
443 443 self.ui.debug('start of parts\n')
444 444 for part in self._parts:
445 445 self.ui.debug('bundle part: "%s"\n' % part.type)
446 446 for chunk in part.getchunks():
447 447 yield chunk
448 448 self.ui.debug('end of bundle\n')
449 449 yield '\0\0'
450 450
451 451 def _paramchunk(self):
452 452 """return a encoded version of all stream parameters"""
453 453 blocks = []
454 454 for par, value in self._params:
455 455 par = urllib.quote(par)
456 456 if value is not None:
457 457 value = urllib.quote(value)
458 458 par = '%s=%s' % (par, value)
459 459 blocks.append(par)
460 460 return ' '.join(blocks)
461 461
462 462 class unpackermixin(object):
463 463 """A mixin to extract bytes and struct data from a stream"""
464 464
465 465 def __init__(self, fp):
466 466 self._fp = fp
467 467
468 468 def _unpack(self, format):
469 469 """unpack this struct format from the stream"""
470 470 data = self._readexact(struct.calcsize(format))
471 471 return _unpack(format, data)
472 472
473 473 def _readexact(self, size):
474 474 """read exactly <size> bytes from the stream"""
475 475 return changegroup.readexactly(self._fp, size)
476 476
477 477
478 478 class unbundle20(unpackermixin):
479 479 """interpret a bundle2 stream
480 480
481 481 This class is fed with a binary stream and yields parts through its
482 482 `iterparts` methods."""
483 483
484 484 def __init__(self, ui, fp, header=None):
485 485 """If header is specified, we do not read it out of the stream."""
486 486 self.ui = ui
487 487 super(unbundle20, self).__init__(fp)
488 488 if header is None:
489 489 header = self._readexact(4)
490 490 magic, version = header[0:2], header[2:4]
491 491 if magic != 'HG':
492 492 raise util.Abort(_('not a Mercurial bundle'))
493 493 if version != '2X':
494 494 raise util.Abort(_('unknown bundle version %s') % version)
495 495 self.ui.debug('start processing of %s stream\n' % header)
496 496
497 497 @util.propertycache
498 498 def params(self):
499 499 """dictionary of stream level parameters"""
500 500 self.ui.debug('reading bundle2 stream parameters\n')
501 501 params = {}
502 502 paramssize = self._unpack(_fstreamparamsize)[0]
503 503 if paramssize:
504 504 for p in self._readexact(paramssize).split(' '):
505 505 p = p.split('=', 1)
506 506 p = [urllib.unquote(i) for i in p]
507 507 if len(p) < 2:
508 508 p.append(None)
509 509 self._processparam(*p)
510 510 params[p[0]] = p[1]
511 511 return params
512 512
513 513 def _processparam(self, name, value):
514 514 """process a parameter, applying its effect if needed
515 515
516 516 Parameter starting with a lower case letter are advisory and will be
517 517 ignored when unknown. Those starting with an upper case letter are
518 518 mandatory and will this function will raise a KeyError when unknown.
519 519
520 520 Note: no option are currently supported. Any input will be either
521 521 ignored or failing.
522 522 """
523 523 if not name:
524 524 raise ValueError('empty parameter name')
525 525 if name[0] not in string.letters:
526 526 raise ValueError('non letter first character: %r' % name)
527 527 # Some logic will be later added here to try to process the option for
528 528 # a dict of known parameter.
529 529 if name[0].islower():
530 530 self.ui.debug("ignoring unknown parameter %r\n" % name)
531 531 else:
532 532 raise error.BundleValueError(params=(name,))
533 533
534 534
535 535 def iterparts(self):
536 536 """yield all parts contained in the stream"""
537 537 # make sure param have been loaded
538 538 self.params
539 539 self.ui.debug('start extraction of bundle2 parts\n')
540 540 headerblock = self._readpartheader()
541 541 while headerblock is not None:
542 542 part = unbundlepart(self.ui, headerblock, self._fp)
543 543 yield part
544 544 headerblock = self._readpartheader()
545 545 self.ui.debug('end of bundle2 stream\n')
546 546
547 547 def _readpartheader(self):
548 548 """reads a part header size and return the bytes blob
549 549
550 550 returns None if empty"""
551 551 headersize = self._unpack(_fpartheadersize)[0]
552 552 self.ui.debug('part header size: %i\n' % headersize)
553 553 if headersize:
554 554 return self._readexact(headersize)
555 555 return None
556 556
557 557
558 558 class bundlepart(object):
559 559 """A bundle2 part contains application level payload
560 560
561 561 The part `type` is used to route the part to the application level
562 562 handler.
563 563
564 564 The part payload is contained in ``part.data``. It could be raw bytes or a
565 565 generator of byte chunks.
566 566
567 567 You can add parameters to the part using the ``addparam`` method.
568 568 Parameters can be either mandatory (default) or advisory. Remote side
569 569 should be able to safely ignore the advisory ones.
570 570
571 571 Both data and parameters cannot be modified after the generation has begun.
572 572 """
573 573
574 574 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
575 575 data=''):
576 576 self.id = None
577 577 self.type = parttype
578 578 self._data = data
579 579 self._mandatoryparams = list(mandatoryparams)
580 580 self._advisoryparams = list(advisoryparams)
581 581 # checking for duplicated entries
582 582 self._seenparams = set()
583 583 for pname, __ in self._mandatoryparams + self._advisoryparams:
584 584 if pname in self._seenparams:
585 585 raise RuntimeError('duplicated params: %s' % pname)
586 586 self._seenparams.add(pname)
587 587 # status of the part's generation:
588 588 # - None: not started,
589 589 # - False: currently generated,
590 590 # - True: generation done.
591 591 self._generated = None
592 592
593 593 # methods used to defines the part content
594 594 def __setdata(self, data):
595 595 if self._generated is not None:
596 596 raise error.ReadOnlyPartError('part is being generated')
597 597 self._data = data
598 598 def __getdata(self):
599 599 return self._data
600 600 data = property(__getdata, __setdata)
601 601
602 602 @property
603 603 def mandatoryparams(self):
604 604 # make it an immutable tuple to force people through ``addparam``
605 605 return tuple(self._mandatoryparams)
606 606
607 607 @property
608 608 def advisoryparams(self):
609 609 # make it an immutable tuple to force people through ``addparam``
610 610 return tuple(self._advisoryparams)
611 611
612 612 def addparam(self, name, value='', mandatory=True):
613 613 if self._generated is not None:
614 614 raise error.ReadOnlyPartError('part is being generated')
615 615 if name in self._seenparams:
616 616 raise ValueError('duplicated params: %s' % name)
617 617 self._seenparams.add(name)
618 618 params = self._advisoryparams
619 619 if mandatory:
620 620 params = self._mandatoryparams
621 621 params.append((name, value))
622 622
623 623 # methods used to generates the bundle2 stream
624 624 def getchunks(self):
625 625 if self._generated is not None:
626 626 raise RuntimeError('part can only be consumed once')
627 627 self._generated = False
628 628 #### header
629 629 ## parttype
630 630 header = [_pack(_fparttypesize, len(self.type)),
631 631 self.type, _pack(_fpartid, self.id),
632 632 ]
633 633 ## parameters
634 634 # count
635 635 manpar = self.mandatoryparams
636 636 advpar = self.advisoryparams
637 637 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
638 638 # size
639 639 parsizes = []
640 640 for key, value in manpar:
641 641 parsizes.append(len(key))
642 642 parsizes.append(len(value))
643 643 for key, value in advpar:
644 644 parsizes.append(len(key))
645 645 parsizes.append(len(value))
646 646 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
647 647 header.append(paramsizes)
648 648 # key, value
649 649 for key, value in manpar:
650 650 header.append(key)
651 651 header.append(value)
652 652 for key, value in advpar:
653 653 header.append(key)
654 654 header.append(value)
655 655 ## finalize header
656 656 headerchunk = ''.join(header)
657 657 yield _pack(_fpartheadersize, len(headerchunk))
658 658 yield headerchunk
659 659 ## payload
660 660 for chunk in self._payloadchunks():
661 661 yield _pack(_fpayloadsize, len(chunk))
662 662 yield chunk
663 663 # end of payload
664 664 yield _pack(_fpayloadsize, 0)
665 665 self._generated = True
666 666
667 667 def _payloadchunks(self):
668 668 """yield chunks of a the part payload
669 669
670 670 Exists to handle the different methods to provide data to a part."""
671 671 # we only support fixed size data now.
672 672 # This will be improved in the future.
673 673 if util.safehasattr(self.data, 'next'):
674 674 buff = util.chunkbuffer(self.data)
675 675 chunk = buff.read(preferedchunksize)
676 676 while chunk:
677 677 yield chunk
678 678 chunk = buff.read(preferedchunksize)
679 679 elif len(self.data):
680 680 yield self.data
681 681
682 682 class unbundlepart(unpackermixin):
683 683 """a bundle part read from a bundle"""
684 684
685 685 def __init__(self, ui, header, fp):
686 686 super(unbundlepart, self).__init__(fp)
687 687 self.ui = ui
688 688 # unbundle state attr
689 689 self._headerdata = header
690 690 self._headeroffset = 0
691 691 self._initialized = False
692 692 self.consumed = False
693 693 # part data
694 694 self.id = None
695 695 self.type = None
696 696 self.mandatoryparams = None
697 697 self.advisoryparams = None
698 698 self.params = None
699 699 self.mandatorykeys = ()
700 700 self._payloadstream = None
701 701 self._readheader()
702 702
703 703 def _fromheader(self, size):
704 704 """return the next <size> byte from the header"""
705 705 offset = self._headeroffset
706 706 data = self._headerdata[offset:(offset + size)]
707 707 self._headeroffset = offset + size
708 708 return data
709 709
710 710 def _unpackheader(self, format):
711 711 """read given format from header
712 712
713 713 This automatically compute the size of the format to read."""
714 714 data = self._fromheader(struct.calcsize(format))
715 715 return _unpack(format, data)
716 716
717 717 def _initparams(self, mandatoryparams, advisoryparams):
718 718 """internal function to setup all logic related parameters"""
719 719 # make it read only to prevent people touching it by mistake.
720 720 self.mandatoryparams = tuple(mandatoryparams)
721 721 self.advisoryparams = tuple(advisoryparams)
722 722 # user friendly UI
723 723 self.params = dict(self.mandatoryparams)
724 724 self.params.update(dict(self.advisoryparams))
725 725 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
726 726
727 727 def _readheader(self):
728 728 """read the header and setup the object"""
729 729 typesize = self._unpackheader(_fparttypesize)[0]
730 730 self.type = self._fromheader(typesize)
731 731 self.ui.debug('part type: "%s"\n' % self.type)
732 732 self.id = self._unpackheader(_fpartid)[0]
733 733 self.ui.debug('part id: "%s"\n' % self.id)
734 734 ## reading parameters
735 735 # param count
736 736 mancount, advcount = self._unpackheader(_fpartparamcount)
737 737 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
738 738 # param size
739 739 fparamsizes = _makefpartparamsizes(mancount + advcount)
740 740 paramsizes = self._unpackheader(fparamsizes)
741 741 # make it a list of couple again
742 742 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
743 743 # split mandatory from advisory
744 744 mansizes = paramsizes[:mancount]
745 745 advsizes = paramsizes[mancount:]
746 746 # retrive param value
747 747 manparams = []
748 748 for key, value in mansizes:
749 749 manparams.append((self._fromheader(key), self._fromheader(value)))
750 750 advparams = []
751 751 for key, value in advsizes:
752 752 advparams.append((self._fromheader(key), self._fromheader(value)))
753 753 self._initparams(manparams, advparams)
754 754 ## part payload
755 755 def payloadchunks():
756 756 payloadsize = self._unpack(_fpayloadsize)[0]
757 757 self.ui.debug('payload chunk size: %i\n' % payloadsize)
758 758 while payloadsize:
759 759 yield self._readexact(payloadsize)
760 760 payloadsize = self._unpack(_fpayloadsize)[0]
761 761 self.ui.debug('payload chunk size: %i\n' % payloadsize)
762 762 self._payloadstream = util.chunkbuffer(payloadchunks())
763 763 # we read the data, tell it
764 764 self._initialized = True
765 765
766 766 def read(self, size=None):
767 767 """read payload data"""
768 768 if not self._initialized:
769 769 self._readheader()
770 770 if size is None:
771 771 data = self._payloadstream.read()
772 772 else:
773 773 data = self._payloadstream.read(size)
774 774 if size is None or len(data) < size:
775 775 self.consumed = True
776 776 return data
777 777
778 778 def bundle2caps(remote):
779 779 """return the bundlecapabilities of a peer as dict"""
780 780 raw = remote.capable('bundle2-exp')
781 781 if not raw and raw != '':
782 782 return {}
783 783 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
784 784 return decodecaps(capsblob)
785 785
786 786 @parthandler('b2x:changegroup')
787 787 def handlechangegroup(op, inpart):
788 788 """apply a changegroup part on the repo
789 789
790 790 This is a very early implementation that will massive rework before being
791 791 inflicted to any end-user.
792 792 """
793 793 # Make sure we trigger a transaction creation
794 794 #
795 795 # The addchangegroup function will get a transaction object by itself, but
796 796 # we need to make sure we trigger the creation of a transaction object used
797 797 # for the whole processing scope.
798 798 op.gettransaction()
799 799 cg = changegroup.unbundle10(inpart, 'UN')
800 800 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
801 801 op.records.add('changegroup', {'return': ret})
802 802 if op.reply is not None:
803 803 # This is definitly not the final form of this
804 804 # return. But one need to start somewhere.
805 805 part = op.reply.newpart('b2x:reply:changegroup')
806 806 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
807 807 part.addparam('return', '%i' % ret, mandatory=False)
808 808 assert not inpart.read()
809 809
810 810 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
811 811 def handlechangegroup(op, inpart):
812 812 ret = int(inpart.params['return'])
813 813 replyto = int(inpart.params['in-reply-to'])
814 814 op.records.add('changegroup', {'return': ret}, replyto)
815 815
816 816 @parthandler('b2x:check:heads')
817 817 def handlechangegroup(op, inpart):
818 818 """check that head of the repo did not change
819 819
820 820 This is used to detect a push race when using unbundle.
821 821 This replaces the "heads" argument of unbundle."""
822 822 h = inpart.read(20)
823 823 heads = []
824 824 while len(h) == 20:
825 825 heads.append(h)
826 826 h = inpart.read(20)
827 827 assert not h
828 828 if heads != op.repo.heads():
829 829 raise error.PushRaced('repository changed while pushing - '
830 830 'please try again')
831 831
832 832 @parthandler('b2x:output')
833 833 def handleoutput(op, inpart):
834 834 """forward output captured on the server to the client"""
835 835 for line in inpart.read().splitlines():
836 836 op.ui.write(('remote: %s\n' % line))
837 837
838 838 @parthandler('b2x:replycaps')
839 839 def handlereplycaps(op, inpart):
840 840 """Notify that a reply bundle should be created
841 841
842 842 The payload contains the capabilities information for the reply"""
843 843 caps = decodecaps(inpart.read())
844 844 if op.reply is None:
845 845 op.reply = bundle20(op.ui, caps)
846 846
847 847 @parthandler('b2x:error:abort', ('message', 'hint'))
848 848 def handlereplycaps(op, inpart):
849 849 """Used to transmit abort error over the wire"""
850 850 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
851 851
852 852 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
853 853 def handlereplycaps(op, inpart):
854 854 """Used to transmit unknown content error over the wire"""
855 855 kwargs = {}
856 856 parttype = inpart.params.get('parttype')
857 857 if parttype is not None:
858 858 kwargs['parttype'] = parttype
859 859 params = inpart.params.get('params')
860 860 if params is not None:
861 861 kwargs['params'] = params.split('\0')
862 862
863 863 raise error.BundleValueError(**kwargs)
864 864
865 865 @parthandler('b2x:error:pushraced', ('message',))
866 866 def handlereplycaps(op, inpart):
867 867 """Used to transmit push race error over the wire"""
868 868 raise error.ResponseError(_('push failed:'), inpart.params['message'])
869 869
870 870 @parthandler('b2x:listkeys', ('namespace',))
871 871 def handlelistkeys(op, inpart):
872 872 """retrieve pushkey namespace content stored in a bundle2"""
873 873 namespace = inpart.params['namespace']
874 874 r = pushkey.decodekeys(inpart.read())
875 875 op.records.add('listkeys', (namespace, r))
876 876
877 877 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
878 878 def handlepushkey(op, inpart):
879 879 """process a pushkey request"""
880 880 dec = pushkey.decode
881 881 namespace = dec(inpart.params['namespace'])
882 882 key = dec(inpart.params['key'])
883 883 old = dec(inpart.params['old'])
884 884 new = dec(inpart.params['new'])
885 885 ret = op.repo.pushkey(namespace, key, old, new)
886 886 record = {'namespace': namespace,
887 887 'key': key,
888 888 'old': old,
889 889 'new': new}
890 890 op.records.add('pushkey', record)
891 891 if op.reply is not None:
892 892 rpart = op.reply.newpart('b2x:reply:pushkey')
893 893 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
894 894 rpart.addparam('return', '%i' % ret, mandatory=False)
895 895
896 896 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
897 897 def handlepushkeyreply(op, inpart):
898 898 """retrieve the result of a pushkey request"""
899 899 ret = int(inpart.params['return'])
900 900 partid = int(inpart.params['in-reply-to'])
901 901 op.records.add('pushkey', {'return': ret}, partid)
902
903 @parthandler('b2x:obsmarkers')
904 def handleobsmarker(op, inpart):
905 """add a stream of obsmarkers to the repo"""
906 tr = op.gettransaction()
907 op.repo.obsstore.mergemarkers(tr, inpart.read())
General Comments 0
You need to be logged in to leave comments. Login now