bundle2._processpart: forcing lower-case compare is no longer necessary...
Eric Sumner -
r23586:112f9c73 default
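Why the extra lower-casing can go, as a minimal sketch: the header-reading code in this file derives the mandatory flag from the case of the part type and then stores the type lower-cased, and handlers are registered under lower-case keys. By the time `_processpart` looks at `part.type`, a second `.lower()` should therefore be a no-op. The names `split_part_type` and `handlers` below are hypothetical stand-ins for illustration only, not Mercurial APIs.

```python
# Hypothetical stand-ins; this is an illustration, not the Mercurial code.

def split_part_type(raw_type):
    """Mimic the header-reading step: the mandatory flag comes from the
    case of the raw type, then the type is normalised to lower case."""
    mandatory = raw_type != raw_type.lower()
    return raw_type.lower(), mandatory

# Handlers are assumed to be registered under lower-case keys.
handlers = {'b2x:changegroup': 'changegroup handler'}

parttype, mandatory = split_part_type('B2X:CHANGEGROUP')

# parttype is already lower case here, so the lookup needs no extra
# normalisation step.
handler = handlers.get(parttype)
```

Under these assumptions, looking the handler up with the stored type directly gives the same result as lower-casing it again first.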
@@ -1,1127 +1,1124
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import sys
149 149 import util
150 150 import struct
151 151 import urllib
152 152 import string
153 153 import obsolete
154 154 import pushkey
155 155 import url
156 156
157 157 import changegroup, error
158 158 from i18n import _
159 159
160 160 _pack = struct.pack
161 161 _unpack = struct.unpack
162 162
163 163 _magicstring = 'HG2Y'
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 def _makefpartparamsizes(nbparams):
175 175 """return a struct format to read part parameter sizes
176 176
177 177 The number of parameters is variable so we need to build that format
178 178 dynamically.
179 179 """
180 180 return '>'+('BB'*nbparams)
181 181
182 182 parthandlermapping = {}
183 183
184 184 def parthandler(parttype, params=()):
185 185 """decorator that registers a function as a bundle2 part handler
186 186
187 187 eg::
188 188
189 189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
190 190 def myparttypehandler(...):
191 191 '''process a part of type "my part".'''
192 192 ...
193 193 """
194 194 def _decorator(func):
195 195 lparttype = parttype.lower() # enforce lower case matching.
196 196 assert lparttype not in parthandlermapping
197 197 parthandlermapping[lparttype] = func
198 198 func.params = frozenset(params)
199 199 return func
200 200 return _decorator
201 201
202 202 class unbundlerecords(object):
203 203 """keep record of what happens during an unbundle
204 204
205 205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 206 category of record and obj is an arbitrary object.
207 207
208 208 `records['cat']` will return all entries of this category 'cat'.
209 209
210 210 Iterating on the object itself will yield `('category', obj)` tuples
211 211 for all entries.
212 212
213 213 All iterations happen in chronological order.
214 214 """
215 215
216 216 def __init__(self):
217 217 self._categories = {}
218 218 self._sequences = []
219 219 self._replies = {}
220 220
221 221 def add(self, category, entry, inreplyto=None):
222 222 """add a new record of a given category.
223 223
224 224 The entry can then be retrieved in the list returned by
225 225 self['category']."""
226 226 self._categories.setdefault(category, []).append(entry)
227 227 self._sequences.append((category, entry))
228 228 if inreplyto is not None:
229 229 self.getreplies(inreplyto).add(category, entry)
230 230
231 231 def getreplies(self, partid):
232 232 """get the records that are replies to a specific part"""
233 233 return self._replies.setdefault(partid, unbundlerecords())
234 234
235 235 def __getitem__(self, cat):
236 236 return tuple(self._categories.get(cat, ()))
237 237
238 238 def __iter__(self):
239 239 return iter(self._sequences)
240 240
241 241 def __len__(self):
242 242 return len(self._sequences)
243 243
244 244 def __nonzero__(self):
245 245 return bool(self._sequences)
246 246
247 247 class bundleoperation(object):
248 248 """an object that represents a single bundling process
249 249
250 250 Its purpose is to carry unbundle-related objects and states.
251 251
252 252 A new object should be created at the beginning of each bundle processing.
253 253 The object is to be returned by the processing function.
254 254
255 255 The object has very little content now; it will ultimately contain:
256 256 * an access to the repo the bundle is applied to,
257 257 * a ui object,
258 258 * a way to retrieve a transaction to add changes to the repo,
259 259 * a way to record the result of processing each part,
260 260 * a way to construct a bundle response when applicable.
261 261 """
262 262
263 263 def __init__(self, repo, transactiongetter):
264 264 self.repo = repo
265 265 self.ui = repo.ui
266 266 self.records = unbundlerecords()
267 267 self.gettransaction = transactiongetter
268 268 self.reply = None
269 269
270 270 class TransactionUnavailable(RuntimeError):
271 271 pass
272 272
273 273 def _notransaction():
274 274 """default method to get a transaction while processing a bundle
275 275
276 276 Raise an exception to highlight the fact that no transaction was expected
277 277 to be created"""
278 278 raise TransactionUnavailable()
279 279
280 280 def processbundle(repo, unbundler, transactiongetter=None):
281 281 """This function processes a bundle and applies its effects to/from a repo
282 282
283 283 It iterates over each part then searches for and uses the proper handling
284 284 code to process the part. Parts are processed in order.
285 285
286 286 This is a very early version of this function that will be strongly reworked
287 287 before final usage.
288 288
289 289 An unknown mandatory part will abort the process.
290 290 """
291 291 if transactiongetter is None:
292 292 transactiongetter = _notransaction
293 293 op = bundleoperation(repo, transactiongetter)
294 294 # todo:
295 295 # - replace this with an init function soon.
296 296 # - exception catching
297 297 unbundler.params
298 298 iterparts = unbundler.iterparts()
299 299 part = None
300 300 try:
301 301 for part in iterparts:
302 302 _processpart(op, part)
303 303 except Exception, exc:
304 304 for part in iterparts:
305 305 # consume the bundle content
306 306 part.read()
307 307 # Small hack to let caller code distinguish exceptions from bundle2
308 308 # processing from processing the old format. This is mostly
309 309 # needed to handle different return codes to unbundle according to the
310 310 # type of bundle. We should probably clean up or drop this return code
311 311 # craziness in a future version.
312 312 exc.duringunbundle2 = True
313 313 raise
314 314 return op
315 315
316 316 def _processpart(op, part):
317 317 """process a single part from a bundle
318 318
319 319 The part is guaranteed to have been fully consumed when the function exits
320 320 (even if an exception is raised)."""
321 321 try:
322 - parttype = part.type
323 - # part key are matched lower case
324 - key = parttype.lower()
325 322 try:
326 - handler = parthandlermapping.get(key)
323 + handler = parthandlermapping.get(part.type)
327 324 if handler is None:
328 - raise error.UnsupportedPartError(parttype=key)
329 - op.ui.debug('found a handler for part %r\n' % parttype)
325 + raise error.UnsupportedPartError(parttype=part.type)
326 + op.ui.debug('found a handler for part %r\n' % part.type)
330 327 unknownparams = part.mandatorykeys - handler.params
331 328 if unknownparams:
332 329 unknownparams = list(unknownparams)
333 330 unknownparams.sort()
334 - raise error.UnsupportedPartError(parttype=key,
331 + raise error.UnsupportedPartError(parttype=part.type,
335 332 params=unknownparams)
336 333 except error.UnsupportedPartError, exc:
337 334 if part.mandatory: # mandatory parts
338 335 raise
339 336 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
340 337 return # skip to part processing
341 338
342 339 # handler is called outside the above try block so that we don't
343 340 # risk catching KeyErrors from anything other than the
344 341 # parthandlermapping lookup (any KeyError raised by handler()
345 342 # itself represents a defect of a different variety).
346 343 output = None
347 344 if op.reply is not None:
348 345 op.ui.pushbuffer(error=True)
349 346 output = ''
350 347 try:
351 348 handler(op, part)
352 349 finally:
353 350 if output is not None:
354 351 output = op.ui.popbuffer()
355 352 if output:
356 353 outpart = op.reply.newpart('b2x:output', data=output)
357 354 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
358 355 finally:
359 356 # consume the part content to not corrupt the stream.
360 357 part.read()
361 358
362 359
363 360 def decodecaps(blob):
364 361 """decode a bundle2 caps bytes blob into a dictionary
365 362
366 363 The blob is a list of capabilities (one per line)
367 364 Capabilities may have values using a line of the form::
368 365
369 366 capability=value1,value2,value3
370 367
371 368 The values are always a list."""
372 369 caps = {}
373 370 for line in blob.splitlines():
374 371 if not line:
375 372 continue
376 373 if '=' not in line:
377 374 key, vals = line, ()
378 375 else:
379 376 key, vals = line.split('=', 1)
380 377 vals = vals.split(',')
381 378 key = urllib.unquote(key)
382 379 vals = [urllib.unquote(v) for v in vals]
383 380 caps[key] = vals
384 381 return caps
385 382
386 383 def encodecaps(caps):
387 384 """encode a bundle2 caps dictionary into a bytes blob"""
388 385 chunks = []
389 386 for ca in sorted(caps):
390 387 vals = caps[ca]
391 388 ca = urllib.quote(ca)
392 389 vals = [urllib.quote(v) for v in vals]
393 390 if vals:
394 391 ca = "%s=%s" % (ca, ','.join(vals))
395 392 chunks.append(ca)
396 393 return '\n'.join(chunks)
397 394
398 395 class bundle20(object):
399 396 """represent an outgoing bundle2 container
400 397
401 398 Use the `addparam` method to add stream level parameters and `newpart` to
402 399 populate it. Then call `getchunks` to retrieve all the binary chunks of
403 400 data that compose the bundle2 container."""
404 401
405 402 def __init__(self, ui, capabilities=()):
406 403 self.ui = ui
407 404 self._params = []
408 405 self._parts = []
409 406 self.capabilities = dict(capabilities)
410 407
411 408 @property
412 409 def nbparts(self):
413 410 """total number of parts added to the bundler"""
414 411 return len(self._parts)
415 412
416 413 # methods used to define the bundle2 content
417 414 def addparam(self, name, value=None):
418 415 """add a stream level parameter"""
419 416 if not name:
420 417 raise ValueError('empty parameter name')
421 418 if name[0] not in string.letters:
422 419 raise ValueError('non letter first character: %r' % name)
423 420 self._params.append((name, value))
424 421
425 422 def addpart(self, part):
426 423 """add a new part to the bundle2 container
427 424
428 425 Parts contain the actual application payload.
429 426 assert part.id is None
430 427 part.id = len(self._parts) # very cheap counter
431 428 self._parts.append(part)
432 429
433 430 def newpart(self, typeid, *args, **kwargs):
434 431 """create a new part and add it to the container
435 432
436 433 The part is directly added to the container. For now, this means
437 434 that any failure to properly initialize the part after calling
438 435 ``newpart`` should result in a failure of the whole bundling process.
439 436
440 437 You can still fall back to manually create and add if you need better
441 438 control."""
442 439 part = bundlepart(typeid, *args, **kwargs)
443 440 self.addpart(part)
444 441 return part
445 442
446 443 # methods used to generate the bundle2 stream
447 444 def getchunks(self):
448 445 self.ui.debug('start emission of %s stream\n' % _magicstring)
449 446 yield _magicstring
450 447 param = self._paramchunk()
451 448 self.ui.debug('bundle parameter: %s\n' % param)
452 449 yield _pack(_fstreamparamsize, len(param))
453 450 if param:
454 451 yield param
455 452
456 453 self.ui.debug('start of parts\n')
457 454 for part in self._parts:
458 455 self.ui.debug('bundle part: "%s"\n' % part.type)
459 456 for chunk in part.getchunks():
460 457 yield chunk
461 458 self.ui.debug('end of bundle\n')
462 459 yield _pack(_fpartheadersize, 0)
463 460
464 461 def _paramchunk(self):
465 462 """return an encoded version of all stream parameters"""
466 463 blocks = []
467 464 for par, value in self._params:
468 465 par = urllib.quote(par)
469 466 if value is not None:
470 467 value = urllib.quote(value)
471 468 par = '%s=%s' % (par, value)
472 469 blocks.append(par)
473 470 return ' '.join(blocks)
474 471
475 472 class unpackermixin(object):
476 473 """A mixin to extract bytes and struct data from a stream"""
477 474
478 475 def __init__(self, fp):
479 476 self._fp = fp
480 477
481 478 def _unpack(self, format):
482 479 """unpack this struct format from the stream"""
483 480 data = self._readexact(struct.calcsize(format))
484 481 return _unpack(format, data)
485 482
486 483 def _readexact(self, size):
487 484 """read exactly <size> bytes from the stream"""
488 485 return changegroup.readexactly(self._fp, size)
489 486
490 487
491 488 class unbundle20(unpackermixin):
492 489 """interpret a bundle2 stream
493 490
494 491 This class is fed with a binary stream and yields parts through its
495 492 `iterparts` method."""
496 493
497 494 def __init__(self, ui, fp, header=None):
498 495 """If header is specified, we do not read it out of the stream."""
499 496 self.ui = ui
500 497 super(unbundle20, self).__init__(fp)
501 498 if header is None:
502 499 header = self._readexact(4)
503 500 magic, version = header[0:2], header[2:4]
504 501 if magic != 'HG':
505 502 raise util.Abort(_('not a Mercurial bundle'))
506 503 if version != '2Y':
507 504 raise util.Abort(_('unknown bundle version %s') % version)
508 505 self.ui.debug('start processing of %s stream\n' % header)
509 506
510 507 @util.propertycache
511 508 def params(self):
512 509 """dictionary of stream level parameters"""
513 510 self.ui.debug('reading bundle2 stream parameters\n')
514 511 params = {}
515 512 paramssize = self._unpack(_fstreamparamsize)[0]
516 513 if paramssize < 0:
517 514 raise error.BundleValueError('negative bundle param size: %i'
518 515 % paramssize)
519 516 if paramssize:
520 517 for p in self._readexact(paramssize).split(' '):
521 518 p = p.split('=', 1)
522 519 p = [urllib.unquote(i) for i in p]
523 520 if len(p) < 2:
524 521 p.append(None)
525 522 self._processparam(*p)
526 523 params[p[0]] = p[1]
527 524 return params
528 525
529 526 def _processparam(self, name, value):
530 527 """process a parameter, applying its effect if needed
531 528
532 529 Parameters starting with a lower case letter are advisory and will be
533 530 ignored when unknown. Those starting with an upper case letter are
534 531 mandatory and this function will raise an UnsupportedPartError when unknown.
535 532
536 533 Note: no options are currently supported. Any input will either be
537 534 ignored or fail.
538 535 """
539 536 if not name:
540 537 raise ValueError('empty parameter name')
541 538 if name[0] not in string.letters:
542 539 raise ValueError('non letter first character: %r' % name)
543 540 # Some logic will later be added here to try to process the option from
544 541 # a dict of known parameters.
545 542 if name[0].islower():
546 543 self.ui.debug("ignoring unknown parameter %r\n" % name)
547 544 else:
548 545 raise error.UnsupportedPartError(params=(name,))
549 546
550 547
551 548 def iterparts(self):
552 549 """yield all parts contained in the stream"""
553 550 # make sure params have been loaded
554 551 self.params
555 552 self.ui.debug('start extraction of bundle2 parts\n')
556 553 headerblock = self._readpartheader()
557 554 while headerblock is not None:
558 555 part = unbundlepart(self.ui, headerblock, self._fp)
559 556 yield part
560 557 headerblock = self._readpartheader()
561 558 self.ui.debug('end of bundle2 stream\n')
562 559
563 560 def _readpartheader(self):
564 561 """reads a part header size and return the bytes blob
565 562
566 563 returns None if empty"""
567 564 headersize = self._unpack(_fpartheadersize)[0]
568 565 if headersize < 0:
569 566 raise error.BundleValueError('negative part header size: %i'
570 567 % headersize)
571 568 self.ui.debug('part header size: %i\n' % headersize)
572 569 if headersize:
573 570 return self._readexact(headersize)
574 571 return None
575 572
576 573
577 574 class bundlepart(object):
578 575 """A bundle2 part contains application level payload
579 576
580 577 The part `type` is used to route the part to the application level
581 578 handler.
582 579
583 580 The part payload is contained in ``part.data``. It could be raw bytes or a
584 581 generator of byte chunks.
585 582
586 583 You can add parameters to the part using the ``addparam`` method.
587 584 Parameters can be either mandatory (default) or advisory. Remote side
588 585 should be able to safely ignore the advisory ones.
589 586
590 587 Both data and parameters cannot be modified after the generation has begun.
591 588 """
592 589
593 590 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
594 591 data=''):
595 592 self.id = None
596 593 self.type = parttype
597 594 self._data = data
598 595 self._mandatoryparams = list(mandatoryparams)
599 596 self._advisoryparams = list(advisoryparams)
600 597 # checking for duplicated entries
601 598 self._seenparams = set()
602 599 for pname, __ in self._mandatoryparams + self._advisoryparams:
603 600 if pname in self._seenparams:
604 601 raise RuntimeError('duplicated params: %s' % pname)
605 602 self._seenparams.add(pname)
606 603 # status of the part's generation:
607 604 # - None: not started,
608 605 # - False: currently generated,
609 606 # - True: generation done.
610 607 self._generated = None
611 608
612 609 # methods used to define the part content
613 610 def __setdata(self, data):
614 611 if self._generated is not None:
615 612 raise error.ReadOnlyPartError('part is being generated')
616 613 self._data = data
617 614 def __getdata(self):
618 615 return self._data
619 616 data = property(__getdata, __setdata)
620 617
621 618 @property
622 619 def mandatoryparams(self):
623 620 # make it an immutable tuple to force people through ``addparam``
624 621 return tuple(self._mandatoryparams)
625 622
626 623 @property
627 624 def advisoryparams(self):
628 625 # make it an immutable tuple to force people through ``addparam``
629 626 return tuple(self._advisoryparams)
630 627
631 628 def addparam(self, name, value='', mandatory=True):
632 629 if self._generated is not None:
633 630 raise error.ReadOnlyPartError('part is being generated')
634 631 if name in self._seenparams:
635 632 raise ValueError('duplicated params: %s' % name)
636 633 self._seenparams.add(name)
637 634 params = self._advisoryparams
638 635 if mandatory:
639 636 params = self._mandatoryparams
640 637 params.append((name, value))
641 638
642 639 # methods used to generate the bundle2 stream
643 640 def getchunks(self):
644 641 if self._generated is not None:
645 642 raise RuntimeError('part can only be consumed once')
646 643 self._generated = False
647 644 #### header
648 645 ## parttype
649 646 header = [_pack(_fparttypesize, len(self.type)),
650 647 self.type, _pack(_fpartid, self.id),
651 648 ]
652 649 ## parameters
653 650 # count
654 651 manpar = self.mandatoryparams
655 652 advpar = self.advisoryparams
656 653 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
657 654 # size
658 655 parsizes = []
659 656 for key, value in manpar:
660 657 parsizes.append(len(key))
661 658 parsizes.append(len(value))
662 659 for key, value in advpar:
663 660 parsizes.append(len(key))
664 661 parsizes.append(len(value))
665 662 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
666 663 header.append(paramsizes)
667 664 # key, value
668 665 for key, value in manpar:
669 666 header.append(key)
670 667 header.append(value)
671 668 for key, value in advpar:
672 669 header.append(key)
673 670 header.append(value)
674 671 ## finalize header
675 672 headerchunk = ''.join(header)
676 673 yield _pack(_fpartheadersize, len(headerchunk))
677 674 yield headerchunk
678 675 ## payload
679 676 try:
680 677 for chunk in self._payloadchunks():
681 678 yield _pack(_fpayloadsize, len(chunk))
682 679 yield chunk
683 680 except Exception, exc:
684 681 # backup exception data for later
685 682 exc_info = sys.exc_info()
686 683 msg = 'unexpected error: %s' % exc
687 684 interpart = bundlepart('b2x:error:abort', [('message', msg)])
688 685 interpart.id = 0
689 686 yield _pack(_fpayloadsize, -1)
690 687 for chunk in interpart.getchunks():
691 688 yield chunk
692 689 # abort current part payload
693 690 yield _pack(_fpayloadsize, 0)
694 691 raise exc_info[0], exc_info[1], exc_info[2]
695 692 # end of payload
696 693 yield _pack(_fpayloadsize, 0)
697 694 self._generated = True
698 695
699 696 def _payloadchunks(self):
700 697 """yield chunks of the part payload
701 698
702 699 Exists to handle the different methods to provide data to a part."""
703 700 # we only support fixed size data now.
704 701 # This will be improved in the future.
705 702 if util.safehasattr(self.data, 'next'):
706 703 buff = util.chunkbuffer(self.data)
707 704 chunk = buff.read(preferedchunksize)
708 705 while chunk:
709 706 yield chunk
710 707 chunk = buff.read(preferedchunksize)
711 708 elif len(self.data):
712 709 yield self.data
713 710
714 711
715 712 flaginterrupt = -1
716 713
717 714 class interrupthandler(unpackermixin):
718 715 """read one part and process it with restricted capability
719 716
720 717 This allows transmitting exceptions raised on the producer side during part
721 718 iteration while the consumer is reading a part.
722 719
723 720 Parts processed in this manner only have access to a ui object."""
724 721
725 722 def __init__(self, ui, fp):
726 723 super(interrupthandler, self).__init__(fp)
727 724 self.ui = ui
728 725
729 726 def _readpartheader(self):
730 727 """reads a part header size and return the bytes blob
731 728
732 729 returns None if empty"""
733 730 headersize = self._unpack(_fpartheadersize)[0]
734 731 if headersize < 0:
735 732 raise error.BundleValueError('negative part header size: %i'
736 733 % headersize)
737 734 self.ui.debug('part header size: %i\n' % headersize)
738 735 if headersize:
739 736 return self._readexact(headersize)
740 737 return None
741 738
742 739 def __call__(self):
743 740 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
744 741 headerblock = self._readpartheader()
745 742 if headerblock is None:
746 743 self.ui.debug('no part found during interruption.\n')
747 744 return
748 745 part = unbundlepart(self.ui, headerblock, self._fp)
749 746 op = interruptoperation(self.ui)
750 747 _processpart(op, part)
751 748
752 749 class interruptoperation(object):
753 750 """A limited operation to be used by part handlers during interruption
754 751
755 752 It only has access to a ui object.
756 753 """
757 754
758 755 def __init__(self, ui):
759 756 self.ui = ui
760 757 self.reply = None
761 758
762 759 @property
763 760 def repo(self):
764 761 raise RuntimeError('no repo access from stream interruption')
765 762
766 763 def gettransaction(self):
767 764 raise TransactionUnavailable('no repo access from stream interruption')
768 765
769 766 class unbundlepart(unpackermixin):
770 767 """a bundle part read from a bundle"""
771 768
772 769 def __init__(self, ui, header, fp):
773 770 super(unbundlepart, self).__init__(fp)
774 771 self.ui = ui
775 772 # unbundle state attr
776 773 self._headerdata = header
777 774 self._headeroffset = 0
778 775 self._initialized = False
779 776 self.consumed = False
780 777 # part data
781 778 self.id = None
782 779 self.type = None
783 780 self.mandatoryparams = None
784 781 self.advisoryparams = None
785 782 self.params = None
786 783 self.mandatorykeys = ()
787 784 self._payloadstream = None
788 785 self._readheader()
789 786 self._mandatory = None
790 787
791 788 def _fromheader(self, size):
792 789 """return the next <size> bytes from the header"""
793 790 offset = self._headeroffset
794 791 data = self._headerdata[offset:(offset + size)]
795 792 self._headeroffset = offset + size
796 793 return data
797 794
798 795 def _unpackheader(self, format):
799 796 """read given format from header
800 797
801 798 This automatically computes the size of the format to read."""
802 799 data = self._fromheader(struct.calcsize(format))
803 800 return _unpack(format, data)
804 801
805 802 def _initparams(self, mandatoryparams, advisoryparams):
806 803 """internal function to setup all logic related parameters"""
807 804 # make it read only to prevent people touching it by mistake.
808 805 self.mandatoryparams = tuple(mandatoryparams)
809 806 self.advisoryparams = tuple(advisoryparams)
810 807 # user friendly UI
811 808 self.params = dict(self.mandatoryparams)
812 809 self.params.update(dict(self.advisoryparams))
813 810 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
814 811
815 812 def _readheader(self):
816 813 """read the header and setup the object"""
817 814 typesize = self._unpackheader(_fparttypesize)[0]
818 815 self.type = self._fromheader(typesize)
819 816 self.ui.debug('part type: "%s"\n' % self.type)
820 817 self.id = self._unpackheader(_fpartid)[0]
821 818 self.ui.debug('part id: "%s"\n' % self.id)
822 819 # extract mandatory bit from type
823 820 self.mandatory = (self.type != self.type.lower())
824 821 self.type = self.type.lower()
825 822 ## reading parameters
826 823 # param count
827 824 mancount, advcount = self._unpackheader(_fpartparamcount)
828 825 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
829 826 # param size
830 827 fparamsizes = _makefpartparamsizes(mancount + advcount)
831 828 paramsizes = self._unpackheader(fparamsizes)
832 829 # make it a list of pairs again
833 830 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
834 831 # split mandatory from advisory
835 832 mansizes = paramsizes[:mancount]
836 833 advsizes = paramsizes[mancount:]
837 834 # retrieve param value
838 835 manparams = []
839 836 for key, value in mansizes:
840 837 manparams.append((self._fromheader(key), self._fromheader(value)))
841 838 advparams = []
842 839 for key, value in advsizes:
843 840 advparams.append((self._fromheader(key), self._fromheader(value)))
844 841 self._initparams(manparams, advparams)
845 842 ## part payload
846 843 def payloadchunks():
847 844 payloadsize = self._unpack(_fpayloadsize)[0]
848 845 self.ui.debug('payload chunk size: %i\n' % payloadsize)
849 846 while payloadsize:
850 847 if payloadsize == flaginterrupt:
851 848 # interruption detection, the handler will now read a
852 849 # single part and process it.
853 850 interrupthandler(self.ui, self._fp)()
854 851 elif payloadsize < 0:
855 852 msg = 'negative payload chunk size: %i' % payloadsize
856 853 raise error.BundleValueError(msg)
857 854 else:
858 855 yield self._readexact(payloadsize)
859 856 payloadsize = self._unpack(_fpayloadsize)[0]
860 857 self.ui.debug('payload chunk size: %i\n' % payloadsize)
861 858 self._payloadstream = util.chunkbuffer(payloadchunks())
862 859 # we read the data, tell it
863 860 self._initialized = True
864 861
865 862 def read(self, size=None):
866 863 """read payload data"""
867 864 if not self._initialized:
868 865 self._readheader()
869 866 if size is None:
870 867 data = self._payloadstream.read()
871 868 else:
872 869 data = self._payloadstream.read(size)
873 870 if size is None or len(data) < size:
874 871 self.consumed = True
875 872 return data
876 873
877 874 capabilities = {'HG2Y': (),
878 875 'b2x:listkeys': (),
879 876 'b2x:pushkey': (),
880 877 'digests': tuple(sorted(util.DIGESTS.keys())),
881 878 'b2x:remote-changegroup': ('http', 'https'),
882 879 }
883 880
884 881 def getrepocaps(repo, allowpushback=False):
885 882 """return the bundle2 capabilities for a given repo
886 883
887 884 Exists to allow extensions (like evolution) to mutate the capabilities.
888 885 """
889 886 caps = capabilities.copy()
890 887 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
891 888 if obsolete.isenabled(repo, obsolete.exchangeopt):
892 889 supportedformat = tuple('V%i' % v for v in obsolete.formats)
893 890 caps['b2x:obsmarkers'] = supportedformat
894 891 if allowpushback:
895 892 caps['b2x:pushback'] = ()
896 893 return caps
897 894
898 895 def bundle2caps(remote):
899 896 """return the bundle capabilities of a peer as dict"""
900 897 raw = remote.capable('bundle2-exp')
901 898 if not raw and raw != '':
902 899 return {}
903 900 capsblob = urllib.unquote(raw)
904 901 return decodecaps(capsblob)
905 902
906 903 def obsmarkersversion(caps):
907 904 """extract the list of supported obsmarkers versions from a bundle2caps dict
908 905 """
909 906 obscaps = caps.get('b2x:obsmarkers', ())
910 907 return [int(c[1:]) for c in obscaps if c.startswith('V')]
911 908
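As a quick illustration of the version encoding, `getrepocaps` advertises obsmarkers formats as `'V%i'` strings and `obsmarkersversion` turns them back into integers. The sketch below replays that round trip on a plain dict in Python 3. `encode_versions` and `decode_versions` are hypothetical names, and the format numbers are made up for the example.

```python
def encode_versions(formats):
    """Hypothetical helper mirroring getrepocaps: ints -> 'V<n>' strings."""
    return tuple('V%i' % v for v in formats)


def decode_versions(caps):
    """Mirror of obsmarkersversion: 'V<n>' strings -> ints.

    Entries that do not start with 'V' are skipped, and a missing
    'b2x:obsmarkers' key yields an empty list.
    """
    obscaps = caps.get('b2x:obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]


# Illustrative format numbers, not taken from obsolete.formats.
caps = {'b2x:obsmarkers': encode_versions([0, 1])}
versions = decode_versions(caps)
```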
912 909 @parthandler('b2x:changegroup', ('version',))
913 910 def handlechangegroup(op, inpart):
914 911 """apply a changegroup part on the repo
915 912
916 913 This is a very early implementation that will undergo massive rework before
917 914 being inflicted on any end-user.
918 915 """
919 916 # Make sure we trigger a transaction creation
920 917 #
921 918 # The addchangegroup function will get a transaction object by itself, but
922 919 # we need to make sure we trigger the creation of a transaction object used
923 920 # for the whole processing scope.
924 921 op.gettransaction()
925 922 unpackerversion = inpart.params.get('version', '01')
926 923 # We should raise an appropriate exception here
927 924 unpacker = changegroup.packermap[unpackerversion][1]
928 925 cg = unpacker(inpart, 'UN')
929 926 # the source and url passed here are overwritten by the ones contained in
930 927 # the transaction.hookargs argument, so 'bundle2' is only a placeholder
931 928 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
932 929 op.records.add('changegroup', {'return': ret})
933 930 if op.reply is not None:
934 931 # This is definitely not the final form of this
935 932 # return. But one needs to start somewhere.
936 933 part = op.reply.newpart('b2x:reply:changegroup')
937 934 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
938 935 part.addparam('return', '%i' % ret, mandatory=False)
939 936 assert not inpart.read()
940 937
941 938 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
942 939 ['digest:%s' % k for k in util.DIGESTS.keys()])
943 940 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
944 941 def handleremotechangegroup(op, inpart):
945 942 """apply a bundle10 on the repo, given a url and validation information
946 943
947 944 All the information about the remote bundle to import is given as
948 945 parameters. The parameters include:
949 946 - url: the url to the bundle10.
950 947 - size: the bundle10 file size. It is used to validate that what was
951 948 retrieved by the client matches the server's knowledge about the bundle.
952 949 - digests: a space separated list of the digest types provided as
953 950 parameters.
954 951 - digest:<digest-type>: the hexadecimal representation of the digest with
955 952 that name. Like the size, it is used to validate that what was retrieved
956 953 by the client matches what the server knows about the bundle.
957 954
958 955 When multiple digest types are given, all of them are checked.
959 956 """
960 957 try:
961 958 raw_url = inpart.params['url']
962 959 except KeyError:
963 960 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
964 961 parsed_url = util.url(raw_url)
965 962 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
966 963 raise util.Abort(_('remote-changegroup does not support %s urls') %
967 964 parsed_url.scheme)
968 965
969 966 try:
970 967 size = int(inpart.params['size'])
971 968 except ValueError:
972 969 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
973 970 % 'size')
974 971 except KeyError:
975 972 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
976 973
977 974 digests = {}
978 975 for typ in inpart.params.get('digests', '').split():
979 976 param = 'digest:%s' % typ
980 977 try:
981 978 value = inpart.params[param]
982 979 except KeyError:
983 980 raise util.Abort(_('remote-changegroup: missing "%s" param') %
984 981 param)
985 982 digests[typ] = value
986 983
987 984 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
988 985
989 986 # Make sure we trigger a transaction creation
990 987 #
991 988 # The addchangegroup function will get a transaction object by itself, but
992 989 # we need to make sure we trigger the creation of a transaction object used
993 990 # for the whole processing scope.
994 991 op.gettransaction()
995 992 import exchange
996 993 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
997 994 if not isinstance(cg, changegroup.cg1unpacker):
998 995 raise util.Abort(_('%s: not a bundle version 1.0') %
999 996 util.hidepassword(raw_url))
1000 997 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1001 998 op.records.add('changegroup', {'return': ret})
1002 999 if op.reply is not None:
1003 1000 # This is definitely not the final form of this
1004 1001 # return. But one needs to start somewhere.
1005 1002 part = op.reply.newpart('b2x:reply:changegroup')
1006 1003 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1007 1004 part.addparam('return', '%i' % ret, mandatory=False)
1008 1005 try:
1009 1006 real_part.validate()
1010 1007 except util.Abort, e:
1011 1008 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1012 1009 (util.hidepassword(raw_url), str(e)))
1013 1010 assert not inpart.read()
1014 1011
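The size and digest checks described in the docstring above can be sketched with the standard library alone. This is not `util.digestchecker`, which wraps a file handle and validates after streaming. It is a simplified Python 3 version that works on data already in memory, under the assumption that the digest types are names accepted by `hashlib.new`. `validate_blob` is a hypothetical name.

```python
import hashlib


def validate_blob(data, size, digests):
    """Check data against the advertised size and every advertised digest.

    digests maps a digest type (assumed to be a hashlib algorithm name such
    as 'sha1') to the expected hexadecimal digest. All entries are checked.
    """
    if len(data) != size:
        raise ValueError('size mismatch: expected %d, got %d'
                         % (size, len(data)))
    for name, expected in sorted(digests.items()):
        actual = hashlib.new(name, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch: expected %s, got %s'
                             % (name, expected, actual))


blob = b'pretend this is a bundle10 file'
advertised = {
    'md5': hashlib.md5(blob).hexdigest(),
    'sha1': hashlib.sha1(blob).hexdigest(),
}
validate_blob(blob, len(blob), advertised)  # passes silently
```

Note that the handler above applies the changegroup first and calls `validate()` afterwards, because the digests are only complete once the whole stream has been consumed.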
1015 1012 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1016 1013 def handlereplychangegroup(op, inpart):
1017 1014 ret = int(inpart.params['return'])
1018 1015 replyto = int(inpart.params['in-reply-to'])
1019 1016 op.records.add('changegroup', {'return': ret}, replyto)
1020 1017
1021 1018 @parthandler('b2x:check:heads')
1022 1019 def handlecheckheads(op, inpart):
1023 1020 """check that the heads of the repo did not change
1024 1021
1025 1022 This is used to detect a push race when using unbundle.
1026 1023 This replaces the "heads" argument of unbundle."""
1027 1024 h = inpart.read(20)
1028 1025 heads = []
1029 1026 while len(h) == 20:
1030 1027 heads.append(h)
1031 1028 h = inpart.read(20)
1032 1029 assert not h
1033 1030 if heads != op.repo.heads():
1034 1031 raise error.PushRaced('repository changed while pushing - '
1035 1032 'please try again')
1036 1033
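The `b2x:check:heads` payload is read above as a run of 20-byte node ids with nothing left over. A minimal Python 3 sketch of that decoding on an in-memory payload follows. `split_heads` is a hypothetical name, and the node values are placeholders rather than real changeset hashes.

```python
NODE_LEN = 20  # size in bytes of one binary node id, as read by the handler


def split_heads(payload):
    """Split a check:heads payload into a list of 20-byte node ids.

    A trailing partial node is rejected, matching the 'assert not h'
    check in the handler.
    """
    if len(payload) % NODE_LEN:
        raise ValueError('payload is not a whole number of %d-byte nodes'
                         % NODE_LEN)
    return [payload[i:i + NODE_LEN]
            for i in range(0, len(payload), NODE_LEN)]


# Placeholder node ids for illustration only.
head_a = b'\x01' * NODE_LEN
head_b = b'\x02' * NODE_LEN
heads = split_heads(head_a + head_b)
```

Since the handler compares the decoded list directly with `op.repo.heads()`, the comparison is order-sensitive: the sender is expected to list heads in the same order the repository reports them.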
1037 1034 @parthandler('b2x:output')
1038 1035 def handleoutput(op, inpart):
1039 1036 """forward output captured on the server to the client"""
1040 1037 for line in inpart.read().splitlines():
1041 1038 op.ui.write(('remote: %s\n' % line))
1042 1039
1043 1040 @parthandler('b2x:replycaps')
1044 1041 def handlereplycaps(op, inpart):
1045 1042 """Notify that a reply bundle should be created
1046 1043
1047 1044 The payload contains the capabilities information for the reply"""
1048 1045 caps = decodecaps(inpart.read())
1049 1046 if op.reply is None:
1050 1047 op.reply = bundle20(op.ui, caps)
1051 1048
1052 1049 @parthandler('b2x:error:abort', ('message', 'hint'))
1053 1050 def handleerrorabort(op, inpart):
1054 1051 """Used to transmit abort error over the wire"""
1055 1052 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1056 1053
1057 1054 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1058 1055 def handleerrorunsupportedcontent(op, inpart):
1059 1056 """Used to transmit unknown content error over the wire"""
1060 1057 kwargs = {}
1061 1058 parttype = inpart.params.get('parttype')
1062 1059 if parttype is not None:
1063 1060 kwargs['parttype'] = parttype
1064 1061 params = inpart.params.get('params')
1065 1062 if params is not None:
1066 1063 kwargs['params'] = params.split('\0')
1067 1064
1068 1065 raise error.UnsupportedPartError(**kwargs)
1069 1066
1070 1067 @parthandler('b2x:error:pushraced', ('message',))
1071 1068 def handleerrorpushraced(op, inpart):
1072 1069 """Used to transmit push race error over the wire"""
1073 1070 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1074 1071
1075 1072 @parthandler('b2x:listkeys', ('namespace',))
1076 1073 def handlelistkeys(op, inpart):
1077 1074 """retrieve pushkey namespace content stored in a bundle2"""
1078 1075 namespace = inpart.params['namespace']
1079 1076 r = pushkey.decodekeys(inpart.read())
1080 1077 op.records.add('listkeys', (namespace, r))
1081 1078
1082 1079 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1083 1080 def handlepushkey(op, inpart):
1084 1081 """process a pushkey request"""
1085 1082 dec = pushkey.decode
1086 1083 namespace = dec(inpart.params['namespace'])
1087 1084 key = dec(inpart.params['key'])
1088 1085 old = dec(inpart.params['old'])
1089 1086 new = dec(inpart.params['new'])
1090 1087 ret = op.repo.pushkey(namespace, key, old, new)
1091 1088 record = {'namespace': namespace,
1092 1089 'key': key,
1093 1090 'old': old,
1094 1091 'new': new}
1095 1092 op.records.add('pushkey', record)
1096 1093 if op.reply is not None:
1097 1094 rpart = op.reply.newpart('b2x:reply:pushkey')
1098 1095 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1099 1096 rpart.addparam('return', '%i' % ret, mandatory=False)
1100 1097
1101 1098 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1102 1099 def handlepushkeyreply(op, inpart):
1103 1100 """retrieve the result of a pushkey request"""
1104 1101 ret = int(inpart.params['return'])
1105 1102 partid = int(inpart.params['in-reply-to'])
1106 1103 op.records.add('pushkey', {'return': ret}, partid)
1107 1104
1108 1105 @parthandler('b2x:obsmarkers')
1109 1106 def handleobsmarker(op, inpart):
1110 1107 """add a stream of obsmarkers to the repo"""
1111 1108 tr = op.gettransaction()
1112 1109 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1113 1110 if new:
1114 1111 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1115 1112 op.records.add('obsmarkers', {'new': new})
1116 1113 if op.reply is not None:
1117 1114 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1118 1115 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1119 1116 rpart.addparam('new', '%i' % new, mandatory=False)
1120 1117
1121 1118
1122 1119 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1123 1120 def handleobsmarkerreply(op, inpart):
1124 1121 """retrieve the result of an obsmarkers request"""
1125 1122 ret = int(inpart.params['new'])
1126 1123 partid = int(inpart.params['in-reply-to'])
1127 1124 op.records.add('obsmarkers', {'new': ret}, partid)