##// END OF EJS Templates
bundle2.processbundle: let callers request default behavior...
Eric Sumner -
r23438:6e0ecb9a default
parent child Browse files
Show More
@@ -1,1119 +1,1121
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import sys
149 149 import util
150 150 import struct
151 151 import urllib
152 152 import string
153 153 import obsolete
154 154 import pushkey
155 155 import url
156 156
157 157 import changegroup, error
158 158 from i18n import _
159 159
160 160 _pack = struct.pack
161 161 _unpack = struct.unpack
162 162
163 163 _magicstring = 'HG2Y'
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 def _makefpartparamsizes(nbparams):
175 175 """return a struct format to read part parameter sizes
176 176
177 177 The number parameters is variable so we need to build that format
178 178 dynamically.
179 179 """
180 180 return '>'+('BB'*nbparams)
181 181
182 182 parthandlermapping = {}
183 183
184 184 def parthandler(parttype, params=()):
185 185 """decorator that register a function as a bundle2 part handler
186 186
187 187 eg::
188 188
189 189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
190 190 def myparttypehandler(...):
191 191 '''process a part of type "my part".'''
192 192 ...
193 193 """
194 194 def _decorator(func):
195 195 lparttype = parttype.lower() # enforce lower case matching.
196 196 assert lparttype not in parthandlermapping
197 197 parthandlermapping[lparttype] = func
198 198 func.params = frozenset(params)
199 199 return func
200 200 return _decorator
201 201
202 202 class unbundlerecords(object):
203 203 """keep record of what happens during and unbundle
204 204
205 205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 206 category of record and obj is an arbitrary object.
207 207
208 208 `records['cat']` will return all entries of this category 'cat'.
209 209
210 210 Iterating on the object itself will yield `('category', obj)` tuples
211 211 for all entries.
212 212
213 213 All iterations happens in chronological order.
214 214 """
215 215
216 216 def __init__(self):
217 217 self._categories = {}
218 218 self._sequences = []
219 219 self._replies = {}
220 220
221 221 def add(self, category, entry, inreplyto=None):
222 222 """add a new record of a given category.
223 223
224 224 The entry can then be retrieved in the list returned by
225 225 self['category']."""
226 226 self._categories.setdefault(category, []).append(entry)
227 227 self._sequences.append((category, entry))
228 228 if inreplyto is not None:
229 229 self.getreplies(inreplyto).add(category, entry)
230 230
231 231 def getreplies(self, partid):
232 232 """get the records that are replies to a specific part"""
233 233 return self._replies.setdefault(partid, unbundlerecords())
234 234
235 235 def __getitem__(self, cat):
236 236 return tuple(self._categories.get(cat, ()))
237 237
238 238 def __iter__(self):
239 239 return iter(self._sequences)
240 240
241 241 def __len__(self):
242 242 return len(self._sequences)
243 243
244 244 def __nonzero__(self):
245 245 return bool(self._sequences)
246 246
247 247 class bundleoperation(object):
248 248 """an object that represents a single bundling process
249 249
250 250 Its purpose is to carry unbundle-related objects and states.
251 251
252 252 A new object should be created at the beginning of each bundle processing.
253 253 The object is to be returned by the processing function.
254 254
255 255 The object has very little content now it will ultimately contain:
256 256 * an access to the repo the bundle is applied to,
257 257 * a ui object,
258 258 * a way to retrieve a transaction to add changes to the repo,
259 259 * a way to record the result of processing each part,
260 260 * a way to construct a bundle response when applicable.
261 261 """
262 262
263 263 def __init__(self, repo, transactiongetter):
264 264 self.repo = repo
265 265 self.ui = repo.ui
266 266 self.records = unbundlerecords()
267 267 self.gettransaction = transactiongetter
268 268 self.reply = None
269 269
270 270 class TransactionUnavailable(RuntimeError):
271 271 pass
272 272
273 273 def _notransaction():
274 274 """default method to get a transaction while processing a bundle
275 275
276 276 Raise an exception to highlight the fact that no transaction was expected
277 277 to be created"""
278 278 raise TransactionUnavailable()
279 279
280 def processbundle(repo, unbundler, transactiongetter=_notransaction):
280 def processbundle(repo, unbundler, transactiongetter=None):
281 281 """This function process a bundle, apply effect to/from a repo
282 282
283 283 It iterates over each part then searches for and uses the proper handling
284 284 code to process the part. Parts are processed in order.
285 285
286 286 This is very early version of this function that will be strongly reworked
287 287 before final usage.
288 288
289 289 Unknown Mandatory part will abort the process.
290 290 """
291 if transactiongetter is None:
292 transactiongetter = _notransaction
291 293 op = bundleoperation(repo, transactiongetter)
292 294 # todo:
293 295 # - replace this is a init function soon.
294 296 # - exception catching
295 297 unbundler.params
296 298 iterparts = unbundler.iterparts()
297 299 part = None
298 300 try:
299 301 for part in iterparts:
300 302 _processpart(op, part)
301 303 except Exception, exc:
302 304 for part in iterparts:
303 305 # consume the bundle content
304 306 part.read()
305 307 # Small hack to let caller code distinguish exceptions from bundle2
306 308 # processing from processing the old format. This is mostly
307 309 # needed to handle different return codes to unbundle according to the
308 310 # type of bundle. We should probably clean up or drop this return code
309 311 # craziness in a future version.
310 312 exc.duringunbundle2 = True
311 313 raise
312 314 return op
313 315
314 316 def _processpart(op, part):
315 317 """process a single part from a bundle
316 318
317 319 The part is guaranteed to have been fully consumed when the function exits
318 320 (even if an exception is raised)."""
319 321 try:
320 322 parttype = part.type
321 323 # part key are matched lower case
322 324 key = parttype.lower()
323 325 try:
324 326 handler = parthandlermapping.get(key)
325 327 if handler is None:
326 328 raise error.UnsupportedPartError(parttype=key)
327 329 op.ui.debug('found a handler for part %r\n' % parttype)
328 330 unknownparams = part.mandatorykeys - handler.params
329 331 if unknownparams:
330 332 unknownparams = list(unknownparams)
331 333 unknownparams.sort()
332 334 raise error.UnsupportedPartError(parttype=key,
333 335 params=unknownparams)
334 336 except error.UnsupportedPartError, exc:
335 337 if key != parttype: # mandatory parts
336 338 raise
337 339 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
338 340 return # skip to part processing
339 341
340 342 # handler is called outside the above try block so that we don't
341 343 # risk catching KeyErrors from anything other than the
342 344 # parthandlermapping lookup (any KeyError raised by handler()
343 345 # itself represents a defect of a different variety).
344 346 output = None
345 347 if op.reply is not None:
346 348 op.ui.pushbuffer(error=True)
347 349 output = ''
348 350 try:
349 351 handler(op, part)
350 352 finally:
351 353 if output is not None:
352 354 output = op.ui.popbuffer()
353 355 if output:
354 356 outpart = op.reply.newpart('b2x:output', data=output)
355 357 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
356 358 finally:
357 359 # consume the part content to not corrupt the stream.
358 360 part.read()
359 361
360 362
361 363 def decodecaps(blob):
362 364 """decode a bundle2 caps bytes blob into a dictionary
363 365
364 366 The blob is a list of capabilities (one per line)
365 367 Capabilities may have values using a line of the form::
366 368
367 369 capability=value1,value2,value3
368 370
369 371 The values are always a list."""
370 372 caps = {}
371 373 for line in blob.splitlines():
372 374 if not line:
373 375 continue
374 376 if '=' not in line:
375 377 key, vals = line, ()
376 378 else:
377 379 key, vals = line.split('=', 1)
378 380 vals = vals.split(',')
379 381 key = urllib.unquote(key)
380 382 vals = [urllib.unquote(v) for v in vals]
381 383 caps[key] = vals
382 384 return caps
383 385
384 386 def encodecaps(caps):
385 387 """encode a bundle2 caps dictionary into a bytes blob"""
386 388 chunks = []
387 389 for ca in sorted(caps):
388 390 vals = caps[ca]
389 391 ca = urllib.quote(ca)
390 392 vals = [urllib.quote(v) for v in vals]
391 393 if vals:
392 394 ca = "%s=%s" % (ca, ','.join(vals))
393 395 chunks.append(ca)
394 396 return '\n'.join(chunks)
395 397
396 398 class bundle20(object):
397 399 """represent an outgoing bundle2 container
398 400
399 401 Use the `addparam` method to add stream level parameter. and `newpart` to
400 402 populate it. Then call `getchunks` to retrieve all the binary chunks of
401 403 data that compose the bundle2 container."""
402 404
403 405 def __init__(self, ui, capabilities=()):
404 406 self.ui = ui
405 407 self._params = []
406 408 self._parts = []
407 409 self.capabilities = dict(capabilities)
408 410
409 411 @property
410 412 def nbparts(self):
411 413 """total number of parts added to the bundler"""
412 414 return len(self._parts)
413 415
414 416 # methods used to defines the bundle2 content
415 417 def addparam(self, name, value=None):
416 418 """add a stream level parameter"""
417 419 if not name:
418 420 raise ValueError('empty parameter name')
419 421 if name[0] not in string.letters:
420 422 raise ValueError('non letter first character: %r' % name)
421 423 self._params.append((name, value))
422 424
423 425 def addpart(self, part):
424 426 """add a new part to the bundle2 container
425 427
426 428 Parts contains the actual applicative payload."""
427 429 assert part.id is None
428 430 part.id = len(self._parts) # very cheap counter
429 431 self._parts.append(part)
430 432
431 433 def newpart(self, typeid, *args, **kwargs):
432 434 """create a new part and add it to the containers
433 435
434 436 As the part is directly added to the containers. For now, this means
435 437 that any failure to properly initialize the part after calling
436 438 ``newpart`` should result in a failure of the whole bundling process.
437 439
438 440 You can still fall back to manually create and add if you need better
439 441 control."""
440 442 part = bundlepart(typeid, *args, **kwargs)
441 443 self.addpart(part)
442 444 return part
443 445
444 446 # methods used to generate the bundle2 stream
445 447 def getchunks(self):
446 448 self.ui.debug('start emission of %s stream\n' % _magicstring)
447 449 yield _magicstring
448 450 param = self._paramchunk()
449 451 self.ui.debug('bundle parameter: %s\n' % param)
450 452 yield _pack(_fstreamparamsize, len(param))
451 453 if param:
452 454 yield param
453 455
454 456 self.ui.debug('start of parts\n')
455 457 for part in self._parts:
456 458 self.ui.debug('bundle part: "%s"\n' % part.type)
457 459 for chunk in part.getchunks():
458 460 yield chunk
459 461 self.ui.debug('end of bundle\n')
460 462 yield _pack(_fpartheadersize, 0)
461 463
462 464 def _paramchunk(self):
463 465 """return a encoded version of all stream parameters"""
464 466 blocks = []
465 467 for par, value in self._params:
466 468 par = urllib.quote(par)
467 469 if value is not None:
468 470 value = urllib.quote(value)
469 471 par = '%s=%s' % (par, value)
470 472 blocks.append(par)
471 473 return ' '.join(blocks)
472 474
473 475 class unpackermixin(object):
474 476 """A mixin to extract bytes and struct data from a stream"""
475 477
476 478 def __init__(self, fp):
477 479 self._fp = fp
478 480
479 481 def _unpack(self, format):
480 482 """unpack this struct format from the stream"""
481 483 data = self._readexact(struct.calcsize(format))
482 484 return _unpack(format, data)
483 485
484 486 def _readexact(self, size):
485 487 """read exactly <size> bytes from the stream"""
486 488 return changegroup.readexactly(self._fp, size)
487 489
488 490
489 491 class unbundle20(unpackermixin):
490 492 """interpret a bundle2 stream
491 493
492 494 This class is fed with a binary stream and yields parts through its
493 495 `iterparts` methods."""
494 496
495 497 def __init__(self, ui, fp, header=None):
496 498 """If header is specified, we do not read it out of the stream."""
497 499 self.ui = ui
498 500 super(unbundle20, self).__init__(fp)
499 501 if header is None:
500 502 header = self._readexact(4)
501 503 magic, version = header[0:2], header[2:4]
502 504 if magic != 'HG':
503 505 raise util.Abort(_('not a Mercurial bundle'))
504 506 if version != '2Y':
505 507 raise util.Abort(_('unknown bundle version %s') % version)
506 508 self.ui.debug('start processing of %s stream\n' % header)
507 509
508 510 @util.propertycache
509 511 def params(self):
510 512 """dictionary of stream level parameters"""
511 513 self.ui.debug('reading bundle2 stream parameters\n')
512 514 params = {}
513 515 paramssize = self._unpack(_fstreamparamsize)[0]
514 516 if paramssize < 0:
515 517 raise error.BundleValueError('negative bundle param size: %i'
516 518 % paramssize)
517 519 if paramssize:
518 520 for p in self._readexact(paramssize).split(' '):
519 521 p = p.split('=', 1)
520 522 p = [urllib.unquote(i) for i in p]
521 523 if len(p) < 2:
522 524 p.append(None)
523 525 self._processparam(*p)
524 526 params[p[0]] = p[1]
525 527 return params
526 528
527 529 def _processparam(self, name, value):
528 530 """process a parameter, applying its effect if needed
529 531
530 532 Parameter starting with a lower case letter are advisory and will be
531 533 ignored when unknown. Those starting with an upper case letter are
532 534 mandatory and will this function will raise a KeyError when unknown.
533 535
534 536 Note: no option are currently supported. Any input will be either
535 537 ignored or failing.
536 538 """
537 539 if not name:
538 540 raise ValueError('empty parameter name')
539 541 if name[0] not in string.letters:
540 542 raise ValueError('non letter first character: %r' % name)
541 543 # Some logic will be later added here to try to process the option for
542 544 # a dict of known parameter.
543 545 if name[0].islower():
544 546 self.ui.debug("ignoring unknown parameter %r\n" % name)
545 547 else:
546 548 raise error.UnsupportedPartError(params=(name,))
547 549
548 550
549 551 def iterparts(self):
550 552 """yield all parts contained in the stream"""
551 553 # make sure param have been loaded
552 554 self.params
553 555 self.ui.debug('start extraction of bundle2 parts\n')
554 556 headerblock = self._readpartheader()
555 557 while headerblock is not None:
556 558 part = unbundlepart(self.ui, headerblock, self._fp)
557 559 yield part
558 560 headerblock = self._readpartheader()
559 561 self.ui.debug('end of bundle2 stream\n')
560 562
561 563 def _readpartheader(self):
562 564 """reads a part header size and return the bytes blob
563 565
564 566 returns None if empty"""
565 567 headersize = self._unpack(_fpartheadersize)[0]
566 568 if headersize < 0:
567 569 raise error.BundleValueError('negative part header size: %i'
568 570 % headersize)
569 571 self.ui.debug('part header size: %i\n' % headersize)
570 572 if headersize:
571 573 return self._readexact(headersize)
572 574 return None
573 575
574 576
575 577 class bundlepart(object):
576 578 """A bundle2 part contains application level payload
577 579
578 580 The part `type` is used to route the part to the application level
579 581 handler.
580 582
581 583 The part payload is contained in ``part.data``. It could be raw bytes or a
582 584 generator of byte chunks.
583 585
584 586 You can add parameters to the part using the ``addparam`` method.
585 587 Parameters can be either mandatory (default) or advisory. Remote side
586 588 should be able to safely ignore the advisory ones.
587 589
588 590 Both data and parameters cannot be modified after the generation has begun.
589 591 """
590 592
591 593 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
592 594 data=''):
593 595 self.id = None
594 596 self.type = parttype
595 597 self._data = data
596 598 self._mandatoryparams = list(mandatoryparams)
597 599 self._advisoryparams = list(advisoryparams)
598 600 # checking for duplicated entries
599 601 self._seenparams = set()
600 602 for pname, __ in self._mandatoryparams + self._advisoryparams:
601 603 if pname in self._seenparams:
602 604 raise RuntimeError('duplicated params: %s' % pname)
603 605 self._seenparams.add(pname)
604 606 # status of the part's generation:
605 607 # - None: not started,
606 608 # - False: currently generated,
607 609 # - True: generation done.
608 610 self._generated = None
609 611
610 612 # methods used to defines the part content
611 613 def __setdata(self, data):
612 614 if self._generated is not None:
613 615 raise error.ReadOnlyPartError('part is being generated')
614 616 self._data = data
615 617 def __getdata(self):
616 618 return self._data
617 619 data = property(__getdata, __setdata)
618 620
619 621 @property
620 622 def mandatoryparams(self):
621 623 # make it an immutable tuple to force people through ``addparam``
622 624 return tuple(self._mandatoryparams)
623 625
624 626 @property
625 627 def advisoryparams(self):
626 628 # make it an immutable tuple to force people through ``addparam``
627 629 return tuple(self._advisoryparams)
628 630
629 631 def addparam(self, name, value='', mandatory=True):
630 632 if self._generated is not None:
631 633 raise error.ReadOnlyPartError('part is being generated')
632 634 if name in self._seenparams:
633 635 raise ValueError('duplicated params: %s' % name)
634 636 self._seenparams.add(name)
635 637 params = self._advisoryparams
636 638 if mandatory:
637 639 params = self._mandatoryparams
638 640 params.append((name, value))
639 641
640 642 # methods used to generates the bundle2 stream
641 643 def getchunks(self):
642 644 if self._generated is not None:
643 645 raise RuntimeError('part can only be consumed once')
644 646 self._generated = False
645 647 #### header
646 648 ## parttype
647 649 header = [_pack(_fparttypesize, len(self.type)),
648 650 self.type, _pack(_fpartid, self.id),
649 651 ]
650 652 ## parameters
651 653 # count
652 654 manpar = self.mandatoryparams
653 655 advpar = self.advisoryparams
654 656 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
655 657 # size
656 658 parsizes = []
657 659 for key, value in manpar:
658 660 parsizes.append(len(key))
659 661 parsizes.append(len(value))
660 662 for key, value in advpar:
661 663 parsizes.append(len(key))
662 664 parsizes.append(len(value))
663 665 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
664 666 header.append(paramsizes)
665 667 # key, value
666 668 for key, value in manpar:
667 669 header.append(key)
668 670 header.append(value)
669 671 for key, value in advpar:
670 672 header.append(key)
671 673 header.append(value)
672 674 ## finalize header
673 675 headerchunk = ''.join(header)
674 676 yield _pack(_fpartheadersize, len(headerchunk))
675 677 yield headerchunk
676 678 ## payload
677 679 try:
678 680 for chunk in self._payloadchunks():
679 681 yield _pack(_fpayloadsize, len(chunk))
680 682 yield chunk
681 683 except Exception, exc:
682 684 # backup exception data for later
683 685 exc_info = sys.exc_info()
684 686 msg = 'unexpected error: %s' % exc
685 687 interpart = bundlepart('b2x:error:abort', [('message', msg)])
686 688 interpart.id = 0
687 689 yield _pack(_fpayloadsize, -1)
688 690 for chunk in interpart.getchunks():
689 691 yield chunk
690 692 # abort current part payload
691 693 yield _pack(_fpayloadsize, 0)
692 694 raise exc_info[0], exc_info[1], exc_info[2]
693 695 # end of payload
694 696 yield _pack(_fpayloadsize, 0)
695 697 self._generated = True
696 698
697 699 def _payloadchunks(self):
698 700 """yield chunks of a the part payload
699 701
700 702 Exists to handle the different methods to provide data to a part."""
701 703 # we only support fixed size data now.
702 704 # This will be improved in the future.
703 705 if util.safehasattr(self.data, 'next'):
704 706 buff = util.chunkbuffer(self.data)
705 707 chunk = buff.read(preferedchunksize)
706 708 while chunk:
707 709 yield chunk
708 710 chunk = buff.read(preferedchunksize)
709 711 elif len(self.data):
710 712 yield self.data
711 713
712 714
713 715 flaginterrupt = -1
714 716
715 717 class interrupthandler(unpackermixin):
716 718 """read one part and process it with restricted capability
717 719
718 720 This allows to transmit exception raised on the producer size during part
719 721 iteration while the consumer is reading a part.
720 722
721 723 Part processed in this manner only have access to a ui object,"""
722 724
723 725 def __init__(self, ui, fp):
724 726 super(interrupthandler, self).__init__(fp)
725 727 self.ui = ui
726 728
727 729 def _readpartheader(self):
728 730 """reads a part header size and return the bytes blob
729 731
730 732 returns None if empty"""
731 733 headersize = self._unpack(_fpartheadersize)[0]
732 734 if headersize < 0:
733 735 raise error.BundleValueError('negative part header size: %i'
734 736 % headersize)
735 737 self.ui.debug('part header size: %i\n' % headersize)
736 738 if headersize:
737 739 return self._readexact(headersize)
738 740 return None
739 741
740 742 def __call__(self):
741 743 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
742 744 headerblock = self._readpartheader()
743 745 if headerblock is None:
744 746 self.ui.debug('no part found during interruption.\n')
745 747 return
746 748 part = unbundlepart(self.ui, headerblock, self._fp)
747 749 op = interruptoperation(self.ui)
748 750 _processpart(op, part)
749 751
750 752 class interruptoperation(object):
751 753 """A limited operation to be use by part handler during interruption
752 754
753 755 It only have access to an ui object.
754 756 """
755 757
756 758 def __init__(self, ui):
757 759 self.ui = ui
758 760 self.reply = None
759 761
760 762 @property
761 763 def repo(self):
762 764 raise RuntimeError('no repo access from stream interruption')
763 765
764 766 def gettransaction(self):
765 767 raise TransactionUnavailable('no repo access from stream interruption')
766 768
767 769 class unbundlepart(unpackermixin):
768 770 """a bundle part read from a bundle"""
769 771
770 772 def __init__(self, ui, header, fp):
771 773 super(unbundlepart, self).__init__(fp)
772 774 self.ui = ui
773 775 # unbundle state attr
774 776 self._headerdata = header
775 777 self._headeroffset = 0
776 778 self._initialized = False
777 779 self.consumed = False
778 780 # part data
779 781 self.id = None
780 782 self.type = None
781 783 self.mandatoryparams = None
782 784 self.advisoryparams = None
783 785 self.params = None
784 786 self.mandatorykeys = ()
785 787 self._payloadstream = None
786 788 self._readheader()
787 789
788 790 def _fromheader(self, size):
789 791 """return the next <size> byte from the header"""
790 792 offset = self._headeroffset
791 793 data = self._headerdata[offset:(offset + size)]
792 794 self._headeroffset = offset + size
793 795 return data
794 796
795 797 def _unpackheader(self, format):
796 798 """read given format from header
797 799
798 800 This automatically compute the size of the format to read."""
799 801 data = self._fromheader(struct.calcsize(format))
800 802 return _unpack(format, data)
801 803
802 804 def _initparams(self, mandatoryparams, advisoryparams):
803 805 """internal function to setup all logic related parameters"""
804 806 # make it read only to prevent people touching it by mistake.
805 807 self.mandatoryparams = tuple(mandatoryparams)
806 808 self.advisoryparams = tuple(advisoryparams)
807 809 # user friendly UI
808 810 self.params = dict(self.mandatoryparams)
809 811 self.params.update(dict(self.advisoryparams))
810 812 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
811 813
812 814 def _readheader(self):
813 815 """read the header and setup the object"""
814 816 typesize = self._unpackheader(_fparttypesize)[0]
815 817 self.type = self._fromheader(typesize)
816 818 self.ui.debug('part type: "%s"\n' % self.type)
817 819 self.id = self._unpackheader(_fpartid)[0]
818 820 self.ui.debug('part id: "%s"\n' % self.id)
819 821 ## reading parameters
820 822 # param count
821 823 mancount, advcount = self._unpackheader(_fpartparamcount)
822 824 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
823 825 # param size
824 826 fparamsizes = _makefpartparamsizes(mancount + advcount)
825 827 paramsizes = self._unpackheader(fparamsizes)
826 828 # make it a list of couple again
827 829 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
828 830 # split mandatory from advisory
829 831 mansizes = paramsizes[:mancount]
830 832 advsizes = paramsizes[mancount:]
831 833 # retrieve param value
832 834 manparams = []
833 835 for key, value in mansizes:
834 836 manparams.append((self._fromheader(key), self._fromheader(value)))
835 837 advparams = []
836 838 for key, value in advsizes:
837 839 advparams.append((self._fromheader(key), self._fromheader(value)))
838 840 self._initparams(manparams, advparams)
839 841 ## part payload
840 842 def payloadchunks():
841 843 payloadsize = self._unpack(_fpayloadsize)[0]
842 844 self.ui.debug('payload chunk size: %i\n' % payloadsize)
843 845 while payloadsize:
844 846 if payloadsize == flaginterrupt:
845 847 # interruption detection, the handler will now read a
846 848 # single part and process it.
847 849 interrupthandler(self.ui, self._fp)()
848 850 elif payloadsize < 0:
849 851 msg = 'negative payload chunk size: %i' % payloadsize
850 852 raise error.BundleValueError(msg)
851 853 else:
852 854 yield self._readexact(payloadsize)
853 855 payloadsize = self._unpack(_fpayloadsize)[0]
854 856 self.ui.debug('payload chunk size: %i\n' % payloadsize)
855 857 self._payloadstream = util.chunkbuffer(payloadchunks())
856 858 # we read the data, tell it
857 859 self._initialized = True
858 860
859 861 def read(self, size=None):
860 862 """read payload data"""
861 863 if not self._initialized:
862 864 self._readheader()
863 865 if size is None:
864 866 data = self._payloadstream.read()
865 867 else:
866 868 data = self._payloadstream.read(size)
867 869 if size is None or len(data) < size:
868 870 self.consumed = True
869 871 return data
870 872
871 873 capabilities = {'HG2Y': (),
872 874 'b2x:listkeys': (),
873 875 'b2x:pushkey': (),
874 876 'digests': tuple(sorted(util.DIGESTS.keys())),
875 877 'b2x:remote-changegroup': ('http', 'https'),
876 878 }
877 879
878 880 def getrepocaps(repo):
879 881 """return the bundle2 capabilities for a given repo
880 882
881 883 Exists to allow extensions (like evolution) to mutate the capabilities.
882 884 """
883 885 caps = capabilities.copy()
884 886 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
885 887 if obsolete.isenabled(repo, obsolete.exchangeopt):
886 888 supportedformat = tuple('V%i' % v for v in obsolete.formats)
887 889 caps['b2x:obsmarkers'] = supportedformat
888 890 return caps
889 891
890 892 def bundle2caps(remote):
891 893 """return the bundle capabilities of a peer as dict"""
892 894 raw = remote.capable('bundle2-exp')
893 895 if not raw and raw != '':
894 896 return {}
895 897 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
896 898 return decodecaps(capsblob)
897 899
898 900 def obsmarkersversion(caps):
899 901 """extract the list of supported obsmarkers versions from a bundle2caps dict
900 902 """
901 903 obscaps = caps.get('b2x:obsmarkers', ())
902 904 return [int(c[1:]) for c in obscaps if c.startswith('V')]
903 905
904 906 @parthandler('b2x:changegroup', ('version',))
905 907 def handlechangegroup(op, inpart):
906 908 """apply a changegroup part on the repo
907 909
908 910 This is a very early implementation that will massive rework before being
909 911 inflicted to any end-user.
910 912 """
911 913 # Make sure we trigger a transaction creation
912 914 #
913 915 # The addchangegroup function will get a transaction object by itself, but
914 916 # we need to make sure we trigger the creation of a transaction object used
915 917 # for the whole processing scope.
916 918 op.gettransaction()
917 919 unpackerversion = inpart.params.get('version', '01')
918 920 # We should raise an appropriate exception here
919 921 unpacker = changegroup.packermap[unpackerversion][1]
920 922 cg = unpacker(inpart, 'UN')
921 923 # the source and url passed here are overwritten by the one contained in
922 924 # the transaction.hookargs argument. So 'bundle2' is a placeholder
923 925 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
924 926 op.records.add('changegroup', {'return': ret})
925 927 if op.reply is not None:
926 928 # This is definitely not the final form of this
927 929 # return. But one need to start somewhere.
928 930 part = op.reply.newpart('b2x:reply:changegroup')
929 931 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
930 932 part.addparam('return', '%i' % ret, mandatory=False)
931 933 assert not inpart.read()
932 934
933 935 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
934 936 ['digest:%s' % k for k in util.DIGESTS.keys()])
935 937 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
936 938 def handleremotechangegroup(op, inpart):
937 939 """apply a bundle10 on the repo, given an url and validation information
938 940
939 941 All the information about the remote bundle to import are given as
940 942 parameters. The parameters include:
941 943 - url: the url to the bundle10.
942 944 - size: the bundle10 file size. It is used to validate what was
943 945 retrieved by the client matches the server knowledge about the bundle.
944 946 - digests: a space separated list of the digest types provided as
945 947 parameters.
946 948 - digest:<digest-type>: the hexadecimal representation of the digest with
947 949 that name. Like the size, it is used to validate what was retrieved by
948 950 the client matches what the server knows about the bundle.
949 951
950 952 When multiple digest types are given, all of them are checked.
951 953 """
952 954 try:
953 955 raw_url = inpart.params['url']
954 956 except KeyError:
955 957 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
956 958 parsed_url = util.url(raw_url)
957 959 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
958 960 raise util.Abort(_('remote-changegroup does not support %s urls') %
959 961 parsed_url.scheme)
960 962
961 963 try:
962 964 size = int(inpart.params['size'])
963 965 except ValueError:
964 966 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
965 967 % 'size')
966 968 except KeyError:
967 969 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
968 970
969 971 digests = {}
970 972 for typ in inpart.params.get('digests', '').split():
971 973 param = 'digest:%s' % typ
972 974 try:
973 975 value = inpart.params[param]
974 976 except KeyError:
975 977 raise util.Abort(_('remote-changegroup: missing "%s" param') %
976 978 param)
977 979 digests[typ] = value
978 980
979 981 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
980 982
981 983 # Make sure we trigger a transaction creation
982 984 #
983 985 # The addchangegroup function will get a transaction object by itself, but
984 986 # we need to make sure we trigger the creation of a transaction object used
985 987 # for the whole processing scope.
986 988 op.gettransaction()
987 989 import exchange
988 990 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
989 991 if not isinstance(cg, changegroup.cg1unpacker):
990 992 raise util.Abort(_('%s: not a bundle version 1.0') %
991 993 util.hidepassword(raw_url))
992 994 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
993 995 op.records.add('changegroup', {'return': ret})
994 996 if op.reply is not None:
995 997 # This is definitely not the final form of this
996 998 # return. But one need to start somewhere.
997 999 part = op.reply.newpart('b2x:reply:changegroup')
998 1000 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
999 1001 part.addparam('return', '%i' % ret, mandatory=False)
1000 1002 try:
1001 1003 real_part.validate()
1002 1004 except util.Abort, e:
1003 1005 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1004 1006 (util.hidepassword(raw_url), str(e)))
1005 1007 assert not inpart.read()
1006 1008
1007 1009 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1008 1010 def handlereplychangegroup(op, inpart):
1009 1011 ret = int(inpart.params['return'])
1010 1012 replyto = int(inpart.params['in-reply-to'])
1011 1013 op.records.add('changegroup', {'return': ret}, replyto)
1012 1014
1013 1015 @parthandler('b2x:check:heads')
1014 1016 def handlecheckheads(op, inpart):
1015 1017 """check that head of the repo did not change
1016 1018
1017 1019 This is used to detect a push race when using unbundle.
1018 1020 This replaces the "heads" argument of unbundle."""
1019 1021 h = inpart.read(20)
1020 1022 heads = []
1021 1023 while len(h) == 20:
1022 1024 heads.append(h)
1023 1025 h = inpart.read(20)
1024 1026 assert not h
1025 1027 if heads != op.repo.heads():
1026 1028 raise error.PushRaced('repository changed while pushing - '
1027 1029 'please try again')
1028 1030
1029 1031 @parthandler('b2x:output')
1030 1032 def handleoutput(op, inpart):
1031 1033 """forward output captured on the server to the client"""
1032 1034 for line in inpart.read().splitlines():
1033 1035 op.ui.write(('remote: %s\n' % line))
1034 1036
1035 1037 @parthandler('b2x:replycaps')
1036 1038 def handlereplycaps(op, inpart):
1037 1039 """Notify that a reply bundle should be created
1038 1040
1039 1041 The payload contains the capabilities information for the reply"""
1040 1042 caps = decodecaps(inpart.read())
1041 1043 if op.reply is None:
1042 1044 op.reply = bundle20(op.ui, caps)
1043 1045
1044 1046 @parthandler('b2x:error:abort', ('message', 'hint'))
1045 1047 def handlereplycaps(op, inpart):
1046 1048 """Used to transmit abort error over the wire"""
1047 1049 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1048 1050
1049 1051 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1050 1052 def handlereplycaps(op, inpart):
1051 1053 """Used to transmit unknown content error over the wire"""
1052 1054 kwargs = {}
1053 1055 parttype = inpart.params.get('parttype')
1054 1056 if parttype is not None:
1055 1057 kwargs['parttype'] = parttype
1056 1058 params = inpart.params.get('params')
1057 1059 if params is not None:
1058 1060 kwargs['params'] = params.split('\0')
1059 1061
1060 1062 raise error.UnsupportedPartError(**kwargs)
1061 1063
1062 1064 @parthandler('b2x:error:pushraced', ('message',))
1063 1065 def handlereplycaps(op, inpart):
1064 1066 """Used to transmit push race error over the wire"""
1065 1067 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1066 1068
1067 1069 @parthandler('b2x:listkeys', ('namespace',))
1068 1070 def handlelistkeys(op, inpart):
1069 1071 """retrieve pushkey namespace content stored in a bundle2"""
1070 1072 namespace = inpart.params['namespace']
1071 1073 r = pushkey.decodekeys(inpart.read())
1072 1074 op.records.add('listkeys', (namespace, r))
1073 1075
1074 1076 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1075 1077 def handlepushkey(op, inpart):
1076 1078 """process a pushkey request"""
1077 1079 dec = pushkey.decode
1078 1080 namespace = dec(inpart.params['namespace'])
1079 1081 key = dec(inpart.params['key'])
1080 1082 old = dec(inpart.params['old'])
1081 1083 new = dec(inpart.params['new'])
1082 1084 ret = op.repo.pushkey(namespace, key, old, new)
1083 1085 record = {'namespace': namespace,
1084 1086 'key': key,
1085 1087 'old': old,
1086 1088 'new': new}
1087 1089 op.records.add('pushkey', record)
1088 1090 if op.reply is not None:
1089 1091 rpart = op.reply.newpart('b2x:reply:pushkey')
1090 1092 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1091 1093 rpart.addparam('return', '%i' % ret, mandatory=False)
1092 1094
1093 1095 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1094 1096 def handlepushkeyreply(op, inpart):
1095 1097 """retrieve the result of a pushkey request"""
1096 1098 ret = int(inpart.params['return'])
1097 1099 partid = int(inpart.params['in-reply-to'])
1098 1100 op.records.add('pushkey', {'return': ret}, partid)
1099 1101
1100 1102 @parthandler('b2x:obsmarkers')
1101 1103 def handleobsmarker(op, inpart):
1102 1104 """add a stream of obsmarkers to the repo"""
1103 1105 tr = op.gettransaction()
1104 1106 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1105 1107 if new:
1106 1108 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1107 1109 op.records.add('obsmarkers', {'new': new})
1108 1110 if op.reply is not None:
1109 1111 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1110 1112 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1111 1113 rpart.addparam('new', '%i' % new, mandatory=False)
1112 1114
1113 1115
1114 1116 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1115 1117 def handlepushkeyreply(op, inpart):
1116 1118 """retrieve the result of a pushkey request"""
1117 1119 ret = int(inpart.params['new'])
1118 1120 partid = int(inpart.params['in-reply-to'])
1119 1121 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now