##// END OF EJS Templates
bundle2: add a `obsmarkersversion` function to extract supported version...
Pierre-Yves David -
r22344:9829b794 default
parent child Browse files
Show More
@@ -1,939 +1,946 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handler are registered
132 132 for a certain part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know if a part is mandatory or advisory. If the Part type
136 136 contains any uppercase char it is considered mandatory. When no handler is
137 137 known for a Mandatory part, the process is aborted and an exception is raised.
138 138 If the part is advisory and no handler is known, the part is ignored. When the
139 139 process is aborted, the full bundle is still read from the stream to keep the
140 140 channel usable. But none of the part read from an abort are processed. In the
141 141 future, dropping the stream may become an option for channel we do not care to
142 142 preserve.
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 import obsolete
149 150 import pushkey
150 151
151 152 import changegroup, error
152 153 from i18n import _
153 154
154 155 _pack = struct.pack
155 156 _unpack = struct.unpack
156 157
157 158 _magicstring = 'HG2X'
158 159
159 160 _fstreamparamsize = '>H'
160 161 _fpartheadersize = '>H'
161 162 _fparttypesize = '>B'
162 163 _fpartid = '>I'
163 164 _fpayloadsize = '>I'
164 165 _fpartparamcount = '>BB'
165 166
166 167 preferedchunksize = 4096
167 168
168 169 def _makefpartparamsizes(nbparams):
169 170 """return a struct format to read part parameter sizes
170 171
171 172 The number parameters is variable so we need to build that format
172 173 dynamically.
173 174 """
174 175 return '>'+('BB'*nbparams)
175 176
176 177 parthandlermapping = {}
177 178
178 179 def parthandler(parttype, params=()):
179 180 """decorator that register a function as a bundle2 part handler
180 181
181 182 eg::
182 183
183 184 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
184 185 def myparttypehandler(...):
185 186 '''process a part of type "my part".'''
186 187 ...
187 188 """
188 189 def _decorator(func):
189 190 lparttype = parttype.lower() # enforce lower case matching.
190 191 assert lparttype not in parthandlermapping
191 192 parthandlermapping[lparttype] = func
192 193 func.params = frozenset(params)
193 194 return func
194 195 return _decorator
195 196
196 197 class unbundlerecords(object):
197 198 """keep record of what happens during and unbundle
198 199
199 200 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 201 category of record and obj is an arbitrary object.
201 202
202 203 `records['cat']` will return all entries of this category 'cat'.
203 204
204 205 Iterating on the object itself will yield `('category', obj)` tuples
205 206 for all entries.
206 207
207 208 All iterations happens in chronological order.
208 209 """
209 210
210 211 def __init__(self):
211 212 self._categories = {}
212 213 self._sequences = []
213 214 self._replies = {}
214 215
215 216 def add(self, category, entry, inreplyto=None):
216 217 """add a new record of a given category.
217 218
218 219 The entry can then be retrieved in the list returned by
219 220 self['category']."""
220 221 self._categories.setdefault(category, []).append(entry)
221 222 self._sequences.append((category, entry))
222 223 if inreplyto is not None:
223 224 self.getreplies(inreplyto).add(category, entry)
224 225
225 226 def getreplies(self, partid):
226 227 """get the subrecords that replies to a specific part"""
227 228 return self._replies.setdefault(partid, unbundlerecords())
228 229
229 230 def __getitem__(self, cat):
230 231 return tuple(self._categories.get(cat, ()))
231 232
232 233 def __iter__(self):
233 234 return iter(self._sequences)
234 235
235 236 def __len__(self):
236 237 return len(self._sequences)
237 238
238 239 def __nonzero__(self):
239 240 return bool(self._sequences)
240 241
241 242 class bundleoperation(object):
242 243 """an object that represents a single bundling process
243 244
244 245 Its purpose is to carry unbundle-related objects and states.
245 246
246 247 A new object should be created at the beginning of each bundle processing.
247 248 The object is to be returned by the processing function.
248 249
249 250 The object has very little content now it will ultimately contain:
250 251 * an access to the repo the bundle is applied to,
251 252 * a ui object,
252 253 * a way to retrieve a transaction to add changes to the repo,
253 254 * a way to record the result of processing each part,
254 255 * a way to construct a bundle response when applicable.
255 256 """
256 257
257 258 def __init__(self, repo, transactiongetter):
258 259 self.repo = repo
259 260 self.ui = repo.ui
260 261 self.records = unbundlerecords()
261 262 self.gettransaction = transactiongetter
262 263 self.reply = None
263 264
264 265 class TransactionUnavailable(RuntimeError):
265 266 pass
266 267
267 268 def _notransaction():
268 269 """default method to get a transaction while processing a bundle
269 270
270 271 Raise an exception to highlight the fact that no transaction was expected
271 272 to be created"""
272 273 raise TransactionUnavailable()
273 274
274 275 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 276 """This function process a bundle, apply effect to/from a repo
276 277
277 278 It iterates over each part then searches for and uses the proper handling
278 279 code to process the part. Parts are processed in order.
279 280
280 281 This is very early version of this function that will be strongly reworked
281 282 before final usage.
282 283
283 284 Unknown Mandatory part will abort the process.
284 285 """
285 286 op = bundleoperation(repo, transactiongetter)
286 287 # todo:
287 288 # - replace this is a init function soon.
288 289 # - exception catching
289 290 unbundler.params
290 291 iterparts = unbundler.iterparts()
291 292 part = None
292 293 try:
293 294 for part in iterparts:
294 295 parttype = part.type
295 296 # part key are matched lower case
296 297 key = parttype.lower()
297 298 try:
298 299 handler = parthandlermapping.get(key)
299 300 if handler is None:
300 301 raise error.BundleValueError(parttype=key)
301 302 op.ui.debug('found a handler for part %r\n' % parttype)
302 303 unknownparams = part.mandatorykeys - handler.params
303 304 if unknownparams:
304 305 unknownparams = list(unknownparams)
305 306 unknownparams.sort()
306 307 raise error.BundleValueError(parttype=key,
307 308 params=unknownparams)
308 309 except error.BundleValueError, exc:
309 310 if key != parttype: # mandatory parts
310 311 raise
311 312 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
312 313 # consuming the part
313 314 part.read()
314 315 continue
315 316
316 317
317 318 # handler is called outside the above try block so that we don't
318 319 # risk catching KeyErrors from anything other than the
319 320 # parthandlermapping lookup (any KeyError raised by handler()
320 321 # itself represents a defect of a different variety).
321 322 output = None
322 323 if op.reply is not None:
323 324 op.ui.pushbuffer(error=True)
324 325 output = ''
325 326 try:
326 327 handler(op, part)
327 328 finally:
328 329 if output is not None:
329 330 output = op.ui.popbuffer()
330 331 if output:
331 332 outpart = op.reply.newpart('b2x:output', data=output)
332 333 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
333 334 part.read()
334 335 except Exception, exc:
335 336 if part is not None:
336 337 # consume the bundle content
337 338 part.read()
338 339 for part in iterparts:
339 340 # consume the bundle content
340 341 part.read()
341 342 # Small hack to let caller code distinguish exceptions from bundle2
342 343 # processing fron the ones from bundle1 processing. This is mostly
343 344 # needed to handle different return codes to unbundle according to the
344 345 # type of bundle. We should probably clean up or drop this return code
345 346 # craziness in a future version.
346 347 exc.duringunbundle2 = True
347 348 raise
348 349 return op
349 350
350 351 def decodecaps(blob):
351 352 """decode a bundle2 caps bytes blob into a dictionnary
352 353
353 354 The blob is a list of capabilities (one per line)
354 355 Capabilities may have values using a line of the form::
355 356
356 357 capability=value1,value2,value3
357 358
358 359 The values are always a list."""
359 360 caps = {}
360 361 for line in blob.splitlines():
361 362 if not line:
362 363 continue
363 364 if '=' not in line:
364 365 key, vals = line, ()
365 366 else:
366 367 key, vals = line.split('=', 1)
367 368 vals = vals.split(',')
368 369 key = urllib.unquote(key)
369 370 vals = [urllib.unquote(v) for v in vals]
370 371 caps[key] = vals
371 372 return caps
372 373
373 374 def encodecaps(caps):
374 375 """encode a bundle2 caps dictionary into a bytes blob"""
375 376 chunks = []
376 377 for ca in sorted(caps):
377 378 vals = caps[ca]
378 379 ca = urllib.quote(ca)
379 380 vals = [urllib.quote(v) for v in vals]
380 381 if vals:
381 382 ca = "%s=%s" % (ca, ','.join(vals))
382 383 chunks.append(ca)
383 384 return '\n'.join(chunks)
384 385
385 386 class bundle20(object):
386 387 """represent an outgoing bundle2 container
387 388
388 389 Use the `addparam` method to add stream level parameter. and `newpart` to
389 390 populate it. Then call `getchunks` to retrieve all the binary chunks of
390 391 data that compose the bundle2 container."""
391 392
392 393 def __init__(self, ui, capabilities=()):
393 394 self.ui = ui
394 395 self._params = []
395 396 self._parts = []
396 397 self.capabilities = dict(capabilities)
397 398
398 399 @property
399 400 def nbparts(self):
400 401 """total number of parts added to the bundler"""
401 402 return len(self._parts)
402 403
403 404 # methods used to defines the bundle2 content
404 405 def addparam(self, name, value=None):
405 406 """add a stream level parameter"""
406 407 if not name:
407 408 raise ValueError('empty parameter name')
408 409 if name[0] not in string.letters:
409 410 raise ValueError('non letter first character: %r' % name)
410 411 self._params.append((name, value))
411 412
412 413 def addpart(self, part):
413 414 """add a new part to the bundle2 container
414 415
415 416 Parts contains the actual applicative payload."""
416 417 assert part.id is None
417 418 part.id = len(self._parts) # very cheap counter
418 419 self._parts.append(part)
419 420
420 421 def newpart(self, typeid, *args, **kwargs):
421 422 """create a new part and add it to the containers
422 423
423 424 As the part is directly added to the containers. For now, this means
424 425 that any failure to properly initialize the part after calling
425 426 ``newpart`` should result in a failure of the whole bundling process.
426 427
427 428 You can still fall back to manually create and add if you need better
428 429 control."""
429 430 part = bundlepart(typeid, *args, **kwargs)
430 431 self.addpart(part)
431 432 return part
432 433
433 434 # methods used to generate the bundle2 stream
434 435 def getchunks(self):
435 436 self.ui.debug('start emission of %s stream\n' % _magicstring)
436 437 yield _magicstring
437 438 param = self._paramchunk()
438 439 self.ui.debug('bundle parameter: %s\n' % param)
439 440 yield _pack(_fstreamparamsize, len(param))
440 441 if param:
441 442 yield param
442 443
443 444 self.ui.debug('start of parts\n')
444 445 for part in self._parts:
445 446 self.ui.debug('bundle part: "%s"\n' % part.type)
446 447 for chunk in part.getchunks():
447 448 yield chunk
448 449 self.ui.debug('end of bundle\n')
449 450 yield '\0\0'
450 451
451 452 def _paramchunk(self):
452 453 """return a encoded version of all stream parameters"""
453 454 blocks = []
454 455 for par, value in self._params:
455 456 par = urllib.quote(par)
456 457 if value is not None:
457 458 value = urllib.quote(value)
458 459 par = '%s=%s' % (par, value)
459 460 blocks.append(par)
460 461 return ' '.join(blocks)
461 462
462 463 class unpackermixin(object):
463 464 """A mixin to extract bytes and struct data from a stream"""
464 465
465 466 def __init__(self, fp):
466 467 self._fp = fp
467 468
468 469 def _unpack(self, format):
469 470 """unpack this struct format from the stream"""
470 471 data = self._readexact(struct.calcsize(format))
471 472 return _unpack(format, data)
472 473
473 474 def _readexact(self, size):
474 475 """read exactly <size> bytes from the stream"""
475 476 return changegroup.readexactly(self._fp, size)
476 477
477 478
478 479 class unbundle20(unpackermixin):
479 480 """interpret a bundle2 stream
480 481
481 482 This class is fed with a binary stream and yields parts through its
482 483 `iterparts` methods."""
483 484
484 485 def __init__(self, ui, fp, header=None):
485 486 """If header is specified, we do not read it out of the stream."""
486 487 self.ui = ui
487 488 super(unbundle20, self).__init__(fp)
488 489 if header is None:
489 490 header = self._readexact(4)
490 491 magic, version = header[0:2], header[2:4]
491 492 if magic != 'HG':
492 493 raise util.Abort(_('not a Mercurial bundle'))
493 494 if version != '2X':
494 495 raise util.Abort(_('unknown bundle version %s') % version)
495 496 self.ui.debug('start processing of %s stream\n' % header)
496 497
497 498 @util.propertycache
498 499 def params(self):
499 500 """dictionary of stream level parameters"""
500 501 self.ui.debug('reading bundle2 stream parameters\n')
501 502 params = {}
502 503 paramssize = self._unpack(_fstreamparamsize)[0]
503 504 if paramssize:
504 505 for p in self._readexact(paramssize).split(' '):
505 506 p = p.split('=', 1)
506 507 p = [urllib.unquote(i) for i in p]
507 508 if len(p) < 2:
508 509 p.append(None)
509 510 self._processparam(*p)
510 511 params[p[0]] = p[1]
511 512 return params
512 513
513 514 def _processparam(self, name, value):
514 515 """process a parameter, applying its effect if needed
515 516
516 517 Parameter starting with a lower case letter are advisory and will be
517 518 ignored when unknown. Those starting with an upper case letter are
518 519 mandatory and will this function will raise a KeyError when unknown.
519 520
520 521 Note: no option are currently supported. Any input will be either
521 522 ignored or failing.
522 523 """
523 524 if not name:
524 525 raise ValueError('empty parameter name')
525 526 if name[0] not in string.letters:
526 527 raise ValueError('non letter first character: %r' % name)
527 528 # Some logic will be later added here to try to process the option for
528 529 # a dict of known parameter.
529 530 if name[0].islower():
530 531 self.ui.debug("ignoring unknown parameter %r\n" % name)
531 532 else:
532 533 raise error.BundleValueError(params=(name,))
533 534
534 535
535 536 def iterparts(self):
536 537 """yield all parts contained in the stream"""
537 538 # make sure param have been loaded
538 539 self.params
539 540 self.ui.debug('start extraction of bundle2 parts\n')
540 541 headerblock = self._readpartheader()
541 542 while headerblock is not None:
542 543 part = unbundlepart(self.ui, headerblock, self._fp)
543 544 yield part
544 545 headerblock = self._readpartheader()
545 546 self.ui.debug('end of bundle2 stream\n')
546 547
547 548 def _readpartheader(self):
548 549 """reads a part header size and return the bytes blob
549 550
550 551 returns None if empty"""
551 552 headersize = self._unpack(_fpartheadersize)[0]
552 553 self.ui.debug('part header size: %i\n' % headersize)
553 554 if headersize:
554 555 return self._readexact(headersize)
555 556 return None
556 557
557 558
558 559 class bundlepart(object):
559 560 """A bundle2 part contains application level payload
560 561
561 562 The part `type` is used to route the part to the application level
562 563 handler.
563 564
564 565 The part payload is contained in ``part.data``. It could be raw bytes or a
565 566 generator of byte chunks.
566 567
567 568 You can add parameters to the part using the ``addparam`` method.
568 569 Parameters can be either mandatory (default) or advisory. Remote side
569 570 should be able to safely ignore the advisory ones.
570 571
571 572 Both data and parameters cannot be modified after the generation has begun.
572 573 """
573 574
574 575 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
575 576 data=''):
576 577 self.id = None
577 578 self.type = parttype
578 579 self._data = data
579 580 self._mandatoryparams = list(mandatoryparams)
580 581 self._advisoryparams = list(advisoryparams)
581 582 # checking for duplicated entries
582 583 self._seenparams = set()
583 584 for pname, __ in self._mandatoryparams + self._advisoryparams:
584 585 if pname in self._seenparams:
585 586 raise RuntimeError('duplicated params: %s' % pname)
586 587 self._seenparams.add(pname)
587 588 # status of the part's generation:
588 589 # - None: not started,
589 590 # - False: currently generated,
590 591 # - True: generation done.
591 592 self._generated = None
592 593
593 594 # methods used to defines the part content
594 595 def __setdata(self, data):
595 596 if self._generated is not None:
596 597 raise error.ReadOnlyPartError('part is being generated')
597 598 self._data = data
598 599 def __getdata(self):
599 600 return self._data
600 601 data = property(__getdata, __setdata)
601 602
602 603 @property
603 604 def mandatoryparams(self):
604 605 # make it an immutable tuple to force people through ``addparam``
605 606 return tuple(self._mandatoryparams)
606 607
607 608 @property
608 609 def advisoryparams(self):
609 610 # make it an immutable tuple to force people through ``addparam``
610 611 return tuple(self._advisoryparams)
611 612
612 613 def addparam(self, name, value='', mandatory=True):
613 614 if self._generated is not None:
614 615 raise error.ReadOnlyPartError('part is being generated')
615 616 if name in self._seenparams:
616 617 raise ValueError('duplicated params: %s' % name)
617 618 self._seenparams.add(name)
618 619 params = self._advisoryparams
619 620 if mandatory:
620 621 params = self._mandatoryparams
621 622 params.append((name, value))
622 623
623 624 # methods used to generates the bundle2 stream
624 625 def getchunks(self):
625 626 if self._generated is not None:
626 627 raise RuntimeError('part can only be consumed once')
627 628 self._generated = False
628 629 #### header
629 630 ## parttype
630 631 header = [_pack(_fparttypesize, len(self.type)),
631 632 self.type, _pack(_fpartid, self.id),
632 633 ]
633 634 ## parameters
634 635 # count
635 636 manpar = self.mandatoryparams
636 637 advpar = self.advisoryparams
637 638 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
638 639 # size
639 640 parsizes = []
640 641 for key, value in manpar:
641 642 parsizes.append(len(key))
642 643 parsizes.append(len(value))
643 644 for key, value in advpar:
644 645 parsizes.append(len(key))
645 646 parsizes.append(len(value))
646 647 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
647 648 header.append(paramsizes)
648 649 # key, value
649 650 for key, value in manpar:
650 651 header.append(key)
651 652 header.append(value)
652 653 for key, value in advpar:
653 654 header.append(key)
654 655 header.append(value)
655 656 ## finalize header
656 657 headerchunk = ''.join(header)
657 658 yield _pack(_fpartheadersize, len(headerchunk))
658 659 yield headerchunk
659 660 ## payload
660 661 for chunk in self._payloadchunks():
661 662 yield _pack(_fpayloadsize, len(chunk))
662 663 yield chunk
663 664 # end of payload
664 665 yield _pack(_fpayloadsize, 0)
665 666 self._generated = True
666 667
667 668 def _payloadchunks(self):
668 669 """yield chunks of a the part payload
669 670
670 671 Exists to handle the different methods to provide data to a part."""
671 672 # we only support fixed size data now.
672 673 # This will be improved in the future.
673 674 if util.safehasattr(self.data, 'next'):
674 675 buff = util.chunkbuffer(self.data)
675 676 chunk = buff.read(preferedchunksize)
676 677 while chunk:
677 678 yield chunk
678 679 chunk = buff.read(preferedchunksize)
679 680 elif len(self.data):
680 681 yield self.data
681 682
682 683 class unbundlepart(unpackermixin):
683 684 """a bundle part read from a bundle"""
684 685
685 686 def __init__(self, ui, header, fp):
686 687 super(unbundlepart, self).__init__(fp)
687 688 self.ui = ui
688 689 # unbundle state attr
689 690 self._headerdata = header
690 691 self._headeroffset = 0
691 692 self._initialized = False
692 693 self.consumed = False
693 694 # part data
694 695 self.id = None
695 696 self.type = None
696 697 self.mandatoryparams = None
697 698 self.advisoryparams = None
698 699 self.params = None
699 700 self.mandatorykeys = ()
700 701 self._payloadstream = None
701 702 self._readheader()
702 703
703 704 def _fromheader(self, size):
704 705 """return the next <size> byte from the header"""
705 706 offset = self._headeroffset
706 707 data = self._headerdata[offset:(offset + size)]
707 708 self._headeroffset = offset + size
708 709 return data
709 710
710 711 def _unpackheader(self, format):
711 712 """read given format from header
712 713
713 714 This automatically compute the size of the format to read."""
714 715 data = self._fromheader(struct.calcsize(format))
715 716 return _unpack(format, data)
716 717
717 718 def _initparams(self, mandatoryparams, advisoryparams):
718 719 """internal function to setup all logic related parameters"""
719 720 # make it read only to prevent people touching it by mistake.
720 721 self.mandatoryparams = tuple(mandatoryparams)
721 722 self.advisoryparams = tuple(advisoryparams)
722 723 # user friendly UI
723 724 self.params = dict(self.mandatoryparams)
724 725 self.params.update(dict(self.advisoryparams))
725 726 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
726 727
727 728 def _readheader(self):
728 729 """read the header and setup the object"""
729 730 typesize = self._unpackheader(_fparttypesize)[0]
730 731 self.type = self._fromheader(typesize)
731 732 self.ui.debug('part type: "%s"\n' % self.type)
732 733 self.id = self._unpackheader(_fpartid)[0]
733 734 self.ui.debug('part id: "%s"\n' % self.id)
734 735 ## reading parameters
735 736 # param count
736 737 mancount, advcount = self._unpackheader(_fpartparamcount)
737 738 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
738 739 # param size
739 740 fparamsizes = _makefpartparamsizes(mancount + advcount)
740 741 paramsizes = self._unpackheader(fparamsizes)
741 742 # make it a list of couple again
742 743 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
743 744 # split mandatory from advisory
744 745 mansizes = paramsizes[:mancount]
745 746 advsizes = paramsizes[mancount:]
746 747 # retrive param value
747 748 manparams = []
748 749 for key, value in mansizes:
749 750 manparams.append((self._fromheader(key), self._fromheader(value)))
750 751 advparams = []
751 752 for key, value in advsizes:
752 753 advparams.append((self._fromheader(key), self._fromheader(value)))
753 754 self._initparams(manparams, advparams)
754 755 ## part payload
755 756 def payloadchunks():
756 757 payloadsize = self._unpack(_fpayloadsize)[0]
757 758 self.ui.debug('payload chunk size: %i\n' % payloadsize)
758 759 while payloadsize:
759 760 yield self._readexact(payloadsize)
760 761 payloadsize = self._unpack(_fpayloadsize)[0]
761 762 self.ui.debug('payload chunk size: %i\n' % payloadsize)
762 763 self._payloadstream = util.chunkbuffer(payloadchunks())
763 764 # we read the data, tell it
764 765 self._initialized = True
765 766
766 767 def read(self, size=None):
767 768 """read payload data"""
768 769 if not self._initialized:
769 770 self._readheader()
770 771 if size is None:
771 772 data = self._payloadstream.read()
772 773 else:
773 774 data = self._payloadstream.read(size)
774 775 if size is None or len(data) < size:
775 776 self.consumed = True
776 777 return data
777 778
778 779 capabilities = {'HG2X': (),
779 780 'b2x:listkeys': (),
780 781 'b2x:pushkey': (),
781 782 'b2x:changegroup': (),
782 783 }
783 784
784 785 def getrepocaps(repo):
785 786 """return the bundle2 capabilities for a given repo
786 787
787 788 Exists to allow extensions (like evolution) to mutate the capabilities.
788 789 """
789 790 caps = capabilities.copy()
790 791 if obsolete._enabled:
791 792 supportedformat = tuple('V%i' % v for v in obsolete.formats)
792 793 caps['b2x:obsmarkers'] = supportedformat
793 794 return caps
794 795
795 796 def bundle2caps(remote):
796 797 """return the bundlecapabilities of a peer as dict"""
797 798 raw = remote.capable('bundle2-exp')
798 799 if not raw and raw != '':
799 800 return {}
800 801 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
801 802 return decodecaps(capsblob)
802 803
804 def obsmarkersversion(caps):
805 """extract the list of supported obsmarkers versions from a bundle2caps dict
806 """
807 obscaps = caps.get('b2x:obsmarkers', ())
808 return [int(c[1:]) for c in obscaps if c.startswith('V')]
809
803 810 @parthandler('b2x:changegroup')
804 811 def handlechangegroup(op, inpart):
805 812 """apply a changegroup part on the repo
806 813
807 814 This is a very early implementation that will massive rework before being
808 815 inflicted to any end-user.
809 816 """
810 817 # Make sure we trigger a transaction creation
811 818 #
812 819 # The addchangegroup function will get a transaction object by itself, but
813 820 # we need to make sure we trigger the creation of a transaction object used
814 821 # for the whole processing scope.
815 822 op.gettransaction()
816 823 cg = changegroup.unbundle10(inpart, 'UN')
817 824 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
818 825 op.records.add('changegroup', {'return': ret})
819 826 if op.reply is not None:
820 827 # This is definitly not the final form of this
821 828 # return. But one need to start somewhere.
822 829 part = op.reply.newpart('b2x:reply:changegroup')
823 830 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
824 831 part.addparam('return', '%i' % ret, mandatory=False)
825 832 assert not inpart.read()
826 833
827 834 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
828 835 def handlechangegroup(op, inpart):
829 836 ret = int(inpart.params['return'])
830 837 replyto = int(inpart.params['in-reply-to'])
831 838 op.records.add('changegroup', {'return': ret}, replyto)
832 839
833 840 @parthandler('b2x:check:heads')
834 841 def handlechangegroup(op, inpart):
835 842 """check that head of the repo did not change
836 843
837 844 This is used to detect a push race when using unbundle.
838 845 This replaces the "heads" argument of unbundle."""
839 846 h = inpart.read(20)
840 847 heads = []
841 848 while len(h) == 20:
842 849 heads.append(h)
843 850 h = inpart.read(20)
844 851 assert not h
845 852 if heads != op.repo.heads():
846 853 raise error.PushRaced('repository changed while pushing - '
847 854 'please try again')
848 855
849 856 @parthandler('b2x:output')
850 857 def handleoutput(op, inpart):
851 858 """forward output captured on the server to the client"""
852 859 for line in inpart.read().splitlines():
853 860 op.ui.write(('remote: %s\n' % line))
854 861
855 862 @parthandler('b2x:replycaps')
856 863 def handlereplycaps(op, inpart):
857 864 """Notify that a reply bundle should be created
858 865
859 866 The payload contains the capabilities information for the reply"""
860 867 caps = decodecaps(inpart.read())
861 868 if op.reply is None:
862 869 op.reply = bundle20(op.ui, caps)
863 870
864 871 @parthandler('b2x:error:abort', ('message', 'hint'))
865 872 def handlereplycaps(op, inpart):
866 873 """Used to transmit abort error over the wire"""
867 874 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
868 875
869 876 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
870 877 def handlereplycaps(op, inpart):
871 878 """Used to transmit unknown content error over the wire"""
872 879 kwargs = {}
873 880 parttype = inpart.params.get('parttype')
874 881 if parttype is not None:
875 882 kwargs['parttype'] = parttype
876 883 params = inpart.params.get('params')
877 884 if params is not None:
878 885 kwargs['params'] = params.split('\0')
879 886
880 887 raise error.BundleValueError(**kwargs)
881 888
882 889 @parthandler('b2x:error:pushraced', ('message',))
883 890 def handlereplycaps(op, inpart):
884 891 """Used to transmit push race error over the wire"""
885 892 raise error.ResponseError(_('push failed:'), inpart.params['message'])
886 893
887 894 @parthandler('b2x:listkeys', ('namespace',))
888 895 def handlelistkeys(op, inpart):
889 896 """retrieve pushkey namespace content stored in a bundle2"""
890 897 namespace = inpart.params['namespace']
891 898 r = pushkey.decodekeys(inpart.read())
892 899 op.records.add('listkeys', (namespace, r))
893 900
894 901 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
895 902 def handlepushkey(op, inpart):
896 903 """process a pushkey request"""
897 904 dec = pushkey.decode
898 905 namespace = dec(inpart.params['namespace'])
899 906 key = dec(inpart.params['key'])
900 907 old = dec(inpart.params['old'])
901 908 new = dec(inpart.params['new'])
902 909 ret = op.repo.pushkey(namespace, key, old, new)
903 910 record = {'namespace': namespace,
904 911 'key': key,
905 912 'old': old,
906 913 'new': new}
907 914 op.records.add('pushkey', record)
908 915 if op.reply is not None:
909 916 rpart = op.reply.newpart('b2x:reply:pushkey')
910 917 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
911 918 rpart.addparam('return', '%i' % ret, mandatory=False)
912 919
913 920 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
914 921 def handlepushkeyreply(op, inpart):
915 922 """retrieve the result of a pushkey request"""
916 923 ret = int(inpart.params['return'])
917 924 partid = int(inpart.params['in-reply-to'])
918 925 op.records.add('pushkey', {'return': ret}, partid)
919 926
920 927 @parthandler('b2x:obsmarkers')
921 928 def handleobsmarker(op, inpart):
922 929 """add a stream of obsmarkers to the repo"""
923 930 tr = op.gettransaction()
924 931 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
925 932 if new:
926 933 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
927 934 op.records.add('obsmarkers', {'new': new})
928 935 if op.reply is not None:
929 936 rpart = op.reply.newpart('b2x:reply:obsmarkers')
930 937 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
931 938 rpart.addparam('new', '%i' % new, mandatory=False)
932 939
933 940
934 941 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
935 942 def handlepushkeyreply(op, inpart):
936 943 """retrieve the result of a pushkey request"""
937 944 ret = int(inpart.params['new'])
938 945 partid = int(inpart.params['in-reply-to'])
939 946 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now