##// END OF EJS Templates
bundle2: fix parttype enforcement...
Matt Mackall -
r23916:a3f7c781 default
parent child Browse files
Show More
@@ -1,1141 +1,1141 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 :parttype: alphanumerical part name (restricted to ^a-zA-Z0-9_:-])
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import sys
149 149 import util
150 150 import struct
151 151 import urllib
152 152 import string
153 153 import obsolete
154 154 import pushkey
155 155 import url
156 156 import re
157 157
158 158 import changegroup, error
159 159 from i18n import _
160 160
161 161 _pack = struct.pack
162 162 _unpack = struct.unpack
163 163
164 164 _magicstring = 'HG2Y'
165 165
166 166 _fstreamparamsize = '>i'
167 167 _fpartheadersize = '>i'
168 168 _fparttypesize = '>B'
169 169 _fpartid = '>I'
170 170 _fpayloadsize = '>i'
171 171 _fpartparamcount = '>BB'
172 172
173 173 preferedchunksize = 4096
174 174
175 175 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
176 176
177 177 def validateparttype(parttype):
178 178 """raise ValueError if a parttype contains invalid character"""
179 if _parttypeforbidden.match(parttype):
179 if _parttypeforbidden.search(parttype):
180 180 raise ValueError(parttype)
181 181
182 182 def _makefpartparamsizes(nbparams):
183 183 """return a struct format to read part parameter sizes
184 184
185 185 The number parameters is variable so we need to build that format
186 186 dynamically.
187 187 """
188 188 return '>'+('BB'*nbparams)
189 189
190 190 parthandlermapping = {}
191 191
192 192 def parthandler(parttype, params=()):
193 193 """decorator that register a function as a bundle2 part handler
194 194
195 195 eg::
196 196
197 197 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
198 198 def myparttypehandler(...):
199 199 '''process a part of type "my part".'''
200 200 ...
201 201 """
202 202 validateparttype(parttype)
203 203 def _decorator(func):
204 204 lparttype = parttype.lower() # enforce lower case matching.
205 205 assert lparttype not in parthandlermapping
206 206 parthandlermapping[lparttype] = func
207 207 func.params = frozenset(params)
208 208 return func
209 209 return _decorator
210 210
211 211 class unbundlerecords(object):
212 212 """keep record of what happens during and unbundle
213 213
214 214 New records are added using `records.add('cat', obj)`. Where 'cat' is a
215 215 category of record and obj is an arbitrary object.
216 216
217 217 `records['cat']` will return all entries of this category 'cat'.
218 218
219 219 Iterating on the object itself will yield `('category', obj)` tuples
220 220 for all entries.
221 221
222 222 All iterations happens in chronological order.
223 223 """
224 224
225 225 def __init__(self):
226 226 self._categories = {}
227 227 self._sequences = []
228 228 self._replies = {}
229 229
230 230 def add(self, category, entry, inreplyto=None):
231 231 """add a new record of a given category.
232 232
233 233 The entry can then be retrieved in the list returned by
234 234 self['category']."""
235 235 self._categories.setdefault(category, []).append(entry)
236 236 self._sequences.append((category, entry))
237 237 if inreplyto is not None:
238 238 self.getreplies(inreplyto).add(category, entry)
239 239
240 240 def getreplies(self, partid):
241 241 """get the records that are replies to a specific part"""
242 242 return self._replies.setdefault(partid, unbundlerecords())
243 243
244 244 def __getitem__(self, cat):
245 245 return tuple(self._categories.get(cat, ()))
246 246
247 247 def __iter__(self):
248 248 return iter(self._sequences)
249 249
250 250 def __len__(self):
251 251 return len(self._sequences)
252 252
253 253 def __nonzero__(self):
254 254 return bool(self._sequences)
255 255
256 256 class bundleoperation(object):
257 257 """an object that represents a single bundling process
258 258
259 259 Its purpose is to carry unbundle-related objects and states.
260 260
261 261 A new object should be created at the beginning of each bundle processing.
262 262 The object is to be returned by the processing function.
263 263
264 264 The object has very little content now it will ultimately contain:
265 265 * an access to the repo the bundle is applied to,
266 266 * a ui object,
267 267 * a way to retrieve a transaction to add changes to the repo,
268 268 * a way to record the result of processing each part,
269 269 * a way to construct a bundle response when applicable.
270 270 """
271 271
272 272 def __init__(self, repo, transactiongetter):
273 273 self.repo = repo
274 274 self.ui = repo.ui
275 275 self.records = unbundlerecords()
276 276 self.gettransaction = transactiongetter
277 277 self.reply = None
278 278
279 279 class TransactionUnavailable(RuntimeError):
280 280 pass
281 281
282 282 def _notransaction():
283 283 """default method to get a transaction while processing a bundle
284 284
285 285 Raise an exception to highlight the fact that no transaction was expected
286 286 to be created"""
287 287 raise TransactionUnavailable()
288 288
289 289 def processbundle(repo, unbundler, transactiongetter=None):
290 290 """This function process a bundle, apply effect to/from a repo
291 291
292 292 It iterates over each part then searches for and uses the proper handling
293 293 code to process the part. Parts are processed in order.
294 294
295 295 This is very early version of this function that will be strongly reworked
296 296 before final usage.
297 297
298 298 Unknown Mandatory part will abort the process.
299 299 """
300 300 if transactiongetter is None:
301 301 transactiongetter = _notransaction
302 302 op = bundleoperation(repo, transactiongetter)
303 303 # todo:
304 304 # - replace this is a init function soon.
305 305 # - exception catching
306 306 unbundler.params
307 307 iterparts = unbundler.iterparts()
308 308 part = None
309 309 try:
310 310 for part in iterparts:
311 311 _processpart(op, part)
312 312 except Exception, exc:
313 313 for part in iterparts:
314 314 # consume the bundle content
315 315 part.read()
316 316 # Small hack to let caller code distinguish exceptions from bundle2
317 317 # processing from processing the old format. This is mostly
318 318 # needed to handle different return codes to unbundle according to the
319 319 # type of bundle. We should probably clean up or drop this return code
320 320 # craziness in a future version.
321 321 exc.duringunbundle2 = True
322 322 raise
323 323 return op
324 324
325 325 def _processpart(op, part):
326 326 """process a single part from a bundle
327 327
328 328 The part is guaranteed to have been fully consumed when the function exits
329 329 (even if an exception is raised)."""
330 330 try:
331 331 try:
332 332 handler = parthandlermapping.get(part.type)
333 333 if handler is None:
334 334 raise error.UnsupportedPartError(parttype=part.type)
335 335 op.ui.debug('found a handler for part %r\n' % part.type)
336 336 unknownparams = part.mandatorykeys - handler.params
337 337 if unknownparams:
338 338 unknownparams = list(unknownparams)
339 339 unknownparams.sort()
340 340 raise error.UnsupportedPartError(parttype=part.type,
341 341 params=unknownparams)
342 342 except error.UnsupportedPartError, exc:
343 343 if part.mandatory: # mandatory parts
344 344 raise
345 345 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
346 346 return # skip to part processing
347 347
348 348 # handler is called outside the above try block so that we don't
349 349 # risk catching KeyErrors from anything other than the
350 350 # parthandlermapping lookup (any KeyError raised by handler()
351 351 # itself represents a defect of a different variety).
352 352 output = None
353 353 if op.reply is not None:
354 354 op.ui.pushbuffer(error=True)
355 355 output = ''
356 356 try:
357 357 handler(op, part)
358 358 finally:
359 359 if output is not None:
360 360 output = op.ui.popbuffer()
361 361 if output:
362 362 outpart = op.reply.newpart('b2x:output', data=output,
363 363 mandatory=False)
364 364 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
365 365 finally:
366 366 # consume the part content to not corrupt the stream.
367 367 part.read()
368 368
369 369
370 370 def decodecaps(blob):
371 371 """decode a bundle2 caps bytes blob into a dictionary
372 372
373 373 The blob is a list of capabilities (one per line)
374 374 Capabilities may have values using a line of the form::
375 375
376 376 capability=value1,value2,value3
377 377
378 378 The values are always a list."""
379 379 caps = {}
380 380 for line in blob.splitlines():
381 381 if not line:
382 382 continue
383 383 if '=' not in line:
384 384 key, vals = line, ()
385 385 else:
386 386 key, vals = line.split('=', 1)
387 387 vals = vals.split(',')
388 388 key = urllib.unquote(key)
389 389 vals = [urllib.unquote(v) for v in vals]
390 390 caps[key] = vals
391 391 return caps
392 392
393 393 def encodecaps(caps):
394 394 """encode a bundle2 caps dictionary into a bytes blob"""
395 395 chunks = []
396 396 for ca in sorted(caps):
397 397 vals = caps[ca]
398 398 ca = urllib.quote(ca)
399 399 vals = [urllib.quote(v) for v in vals]
400 400 if vals:
401 401 ca = "%s=%s" % (ca, ','.join(vals))
402 402 chunks.append(ca)
403 403 return '\n'.join(chunks)
404 404
405 405 class bundle20(object):
406 406 """represent an outgoing bundle2 container
407 407
408 408 Use the `addparam` method to add stream level parameter. and `newpart` to
409 409 populate it. Then call `getchunks` to retrieve all the binary chunks of
410 410 data that compose the bundle2 container."""
411 411
412 412 def __init__(self, ui, capabilities=()):
413 413 self.ui = ui
414 414 self._params = []
415 415 self._parts = []
416 416 self.capabilities = dict(capabilities)
417 417
418 418 @property
419 419 def nbparts(self):
420 420 """total number of parts added to the bundler"""
421 421 return len(self._parts)
422 422
423 423 # methods used to defines the bundle2 content
424 424 def addparam(self, name, value=None):
425 425 """add a stream level parameter"""
426 426 if not name:
427 427 raise ValueError('empty parameter name')
428 428 if name[0] not in string.letters:
429 429 raise ValueError('non letter first character: %r' % name)
430 430 self._params.append((name, value))
431 431
432 432 def addpart(self, part):
433 433 """add a new part to the bundle2 container
434 434
435 435 Parts contains the actual applicative payload."""
436 436 assert part.id is None
437 437 part.id = len(self._parts) # very cheap counter
438 438 self._parts.append(part)
439 439
440 440 def newpart(self, typeid, *args, **kwargs):
441 441 """create a new part and add it to the containers
442 442
443 443 As the part is directly added to the containers. For now, this means
444 444 that any failure to properly initialize the part after calling
445 445 ``newpart`` should result in a failure of the whole bundling process.
446 446
447 447 You can still fall back to manually create and add if you need better
448 448 control."""
449 449 part = bundlepart(typeid, *args, **kwargs)
450 450 self.addpart(part)
451 451 return part
452 452
453 453 # methods used to generate the bundle2 stream
454 454 def getchunks(self):
455 455 self.ui.debug('start emission of %s stream\n' % _magicstring)
456 456 yield _magicstring
457 457 param = self._paramchunk()
458 458 self.ui.debug('bundle parameter: %s\n' % param)
459 459 yield _pack(_fstreamparamsize, len(param))
460 460 if param:
461 461 yield param
462 462
463 463 self.ui.debug('start of parts\n')
464 464 for part in self._parts:
465 465 self.ui.debug('bundle part: "%s"\n' % part.type)
466 466 for chunk in part.getchunks():
467 467 yield chunk
468 468 self.ui.debug('end of bundle\n')
469 469 yield _pack(_fpartheadersize, 0)
470 470
471 471 def _paramchunk(self):
472 472 """return a encoded version of all stream parameters"""
473 473 blocks = []
474 474 for par, value in self._params:
475 475 par = urllib.quote(par)
476 476 if value is not None:
477 477 value = urllib.quote(value)
478 478 par = '%s=%s' % (par, value)
479 479 blocks.append(par)
480 480 return ' '.join(blocks)
481 481
482 482 class unpackermixin(object):
483 483 """A mixin to extract bytes and struct data from a stream"""
484 484
485 485 def __init__(self, fp):
486 486 self._fp = fp
487 487
488 488 def _unpack(self, format):
489 489 """unpack this struct format from the stream"""
490 490 data = self._readexact(struct.calcsize(format))
491 491 return _unpack(format, data)
492 492
493 493 def _readexact(self, size):
494 494 """read exactly <size> bytes from the stream"""
495 495 return changegroup.readexactly(self._fp, size)
496 496
497 497
498 498 class unbundle20(unpackermixin):
499 499 """interpret a bundle2 stream
500 500
501 501 This class is fed with a binary stream and yields parts through its
502 502 `iterparts` methods."""
503 503
504 504 def __init__(self, ui, fp, header=None):
505 505 """If header is specified, we do not read it out of the stream."""
506 506 self.ui = ui
507 507 super(unbundle20, self).__init__(fp)
508 508 if header is None:
509 509 header = self._readexact(4)
510 510 magic, version = header[0:2], header[2:4]
511 511 if magic != 'HG':
512 512 raise util.Abort(_('not a Mercurial bundle'))
513 513 if version != '2Y':
514 514 raise util.Abort(_('unknown bundle version %s') % version)
515 515 self.ui.debug('start processing of %s stream\n' % header)
516 516
517 517 @util.propertycache
518 518 def params(self):
519 519 """dictionary of stream level parameters"""
520 520 self.ui.debug('reading bundle2 stream parameters\n')
521 521 params = {}
522 522 paramssize = self._unpack(_fstreamparamsize)[0]
523 523 if paramssize < 0:
524 524 raise error.BundleValueError('negative bundle param size: %i'
525 525 % paramssize)
526 526 if paramssize:
527 527 for p in self._readexact(paramssize).split(' '):
528 528 p = p.split('=', 1)
529 529 p = [urllib.unquote(i) for i in p]
530 530 if len(p) < 2:
531 531 p.append(None)
532 532 self._processparam(*p)
533 533 params[p[0]] = p[1]
534 534 return params
535 535
536 536 def _processparam(self, name, value):
537 537 """process a parameter, applying its effect if needed
538 538
539 539 Parameter starting with a lower case letter are advisory and will be
540 540 ignored when unknown. Those starting with an upper case letter are
541 541 mandatory and will this function will raise a KeyError when unknown.
542 542
543 543 Note: no option are currently supported. Any input will be either
544 544 ignored or failing.
545 545 """
546 546 if not name:
547 547 raise ValueError('empty parameter name')
548 548 if name[0] not in string.letters:
549 549 raise ValueError('non letter first character: %r' % name)
550 550 # Some logic will be later added here to try to process the option for
551 551 # a dict of known parameter.
552 552 if name[0].islower():
553 553 self.ui.debug("ignoring unknown parameter %r\n" % name)
554 554 else:
555 555 raise error.UnsupportedPartError(params=(name,))
556 556
557 557
558 558 def iterparts(self):
559 559 """yield all parts contained in the stream"""
560 560 # make sure param have been loaded
561 561 self.params
562 562 self.ui.debug('start extraction of bundle2 parts\n')
563 563 headerblock = self._readpartheader()
564 564 while headerblock is not None:
565 565 part = unbundlepart(self.ui, headerblock, self._fp)
566 566 yield part
567 567 headerblock = self._readpartheader()
568 568 self.ui.debug('end of bundle2 stream\n')
569 569
570 570 def _readpartheader(self):
571 571 """reads a part header size and return the bytes blob
572 572
573 573 returns None if empty"""
574 574 headersize = self._unpack(_fpartheadersize)[0]
575 575 if headersize < 0:
576 576 raise error.BundleValueError('negative part header size: %i'
577 577 % headersize)
578 578 self.ui.debug('part header size: %i\n' % headersize)
579 579 if headersize:
580 580 return self._readexact(headersize)
581 581 return None
582 582
583 583
584 584 class bundlepart(object):
585 585 """A bundle2 part contains application level payload
586 586
587 587 The part `type` is used to route the part to the application level
588 588 handler.
589 589
590 590 The part payload is contained in ``part.data``. It could be raw bytes or a
591 591 generator of byte chunks.
592 592
593 593 You can add parameters to the part using the ``addparam`` method.
594 594 Parameters can be either mandatory (default) or advisory. Remote side
595 595 should be able to safely ignore the advisory ones.
596 596
597 597 Both data and parameters cannot be modified after the generation has begun.
598 598 """
599 599
600 600 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
601 601 data='', mandatory=True):
602 602 validateparttype(parttype)
603 603 self.id = None
604 604 self.type = parttype
605 605 self._data = data
606 606 self._mandatoryparams = list(mandatoryparams)
607 607 self._advisoryparams = list(advisoryparams)
608 608 # checking for duplicated entries
609 609 self._seenparams = set()
610 610 for pname, __ in self._mandatoryparams + self._advisoryparams:
611 611 if pname in self._seenparams:
612 612 raise RuntimeError('duplicated params: %s' % pname)
613 613 self._seenparams.add(pname)
614 614 # status of the part's generation:
615 615 # - None: not started,
616 616 # - False: currently generated,
617 617 # - True: generation done.
618 618 self._generated = None
619 619 self.mandatory = mandatory
620 620
621 621 # methods used to defines the part content
622 622 def __setdata(self, data):
623 623 if self._generated is not None:
624 624 raise error.ReadOnlyPartError('part is being generated')
625 625 self._data = data
626 626 def __getdata(self):
627 627 return self._data
628 628 data = property(__getdata, __setdata)
629 629
630 630 @property
631 631 def mandatoryparams(self):
632 632 # make it an immutable tuple to force people through ``addparam``
633 633 return tuple(self._mandatoryparams)
634 634
635 635 @property
636 636 def advisoryparams(self):
637 637 # make it an immutable tuple to force people through ``addparam``
638 638 return tuple(self._advisoryparams)
639 639
640 640 def addparam(self, name, value='', mandatory=True):
641 641 if self._generated is not None:
642 642 raise error.ReadOnlyPartError('part is being generated')
643 643 if name in self._seenparams:
644 644 raise ValueError('duplicated params: %s' % name)
645 645 self._seenparams.add(name)
646 646 params = self._advisoryparams
647 647 if mandatory:
648 648 params = self._mandatoryparams
649 649 params.append((name, value))
650 650
651 651 # methods used to generates the bundle2 stream
652 652 def getchunks(self):
653 653 if self._generated is not None:
654 654 raise RuntimeError('part can only be consumed once')
655 655 self._generated = False
656 656 #### header
657 657 if self.mandatory:
658 658 parttype = self.type.upper()
659 659 else:
660 660 parttype = self.type.lower()
661 661 ## parttype
662 662 header = [_pack(_fparttypesize, len(parttype)),
663 663 parttype, _pack(_fpartid, self.id),
664 664 ]
665 665 ## parameters
666 666 # count
667 667 manpar = self.mandatoryparams
668 668 advpar = self.advisoryparams
669 669 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
670 670 # size
671 671 parsizes = []
672 672 for key, value in manpar:
673 673 parsizes.append(len(key))
674 674 parsizes.append(len(value))
675 675 for key, value in advpar:
676 676 parsizes.append(len(key))
677 677 parsizes.append(len(value))
678 678 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
679 679 header.append(paramsizes)
680 680 # key, value
681 681 for key, value in manpar:
682 682 header.append(key)
683 683 header.append(value)
684 684 for key, value in advpar:
685 685 header.append(key)
686 686 header.append(value)
687 687 ## finalize header
688 688 headerchunk = ''.join(header)
689 689 yield _pack(_fpartheadersize, len(headerchunk))
690 690 yield headerchunk
691 691 ## payload
692 692 try:
693 693 for chunk in self._payloadchunks():
694 694 yield _pack(_fpayloadsize, len(chunk))
695 695 yield chunk
696 696 except Exception, exc:
697 697 # backup exception data for later
698 698 exc_info = sys.exc_info()
699 699 msg = 'unexpected error: %s' % exc
700 700 interpart = bundlepart('b2x:error:abort', [('message', msg)],
701 701 mandatory=False)
702 702 interpart.id = 0
703 703 yield _pack(_fpayloadsize, -1)
704 704 for chunk in interpart.getchunks():
705 705 yield chunk
706 706 # abort current part payload
707 707 yield _pack(_fpayloadsize, 0)
708 708 raise exc_info[0], exc_info[1], exc_info[2]
709 709 # end of payload
710 710 yield _pack(_fpayloadsize, 0)
711 711 self._generated = True
712 712
713 713 def _payloadchunks(self):
714 714 """yield chunks of a the part payload
715 715
716 716 Exists to handle the different methods to provide data to a part."""
717 717 # we only support fixed size data now.
718 718 # This will be improved in the future.
719 719 if util.safehasattr(self.data, 'next'):
720 720 buff = util.chunkbuffer(self.data)
721 721 chunk = buff.read(preferedchunksize)
722 722 while chunk:
723 723 yield chunk
724 724 chunk = buff.read(preferedchunksize)
725 725 elif len(self.data):
726 726 yield self.data
727 727
728 728
729 729 flaginterrupt = -1
730 730
731 731 class interrupthandler(unpackermixin):
732 732 """read one part and process it with restricted capability
733 733
734 734 This allows to transmit exception raised on the producer size during part
735 735 iteration while the consumer is reading a part.
736 736
737 737 Part processed in this manner only have access to a ui object,"""
738 738
739 739 def __init__(self, ui, fp):
740 740 super(interrupthandler, self).__init__(fp)
741 741 self.ui = ui
742 742
743 743 def _readpartheader(self):
744 744 """reads a part header size and return the bytes blob
745 745
746 746 returns None if empty"""
747 747 headersize = self._unpack(_fpartheadersize)[0]
748 748 if headersize < 0:
749 749 raise error.BundleValueError('negative part header size: %i'
750 750 % headersize)
751 751 self.ui.debug('part header size: %i\n' % headersize)
752 752 if headersize:
753 753 return self._readexact(headersize)
754 754 return None
755 755
756 756 def __call__(self):
757 757 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
758 758 headerblock = self._readpartheader()
759 759 if headerblock is None:
760 760 self.ui.debug('no part found during interruption.\n')
761 761 return
762 762 part = unbundlepart(self.ui, headerblock, self._fp)
763 763 op = interruptoperation(self.ui)
764 764 _processpart(op, part)
765 765
766 766 class interruptoperation(object):
767 767 """A limited operation to be use by part handler during interruption
768 768
769 769 It only have access to an ui object.
770 770 """
771 771
772 772 def __init__(self, ui):
773 773 self.ui = ui
774 774 self.reply = None
775 775
776 776 @property
777 777 def repo(self):
778 778 raise RuntimeError('no repo access from stream interruption')
779 779
780 780 def gettransaction(self):
781 781 raise TransactionUnavailable('no repo access from stream interruption')
782 782
783 783 class unbundlepart(unpackermixin):
784 784 """a bundle part read from a bundle"""
785 785
786 786 def __init__(self, ui, header, fp):
787 787 super(unbundlepart, self).__init__(fp)
788 788 self.ui = ui
789 789 # unbundle state attr
790 790 self._headerdata = header
791 791 self._headeroffset = 0
792 792 self._initialized = False
793 793 self.consumed = False
794 794 # part data
795 795 self.id = None
796 796 self.type = None
797 797 self.mandatoryparams = None
798 798 self.advisoryparams = None
799 799 self.params = None
800 800 self.mandatorykeys = ()
801 801 self._payloadstream = None
802 802 self._readheader()
803 803 self._mandatory = None
804 804
805 805 def _fromheader(self, size):
806 806 """return the next <size> byte from the header"""
807 807 offset = self._headeroffset
808 808 data = self._headerdata[offset:(offset + size)]
809 809 self._headeroffset = offset + size
810 810 return data
811 811
812 812 def _unpackheader(self, format):
813 813 """read given format from header
814 814
815 815 This automatically compute the size of the format to read."""
816 816 data = self._fromheader(struct.calcsize(format))
817 817 return _unpack(format, data)
818 818
819 819 def _initparams(self, mandatoryparams, advisoryparams):
820 820 """internal function to setup all logic related parameters"""
821 821 # make it read only to prevent people touching it by mistake.
822 822 self.mandatoryparams = tuple(mandatoryparams)
823 823 self.advisoryparams = tuple(advisoryparams)
824 824 # user friendly UI
825 825 self.params = dict(self.mandatoryparams)
826 826 self.params.update(dict(self.advisoryparams))
827 827 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
828 828
829 829 def _readheader(self):
830 830 """read the header and setup the object"""
831 831 typesize = self._unpackheader(_fparttypesize)[0]
832 832 self.type = self._fromheader(typesize)
833 833 self.ui.debug('part type: "%s"\n' % self.type)
834 834 self.id = self._unpackheader(_fpartid)[0]
835 835 self.ui.debug('part id: "%s"\n' % self.id)
836 836 # extract mandatory bit from type
837 837 self.mandatory = (self.type != self.type.lower())
838 838 self.type = self.type.lower()
839 839 ## reading parameters
840 840 # param count
841 841 mancount, advcount = self._unpackheader(_fpartparamcount)
842 842 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
843 843 # param size
844 844 fparamsizes = _makefpartparamsizes(mancount + advcount)
845 845 paramsizes = self._unpackheader(fparamsizes)
846 846 # make it a list of couple again
847 847 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
848 848 # split mandatory from advisory
849 849 mansizes = paramsizes[:mancount]
850 850 advsizes = paramsizes[mancount:]
851 851 # retrieve param value
852 852 manparams = []
853 853 for key, value in mansizes:
854 854 manparams.append((self._fromheader(key), self._fromheader(value)))
855 855 advparams = []
856 856 for key, value in advsizes:
857 857 advparams.append((self._fromheader(key), self._fromheader(value)))
858 858 self._initparams(manparams, advparams)
859 859 ## part payload
860 860 def payloadchunks():
861 861 payloadsize = self._unpack(_fpayloadsize)[0]
862 862 self.ui.debug('payload chunk size: %i\n' % payloadsize)
863 863 while payloadsize:
864 864 if payloadsize == flaginterrupt:
865 865 # interruption detection, the handler will now read a
866 866 # single part and process it.
867 867 interrupthandler(self.ui, self._fp)()
868 868 elif payloadsize < 0:
869 869 msg = 'negative payload chunk size: %i' % payloadsize
870 870 raise error.BundleValueError(msg)
871 871 else:
872 872 yield self._readexact(payloadsize)
873 873 payloadsize = self._unpack(_fpayloadsize)[0]
874 874 self.ui.debug('payload chunk size: %i\n' % payloadsize)
875 875 self._payloadstream = util.chunkbuffer(payloadchunks())
876 876 # we read the data, tell it
877 877 self._initialized = True
878 878
879 879 def read(self, size=None):
880 880 """read payload data"""
881 881 if not self._initialized:
882 882 self._readheader()
883 883 if size is None:
884 884 data = self._payloadstream.read()
885 885 else:
886 886 data = self._payloadstream.read(size)
887 887 if size is None or len(data) < size:
888 888 self.consumed = True
889 889 return data
890 890
891 891 capabilities = {'HG2Y': (),
892 892 'b2x:listkeys': (),
893 893 'b2x:pushkey': (),
894 894 'digests': tuple(sorted(util.DIGESTS.keys())),
895 895 'b2x:remote-changegroup': ('http', 'https'),
896 896 }
897 897
898 898 def getrepocaps(repo, allowpushback=False):
899 899 """return the bundle2 capabilities for a given repo
900 900
901 901 Exists to allow extensions (like evolution) to mutate the capabilities.
902 902 """
903 903 caps = capabilities.copy()
904 904 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
905 905 if obsolete.isenabled(repo, obsolete.exchangeopt):
906 906 supportedformat = tuple('V%i' % v for v in obsolete.formats)
907 907 caps['b2x:obsmarkers'] = supportedformat
908 908 if allowpushback:
909 909 caps['b2x:pushback'] = ()
910 910 return caps
911 911
912 912 def bundle2caps(remote):
913 913 """return the bundle capabilities of a peer as dict"""
914 914 raw = remote.capable('bundle2-exp')
915 915 if not raw and raw != '':
916 916 return {}
917 917 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
918 918 return decodecaps(capsblob)
919 919
920 920 def obsmarkersversion(caps):
921 921 """extract the list of supported obsmarkers versions from a bundle2caps dict
922 922 """
923 923 obscaps = caps.get('b2x:obsmarkers', ())
924 924 return [int(c[1:]) for c in obscaps if c.startswith('V')]
925 925
926 926 @parthandler('b2x:changegroup', ('version',))
927 927 def handlechangegroup(op, inpart):
928 928 """apply a changegroup part on the repo
929 929
930 930 This is a very early implementation that will massive rework before being
931 931 inflicted to any end-user.
932 932 """
933 933 # Make sure we trigger a transaction creation
934 934 #
935 935 # The addchangegroup function will get a transaction object by itself, but
936 936 # we need to make sure we trigger the creation of a transaction object used
937 937 # for the whole processing scope.
938 938 op.gettransaction()
939 939 unpackerversion = inpart.params.get('version', '01')
940 940 # We should raise an appropriate exception here
941 941 unpacker = changegroup.packermap[unpackerversion][1]
942 942 cg = unpacker(inpart, 'UN')
943 943 # the source and url passed here are overwritten by the one contained in
944 944 # the transaction.hookargs argument. So 'bundle2' is a placeholder
945 945 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
946 946 op.records.add('changegroup', {'return': ret})
947 947 if op.reply is not None:
948 948 # This is definitely not the final form of this
949 949 # return. But one need to start somewhere.
950 950 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
951 951 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
952 952 part.addparam('return', '%i' % ret, mandatory=False)
953 953 assert not inpart.read()
954 954
955 955 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
956 956 ['digest:%s' % k for k in util.DIGESTS.keys()])
957 957 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
958 958 def handleremotechangegroup(op, inpart):
959 959 """apply a bundle10 on the repo, given an url and validation information
960 960
961 961 All the information about the remote bundle to import are given as
962 962 parameters. The parameters include:
963 963 - url: the url to the bundle10.
964 964 - size: the bundle10 file size. It is used to validate what was
965 965 retrieved by the client matches the server knowledge about the bundle.
966 966 - digests: a space separated list of the digest types provided as
967 967 parameters.
968 968 - digest:<digest-type>: the hexadecimal representation of the digest with
969 969 that name. Like the size, it is used to validate what was retrieved by
970 970 the client matches what the server knows about the bundle.
971 971
972 972 When multiple digest types are given, all of them are checked.
973 973 """
974 974 try:
975 975 raw_url = inpart.params['url']
976 976 except KeyError:
977 977 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
978 978 parsed_url = util.url(raw_url)
979 979 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
980 980 raise util.Abort(_('remote-changegroup does not support %s urls') %
981 981 parsed_url.scheme)
982 982
983 983 try:
984 984 size = int(inpart.params['size'])
985 985 except ValueError:
986 986 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
987 987 % 'size')
988 988 except KeyError:
989 989 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
990 990
991 991 digests = {}
992 992 for typ in inpart.params.get('digests', '').split():
993 993 param = 'digest:%s' % typ
994 994 try:
995 995 value = inpart.params[param]
996 996 except KeyError:
997 997 raise util.Abort(_('remote-changegroup: missing "%s" param') %
998 998 param)
999 999 digests[typ] = value
1000 1000
1001 1001 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1002 1002
1003 1003 # Make sure we trigger a transaction creation
1004 1004 #
1005 1005 # The addchangegroup function will get a transaction object by itself, but
1006 1006 # we need to make sure we trigger the creation of a transaction object used
1007 1007 # for the whole processing scope.
1008 1008 op.gettransaction()
1009 1009 import exchange
1010 1010 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1011 1011 if not isinstance(cg, changegroup.cg1unpacker):
1012 1012 raise util.Abort(_('%s: not a bundle version 1.0') %
1013 1013 util.hidepassword(raw_url))
1014 1014 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1015 1015 op.records.add('changegroup', {'return': ret})
1016 1016 if op.reply is not None:
1017 1017 # This is definitely not the final form of this
1018 1018 # return. But one need to start somewhere.
1019 1019 part = op.reply.newpart('b2x:reply:changegroup')
1020 1020 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1021 1021 part.addparam('return', '%i' % ret, mandatory=False)
1022 1022 try:
1023 1023 real_part.validate()
1024 1024 except util.Abort, e:
1025 1025 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1026 1026 (util.hidepassword(raw_url), str(e)))
1027 1027 assert not inpart.read()
1028 1028
1029 1029 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1030 1030 def handlereplychangegroup(op, inpart):
1031 1031 ret = int(inpart.params['return'])
1032 1032 replyto = int(inpart.params['in-reply-to'])
1033 1033 op.records.add('changegroup', {'return': ret}, replyto)
1034 1034
1035 1035 @parthandler('b2x:check:heads')
1036 1036 def handlecheckheads(op, inpart):
1037 1037 """check that head of the repo did not change
1038 1038
1039 1039 This is used to detect a push race when using unbundle.
1040 1040 This replaces the "heads" argument of unbundle."""
1041 1041 h = inpart.read(20)
1042 1042 heads = []
1043 1043 while len(h) == 20:
1044 1044 heads.append(h)
1045 1045 h = inpart.read(20)
1046 1046 assert not h
1047 1047 if heads != op.repo.heads():
1048 1048 raise error.PushRaced('repository changed while pushing - '
1049 1049 'please try again')
1050 1050
1051 1051 @parthandler('b2x:output')
1052 1052 def handleoutput(op, inpart):
1053 1053 """forward output captured on the server to the client"""
1054 1054 for line in inpart.read().splitlines():
1055 1055 op.ui.write(('remote: %s\n' % line))
1056 1056
1057 1057 @parthandler('b2x:replycaps')
1058 1058 def handlereplycaps(op, inpart):
1059 1059 """Notify that a reply bundle should be created
1060 1060
1061 1061 The payload contains the capabilities information for the reply"""
1062 1062 caps = decodecaps(inpart.read())
1063 1063 if op.reply is None:
1064 1064 op.reply = bundle20(op.ui, caps)
1065 1065
1066 1066 @parthandler('b2x:error:abort', ('message', 'hint'))
1067 1067 def handlereplycaps(op, inpart):
1068 1068 """Used to transmit abort error over the wire"""
1069 1069 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1070 1070
1071 1071 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1072 1072 def handlereplycaps(op, inpart):
1073 1073 """Used to transmit unknown content error over the wire"""
1074 1074 kwargs = {}
1075 1075 parttype = inpart.params.get('parttype')
1076 1076 if parttype is not None:
1077 1077 kwargs['parttype'] = parttype
1078 1078 params = inpart.params.get('params')
1079 1079 if params is not None:
1080 1080 kwargs['params'] = params.split('\0')
1081 1081
1082 1082 raise error.UnsupportedPartError(**kwargs)
1083 1083
1084 1084 @parthandler('b2x:error:pushraced', ('message',))
1085 1085 def handlereplycaps(op, inpart):
1086 1086 """Used to transmit push race error over the wire"""
1087 1087 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1088 1088
1089 1089 @parthandler('b2x:listkeys', ('namespace',))
1090 1090 def handlelistkeys(op, inpart):
1091 1091 """retrieve pushkey namespace content stored in a bundle2"""
1092 1092 namespace = inpart.params['namespace']
1093 1093 r = pushkey.decodekeys(inpart.read())
1094 1094 op.records.add('listkeys', (namespace, r))
1095 1095
1096 1096 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1097 1097 def handlepushkey(op, inpart):
1098 1098 """process a pushkey request"""
1099 1099 dec = pushkey.decode
1100 1100 namespace = dec(inpart.params['namespace'])
1101 1101 key = dec(inpart.params['key'])
1102 1102 old = dec(inpart.params['old'])
1103 1103 new = dec(inpart.params['new'])
1104 1104 ret = op.repo.pushkey(namespace, key, old, new)
1105 1105 record = {'namespace': namespace,
1106 1106 'key': key,
1107 1107 'old': old,
1108 1108 'new': new}
1109 1109 op.records.add('pushkey', record)
1110 1110 if op.reply is not None:
1111 1111 rpart = op.reply.newpart('b2x:reply:pushkey')
1112 1112 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1113 1113 rpart.addparam('return', '%i' % ret, mandatory=False)
1114 1114
1115 1115 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1116 1116 def handlepushkeyreply(op, inpart):
1117 1117 """retrieve the result of a pushkey request"""
1118 1118 ret = int(inpart.params['return'])
1119 1119 partid = int(inpart.params['in-reply-to'])
1120 1120 op.records.add('pushkey', {'return': ret}, partid)
1121 1121
1122 1122 @parthandler('b2x:obsmarkers')
1123 1123 def handleobsmarker(op, inpart):
1124 1124 """add a stream of obsmarkers to the repo"""
1125 1125 tr = op.gettransaction()
1126 1126 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1127 1127 if new:
1128 1128 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1129 1129 op.records.add('obsmarkers', {'new': new})
1130 1130 if op.reply is not None:
1131 1131 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1132 1132 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1133 1133 rpart.addparam('new', '%i' % new, mandatory=False)
1134 1134
1135 1135
1136 1136 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1137 1137 def handlepushkeyreply(op, inpart):
1138 1138 """retrieve the result of a pushkey request"""
1139 1139 ret = int(inpart.params['new'])
1140 1140 partid = int(inpart.params['in-reply-to'])
1141 1141 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now