##// END OF EJS Templates
bundle2: add a interrupt mechanism...
Pierre-Yves David -
r23066:ad144882 stable
parent child Browse files
Show More
@@ -1,1042 +1,1102
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import util
149 149 import struct
150 150 import urllib
151 151 import string
152 152 import obsolete
153 153 import pushkey
154 154 import url
155 155
156 156 import changegroup, error
157 157 from i18n import _
158 158
159 159 _pack = struct.pack
160 160 _unpack = struct.unpack
161 161
162 162 _magicstring = 'HG2Y'
163 163
164 164 _fstreamparamsize = '>i'
165 165 _fpartheadersize = '>i'
166 166 _fparttypesize = '>B'
167 167 _fpartid = '>I'
168 168 _fpayloadsize = '>i'
169 169 _fpartparamcount = '>BB'
170 170
171 171 preferedchunksize = 4096
172 172
173 173 def _makefpartparamsizes(nbparams):
174 174 """return a struct format to read part parameter sizes
175 175
176 176 The number parameters is variable so we need to build that format
177 177 dynamically.
178 178 """
179 179 return '>'+('BB'*nbparams)
180 180
181 181 parthandlermapping = {}
182 182
183 183 def parthandler(parttype, params=()):
184 184 """decorator that register a function as a bundle2 part handler
185 185
186 186 eg::
187 187
188 188 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
189 189 def myparttypehandler(...):
190 190 '''process a part of type "my part".'''
191 191 ...
192 192 """
193 193 def _decorator(func):
194 194 lparttype = parttype.lower() # enforce lower case matching.
195 195 assert lparttype not in parthandlermapping
196 196 parthandlermapping[lparttype] = func
197 197 func.params = frozenset(params)
198 198 return func
199 199 return _decorator
200 200
201 201 class unbundlerecords(object):
202 202 """keep record of what happens during and unbundle
203 203
204 204 New records are added using `records.add('cat', obj)`. Where 'cat' is a
205 205 category of record and obj is an arbitrary object.
206 206
207 207 `records['cat']` will return all entries of this category 'cat'.
208 208
209 209 Iterating on the object itself will yield `('category', obj)` tuples
210 210 for all entries.
211 211
212 212 All iterations happens in chronological order.
213 213 """
214 214
215 215 def __init__(self):
216 216 self._categories = {}
217 217 self._sequences = []
218 218 self._replies = {}
219 219
220 220 def add(self, category, entry, inreplyto=None):
221 221 """add a new record of a given category.
222 222
223 223 The entry can then be retrieved in the list returned by
224 224 self['category']."""
225 225 self._categories.setdefault(category, []).append(entry)
226 226 self._sequences.append((category, entry))
227 227 if inreplyto is not None:
228 228 self.getreplies(inreplyto).add(category, entry)
229 229
230 230 def getreplies(self, partid):
231 231 """get the subrecords that replies to a specific part"""
232 232 return self._replies.setdefault(partid, unbundlerecords())
233 233
234 234 def __getitem__(self, cat):
235 235 return tuple(self._categories.get(cat, ()))
236 236
237 237 def __iter__(self):
238 238 return iter(self._sequences)
239 239
240 240 def __len__(self):
241 241 return len(self._sequences)
242 242
243 243 def __nonzero__(self):
244 244 return bool(self._sequences)
245 245
246 246 class bundleoperation(object):
247 247 """an object that represents a single bundling process
248 248
249 249 Its purpose is to carry unbundle-related objects and states.
250 250
251 251 A new object should be created at the beginning of each bundle processing.
252 252 The object is to be returned by the processing function.
253 253
254 254 The object has very little content now it will ultimately contain:
255 255 * an access to the repo the bundle is applied to,
256 256 * a ui object,
257 257 * a way to retrieve a transaction to add changes to the repo,
258 258 * a way to record the result of processing each part,
259 259 * a way to construct a bundle response when applicable.
260 260 """
261 261
262 262 def __init__(self, repo, transactiongetter):
263 263 self.repo = repo
264 264 self.ui = repo.ui
265 265 self.records = unbundlerecords()
266 266 self.gettransaction = transactiongetter
267 267 self.reply = None
268 268
269 269 class TransactionUnavailable(RuntimeError):
270 270 pass
271 271
272 272 def _notransaction():
273 273 """default method to get a transaction while processing a bundle
274 274
275 275 Raise an exception to highlight the fact that no transaction was expected
276 276 to be created"""
277 277 raise TransactionUnavailable()
278 278
279 279 def processbundle(repo, unbundler, transactiongetter=_notransaction):
280 280 """This function process a bundle, apply effect to/from a repo
281 281
282 282 It iterates over each part then searches for and uses the proper handling
283 283 code to process the part. Parts are processed in order.
284 284
285 285 This is very early version of this function that will be strongly reworked
286 286 before final usage.
287 287
288 288 Unknown Mandatory part will abort the process.
289 289 """
290 290 op = bundleoperation(repo, transactiongetter)
291 291 # todo:
292 292 # - replace this is a init function soon.
293 293 # - exception catching
294 294 unbundler.params
295 295 iterparts = unbundler.iterparts()
296 296 part = None
297 297 try:
298 298 for part in iterparts:
299 299 _processpart(op, part)
300 300 except Exception, exc:
301 301 for part in iterparts:
302 302 # consume the bundle content
303 303 part.read()
304 304 # Small hack to let caller code distinguish exceptions from bundle2
305 305 # processing fron the ones from bundle1 processing. This is mostly
306 306 # needed to handle different return codes to unbundle according to the
307 307 # type of bundle. We should probably clean up or drop this return code
308 308 # craziness in a future version.
309 309 exc.duringunbundle2 = True
310 310 raise
311 311 return op
312 312
313 313 def _processpart(op, part):
314 314 """process a single part from a bundle
315 315
316 316 The part is guaranteed to have been fully consumed when the function exits
317 317 (even if an exception is raised)."""
318 318 try:
319 319 parttype = part.type
320 320 # part key are matched lower case
321 321 key = parttype.lower()
322 322 try:
323 323 handler = parthandlermapping.get(key)
324 324 if handler is None:
325 325 raise error.UnsupportedPartError(parttype=key)
326 326 op.ui.debug('found a handler for part %r\n' % parttype)
327 327 unknownparams = part.mandatorykeys - handler.params
328 328 if unknownparams:
329 329 unknownparams = list(unknownparams)
330 330 unknownparams.sort()
331 331 raise error.UnsupportedPartError(parttype=key,
332 332 params=unknownparams)
333 333 except error.UnsupportedPartError, exc:
334 334 if key != parttype: # mandatory parts
335 335 raise
336 336 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
337 337 return # skip to part processing
338 338
339 339 # handler is called outside the above try block so that we don't
340 340 # risk catching KeyErrors from anything other than the
341 341 # parthandlermapping lookup (any KeyError raised by handler()
342 342 # itself represents a defect of a different variety).
343 343 output = None
344 344 if op.reply is not None:
345 345 op.ui.pushbuffer(error=True)
346 346 output = ''
347 347 try:
348 348 handler(op, part)
349 349 finally:
350 350 if output is not None:
351 351 output = op.ui.popbuffer()
352 352 if output:
353 353 outpart = op.reply.newpart('b2x:output', data=output)
354 354 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
355 355 finally:
356 356 # consume the part content to not corrupt the stream.
357 357 part.read()
358 358
359 359
360 360 def decodecaps(blob):
361 361 """decode a bundle2 caps bytes blob into a dictionnary
362 362
363 363 The blob is a list of capabilities (one per line)
364 364 Capabilities may have values using a line of the form::
365 365
366 366 capability=value1,value2,value3
367 367
368 368 The values are always a list."""
369 369 caps = {}
370 370 for line in blob.splitlines():
371 371 if not line:
372 372 continue
373 373 if '=' not in line:
374 374 key, vals = line, ()
375 375 else:
376 376 key, vals = line.split('=', 1)
377 377 vals = vals.split(',')
378 378 key = urllib.unquote(key)
379 379 vals = [urllib.unquote(v) for v in vals]
380 380 caps[key] = vals
381 381 return caps
382 382
383 383 def encodecaps(caps):
384 384 """encode a bundle2 caps dictionary into a bytes blob"""
385 385 chunks = []
386 386 for ca in sorted(caps):
387 387 vals = caps[ca]
388 388 ca = urllib.quote(ca)
389 389 vals = [urllib.quote(v) for v in vals]
390 390 if vals:
391 391 ca = "%s=%s" % (ca, ','.join(vals))
392 392 chunks.append(ca)
393 393 return '\n'.join(chunks)
394 394
395 395 class bundle20(object):
396 396 """represent an outgoing bundle2 container
397 397
398 398 Use the `addparam` method to add stream level parameter. and `newpart` to
399 399 populate it. Then call `getchunks` to retrieve all the binary chunks of
400 400 data that compose the bundle2 container."""
401 401
402 402 def __init__(self, ui, capabilities=()):
403 403 self.ui = ui
404 404 self._params = []
405 405 self._parts = []
406 406 self.capabilities = dict(capabilities)
407 407
408 408 @property
409 409 def nbparts(self):
410 410 """total number of parts added to the bundler"""
411 411 return len(self._parts)
412 412
413 413 # methods used to defines the bundle2 content
414 414 def addparam(self, name, value=None):
415 415 """add a stream level parameter"""
416 416 if not name:
417 417 raise ValueError('empty parameter name')
418 418 if name[0] not in string.letters:
419 419 raise ValueError('non letter first character: %r' % name)
420 420 self._params.append((name, value))
421 421
422 422 def addpart(self, part):
423 423 """add a new part to the bundle2 container
424 424
425 425 Parts contains the actual applicative payload."""
426 426 assert part.id is None
427 427 part.id = len(self._parts) # very cheap counter
428 428 self._parts.append(part)
429 429
430 430 def newpart(self, typeid, *args, **kwargs):
431 431 """create a new part and add it to the containers
432 432
433 433 As the part is directly added to the containers. For now, this means
434 434 that any failure to properly initialize the part after calling
435 435 ``newpart`` should result in a failure of the whole bundling process.
436 436
437 437 You can still fall back to manually create and add if you need better
438 438 control."""
439 439 part = bundlepart(typeid, *args, **kwargs)
440 440 self.addpart(part)
441 441 return part
442 442
443 443 # methods used to generate the bundle2 stream
444 444 def getchunks(self):
445 445 self.ui.debug('start emission of %s stream\n' % _magicstring)
446 446 yield _magicstring
447 447 param = self._paramchunk()
448 448 self.ui.debug('bundle parameter: %s\n' % param)
449 449 yield _pack(_fstreamparamsize, len(param))
450 450 if param:
451 451 yield param
452 452
453 453 self.ui.debug('start of parts\n')
454 454 for part in self._parts:
455 455 self.ui.debug('bundle part: "%s"\n' % part.type)
456 456 for chunk in part.getchunks():
457 457 yield chunk
458 458 self.ui.debug('end of bundle\n')
459 459 yield _pack(_fpartheadersize, 0)
460 460
461 461 def _paramchunk(self):
462 462 """return a encoded version of all stream parameters"""
463 463 blocks = []
464 464 for par, value in self._params:
465 465 par = urllib.quote(par)
466 466 if value is not None:
467 467 value = urllib.quote(value)
468 468 par = '%s=%s' % (par, value)
469 469 blocks.append(par)
470 470 return ' '.join(blocks)
471 471
472 472 class unpackermixin(object):
473 473 """A mixin to extract bytes and struct data from a stream"""
474 474
475 475 def __init__(self, fp):
476 476 self._fp = fp
477 477
478 478 def _unpack(self, format):
479 479 """unpack this struct format from the stream"""
480 480 data = self._readexact(struct.calcsize(format))
481 481 return _unpack(format, data)
482 482
483 483 def _readexact(self, size):
484 484 """read exactly <size> bytes from the stream"""
485 485 return changegroup.readexactly(self._fp, size)
486 486
487 487
488 488 class unbundle20(unpackermixin):
489 489 """interpret a bundle2 stream
490 490
491 491 This class is fed with a binary stream and yields parts through its
492 492 `iterparts` methods."""
493 493
494 494 def __init__(self, ui, fp, header=None):
495 495 """If header is specified, we do not read it out of the stream."""
496 496 self.ui = ui
497 497 super(unbundle20, self).__init__(fp)
498 498 if header is None:
499 499 header = self._readexact(4)
500 500 magic, version = header[0:2], header[2:4]
501 501 if magic != 'HG':
502 502 raise util.Abort(_('not a Mercurial bundle'))
503 503 if version != '2Y':
504 504 raise util.Abort(_('unknown bundle version %s') % version)
505 505 self.ui.debug('start processing of %s stream\n' % header)
506 506
507 507 @util.propertycache
508 508 def params(self):
509 509 """dictionary of stream level parameters"""
510 510 self.ui.debug('reading bundle2 stream parameters\n')
511 511 params = {}
512 512 paramssize = self._unpack(_fstreamparamsize)[0]
513 513 if paramssize < 0:
514 514 raise error.BundleValueError('negative bundle param size: %i'
515 515 % paramssize)
516 516 if paramssize:
517 517 for p in self._readexact(paramssize).split(' '):
518 518 p = p.split('=', 1)
519 519 p = [urllib.unquote(i) for i in p]
520 520 if len(p) < 2:
521 521 p.append(None)
522 522 self._processparam(*p)
523 523 params[p[0]] = p[1]
524 524 return params
525 525
526 526 def _processparam(self, name, value):
527 527 """process a parameter, applying its effect if needed
528 528
529 529 Parameter starting with a lower case letter are advisory and will be
530 530 ignored when unknown. Those starting with an upper case letter are
531 531 mandatory and will this function will raise a KeyError when unknown.
532 532
533 533 Note: no option are currently supported. Any input will be either
534 534 ignored or failing.
535 535 """
536 536 if not name:
537 537 raise ValueError('empty parameter name')
538 538 if name[0] not in string.letters:
539 539 raise ValueError('non letter first character: %r' % name)
540 540 # Some logic will be later added here to try to process the option for
541 541 # a dict of known parameter.
542 542 if name[0].islower():
543 543 self.ui.debug("ignoring unknown parameter %r\n" % name)
544 544 else:
545 545 raise error.UnsupportedPartError(params=(name,))
546 546
547 547
548 548 def iterparts(self):
549 549 """yield all parts contained in the stream"""
550 550 # make sure param have been loaded
551 551 self.params
552 552 self.ui.debug('start extraction of bundle2 parts\n')
553 553 headerblock = self._readpartheader()
554 554 while headerblock is not None:
555 555 part = unbundlepart(self.ui, headerblock, self._fp)
556 556 yield part
557 557 headerblock = self._readpartheader()
558 558 self.ui.debug('end of bundle2 stream\n')
559 559
560 560 def _readpartheader(self):
561 561 """reads a part header size and return the bytes blob
562 562
563 563 returns None if empty"""
564 564 headersize = self._unpack(_fpartheadersize)[0]
565 565 if headersize < 0:
566 566 raise error.BundleValueError('negative part header size: %i'
567 567 % headersize)
568 568 self.ui.debug('part header size: %i\n' % headersize)
569 569 if headersize:
570 570 return self._readexact(headersize)
571 571 return None
572 572
573 573
574 574 class bundlepart(object):
575 575 """A bundle2 part contains application level payload
576 576
577 577 The part `type` is used to route the part to the application level
578 578 handler.
579 579
580 580 The part payload is contained in ``part.data``. It could be raw bytes or a
581 581 generator of byte chunks.
582 582
583 583 You can add parameters to the part using the ``addparam`` method.
584 584 Parameters can be either mandatory (default) or advisory. Remote side
585 585 should be able to safely ignore the advisory ones.
586 586
587 587 Both data and parameters cannot be modified after the generation has begun.
588 588 """
589 589
590 590 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
591 591 data=''):
592 592 self.id = None
593 593 self.type = parttype
594 594 self._data = data
595 595 self._mandatoryparams = list(mandatoryparams)
596 596 self._advisoryparams = list(advisoryparams)
597 597 # checking for duplicated entries
598 598 self._seenparams = set()
599 599 for pname, __ in self._mandatoryparams + self._advisoryparams:
600 600 if pname in self._seenparams:
601 601 raise RuntimeError('duplicated params: %s' % pname)
602 602 self._seenparams.add(pname)
603 603 # status of the part's generation:
604 604 # - None: not started,
605 605 # - False: currently generated,
606 606 # - True: generation done.
607 607 self._generated = None
608 608
609 609 # methods used to defines the part content
610 610 def __setdata(self, data):
611 611 if self._generated is not None:
612 612 raise error.ReadOnlyPartError('part is being generated')
613 613 self._data = data
614 614 def __getdata(self):
615 615 return self._data
616 616 data = property(__getdata, __setdata)
617 617
618 618 @property
619 619 def mandatoryparams(self):
620 620 # make it an immutable tuple to force people through ``addparam``
621 621 return tuple(self._mandatoryparams)
622 622
623 623 @property
624 624 def advisoryparams(self):
625 625 # make it an immutable tuple to force people through ``addparam``
626 626 return tuple(self._advisoryparams)
627 627
628 628 def addparam(self, name, value='', mandatory=True):
629 629 if self._generated is not None:
630 630 raise error.ReadOnlyPartError('part is being generated')
631 631 if name in self._seenparams:
632 632 raise ValueError('duplicated params: %s' % name)
633 633 self._seenparams.add(name)
634 634 params = self._advisoryparams
635 635 if mandatory:
636 636 params = self._mandatoryparams
637 637 params.append((name, value))
638 638
639 639 # methods used to generates the bundle2 stream
640 640 def getchunks(self):
641 641 if self._generated is not None:
642 642 raise RuntimeError('part can only be consumed once')
643 643 self._generated = False
644 644 #### header
645 645 ## parttype
646 646 header = [_pack(_fparttypesize, len(self.type)),
647 647 self.type, _pack(_fpartid, self.id),
648 648 ]
649 649 ## parameters
650 650 # count
651 651 manpar = self.mandatoryparams
652 652 advpar = self.advisoryparams
653 653 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
654 654 # size
655 655 parsizes = []
656 656 for key, value in manpar:
657 657 parsizes.append(len(key))
658 658 parsizes.append(len(value))
659 659 for key, value in advpar:
660 660 parsizes.append(len(key))
661 661 parsizes.append(len(value))
662 662 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
663 663 header.append(paramsizes)
664 664 # key, value
665 665 for key, value in manpar:
666 666 header.append(key)
667 667 header.append(value)
668 668 for key, value in advpar:
669 669 header.append(key)
670 670 header.append(value)
671 671 ## finalize header
672 672 headerchunk = ''.join(header)
673 673 yield _pack(_fpartheadersize, len(headerchunk))
674 674 yield headerchunk
675 675 ## payload
676 676 for chunk in self._payloadchunks():
677 677 yield _pack(_fpayloadsize, len(chunk))
678 678 yield chunk
679 679 # end of payload
680 680 yield _pack(_fpayloadsize, 0)
681 681 self._generated = True
682 682
683 683 def _payloadchunks(self):
684 684 """yield chunks of a the part payload
685 685
686 686 Exists to handle the different methods to provide data to a part."""
687 687 # we only support fixed size data now.
688 688 # This will be improved in the future.
689 689 if util.safehasattr(self.data, 'next'):
690 690 buff = util.chunkbuffer(self.data)
691 691 chunk = buff.read(preferedchunksize)
692 692 while chunk:
693 693 yield chunk
694 694 chunk = buff.read(preferedchunksize)
695 695 elif len(self.data):
696 696 yield self.data
697 697
698
699 flaginterrupt = -1
700
701 class interrupthandler(unpackermixin):
702 """read one part and process it with restricted capability
703
704 This allows to transmit exception raised on the producer size during part
705 iteration while the consumer is reading a part.
706
707 Part processed in this manner only have access to a ui object,"""
708
709 def __init__(self, ui, fp):
710 super(interrupthandler, self).__init__(fp)
711 self.ui = ui
712
713 def _readpartheader(self):
714 """reads a part header size and return the bytes blob
715
716 returns None if empty"""
717 headersize = self._unpack(_fpartheadersize)[0]
718 if headersize < 0:
719 raise error.BundleValueError('negative part header size: %i'
720 % headersize)
721 self.ui.debug('part header size: %i\n' % headersize)
722 if headersize:
723 return self._readexact(headersize)
724 return None
725
726 def __call__(self):
727 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
728 headerblock = self._readpartheader()
729 if headerblock is None:
730 self.ui.debug('no part found during iterruption.\n')
731 return
732 part = unbundlepart(self.ui, headerblock, self._fp)
733 op = interruptoperation(self.ui)
734 _processpart(op, part)
735
736 class interruptoperation(object):
737 """A limited operation to be use by part handler during interruption
738
739 It only have access to an ui object.
740 """
741
742 def __init__(self, ui):
743 self.ui = ui
744 self.reply = None
745
746 @property
747 def repo(self):
748 raise RuntimeError('no repo access from stream interruption')
749
750 def gettransaction(self):
751 raise TransactionUnavailable('no repo access from stream interruption')
752
698 753 class unbundlepart(unpackermixin):
699 754 """a bundle part read from a bundle"""
700 755
701 756 def __init__(self, ui, header, fp):
702 757 super(unbundlepart, self).__init__(fp)
703 758 self.ui = ui
704 759 # unbundle state attr
705 760 self._headerdata = header
706 761 self._headeroffset = 0
707 762 self._initialized = False
708 763 self.consumed = False
709 764 # part data
710 765 self.id = None
711 766 self.type = None
712 767 self.mandatoryparams = None
713 768 self.advisoryparams = None
714 769 self.params = None
715 770 self.mandatorykeys = ()
716 771 self._payloadstream = None
717 772 self._readheader()
718 773
719 774 def _fromheader(self, size):
720 775 """return the next <size> byte from the header"""
721 776 offset = self._headeroffset
722 777 data = self._headerdata[offset:(offset + size)]
723 778 self._headeroffset = offset + size
724 779 return data
725 780
726 781 def _unpackheader(self, format):
727 782 """read given format from header
728 783
729 784 This automatically compute the size of the format to read."""
730 785 data = self._fromheader(struct.calcsize(format))
731 786 return _unpack(format, data)
732 787
733 788 def _initparams(self, mandatoryparams, advisoryparams):
734 789 """internal function to setup all logic related parameters"""
735 790 # make it read only to prevent people touching it by mistake.
736 791 self.mandatoryparams = tuple(mandatoryparams)
737 792 self.advisoryparams = tuple(advisoryparams)
738 793 # user friendly UI
739 794 self.params = dict(self.mandatoryparams)
740 795 self.params.update(dict(self.advisoryparams))
741 796 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
742 797
743 798 def _readheader(self):
744 799 """read the header and setup the object"""
745 800 typesize = self._unpackheader(_fparttypesize)[0]
746 801 self.type = self._fromheader(typesize)
747 802 self.ui.debug('part type: "%s"\n' % self.type)
748 803 self.id = self._unpackheader(_fpartid)[0]
749 804 self.ui.debug('part id: "%s"\n' % self.id)
750 805 ## reading parameters
751 806 # param count
752 807 mancount, advcount = self._unpackheader(_fpartparamcount)
753 808 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
754 809 # param size
755 810 fparamsizes = _makefpartparamsizes(mancount + advcount)
756 811 paramsizes = self._unpackheader(fparamsizes)
757 812 # make it a list of couple again
758 813 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
759 814 # split mandatory from advisory
760 815 mansizes = paramsizes[:mancount]
761 816 advsizes = paramsizes[mancount:]
762 817 # retrive param value
763 818 manparams = []
764 819 for key, value in mansizes:
765 820 manparams.append((self._fromheader(key), self._fromheader(value)))
766 821 advparams = []
767 822 for key, value in advsizes:
768 823 advparams.append((self._fromheader(key), self._fromheader(value)))
769 824 self._initparams(manparams, advparams)
770 825 ## part payload
771 826 def payloadchunks():
772 827 payloadsize = self._unpack(_fpayloadsize)[0]
773 828 self.ui.debug('payload chunk size: %i\n' % payloadsize)
774 829 while payloadsize:
775 if payloadsize < 0:
830 if payloadsize == flaginterrupt:
831 # interruption detection, the handler will now read a
832 # single part and process it.
833 interrupthandler(self.ui, self._fp)()
834 elif payloadsize < 0:
776 835 msg = 'negative payload chunk size: %i' % payloadsize
777 836 raise error.BundleValueError(msg)
837 else:
778 838 yield self._readexact(payloadsize)
779 839 payloadsize = self._unpack(_fpayloadsize)[0]
780 840 self.ui.debug('payload chunk size: %i\n' % payloadsize)
781 841 self._payloadstream = util.chunkbuffer(payloadchunks())
782 842 # we read the data, tell it
783 843 self._initialized = True
784 844
785 845 def read(self, size=None):
786 846 """read payload data"""
787 847 if not self._initialized:
788 848 self._readheader()
789 849 if size is None:
790 850 data = self._payloadstream.read()
791 851 else:
792 852 data = self._payloadstream.read(size)
793 853 if size is None or len(data) < size:
794 854 self.consumed = True
795 855 return data
796 856
797 857 capabilities = {'HG2Y': (),
798 858 'b2x:listkeys': (),
799 859 'b2x:pushkey': (),
800 860 'b2x:changegroup': (),
801 861 'digests': tuple(sorted(util.DIGESTS.keys())),
802 862 'b2x:remote-changegroup': ('http', 'https'),
803 863 }
804 864
805 865 def getrepocaps(repo):
806 866 """return the bundle2 capabilities for a given repo
807 867
808 868 Exists to allow extensions (like evolution) to mutate the capabilities.
809 869 """
810 870 caps = capabilities.copy()
811 871 if obsolete.isenabled(repo, obsolete.exchangeopt):
812 872 supportedformat = tuple('V%i' % v for v in obsolete.formats)
813 873 caps['b2x:obsmarkers'] = supportedformat
814 874 return caps
815 875
816 876 def bundle2caps(remote):
817 877 """return the bundlecapabilities of a peer as dict"""
818 878 raw = remote.capable('bundle2-exp')
819 879 if not raw and raw != '':
820 880 return {}
821 881 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
822 882 return decodecaps(capsblob)
823 883
824 884 def obsmarkersversion(caps):
825 885 """extract the list of supported obsmarkers versions from a bundle2caps dict
826 886 """
827 887 obscaps = caps.get('b2x:obsmarkers', ())
828 888 return [int(c[1:]) for c in obscaps if c.startswith('V')]
829 889
830 890 @parthandler('b2x:changegroup')
831 891 def handlechangegroup(op, inpart):
832 892 """apply a changegroup part on the repo
833 893
834 894 This is a very early implementation that will massive rework before being
835 895 inflicted to any end-user.
836 896 """
837 897 # Make sure we trigger a transaction creation
838 898 #
839 899 # The addchangegroup function will get a transaction object by itself, but
840 900 # we need to make sure we trigger the creation of a transaction object used
841 901 # for the whole processing scope.
842 902 op.gettransaction()
843 903 cg = changegroup.cg1unpacker(inpart, 'UN')
844 904 # the source and url passed here are overwritten by the one contained in
845 905 # the transaction.hookargs argument. So 'bundle2' is a placeholder
846 906 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
847 907 op.records.add('changegroup', {'return': ret})
848 908 if op.reply is not None:
849 909 # This is definitly not the final form of this
850 910 # return. But one need to start somewhere.
851 911 part = op.reply.newpart('b2x:reply:changegroup')
852 912 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
853 913 part.addparam('return', '%i' % ret, mandatory=False)
854 914 assert not inpart.read()
855 915
856 916 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
857 917 ['digest:%s' % k for k in util.DIGESTS.keys()])
858 918 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
859 919 def handleremotechangegroup(op, inpart):
860 920 """apply a bundle10 on the repo, given an url and validation information
861 921
862 922 All the information about the remote bundle to import are given as
863 923 parameters. The parameters include:
864 924 - url: the url to the bundle10.
865 925 - size: the bundle10 file size. It is used to validate what was
866 926 retrieved by the client matches the server knowledge about the bundle.
867 927 - digests: a space separated list of the digest types provided as
868 928 parameters.
869 929 - digest:<digest-type>: the hexadecimal representation of the digest with
870 930 that name. Like the size, it is used to validate what was retrieved by
871 931 the client matches what the server knows about the bundle.
872 932
873 933 When multiple digest types are given, all of them are checked.
874 934 """
875 935 try:
876 936 raw_url = inpart.params['url']
877 937 except KeyError:
878 938 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
879 939 parsed_url = util.url(raw_url)
880 940 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
881 941 raise util.Abort(_('remote-changegroup does not support %s urls') %
882 942 parsed_url.scheme)
883 943
884 944 try:
885 945 size = int(inpart.params['size'])
886 946 except ValueError:
887 947 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
888 948 % 'size')
889 949 except KeyError:
890 950 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
891 951
892 952 digests = {}
893 953 for typ in inpart.params.get('digests', '').split():
894 954 param = 'digest:%s' % typ
895 955 try:
896 956 value = inpart.params[param]
897 957 except KeyError:
898 958 raise util.Abort(_('remote-changegroup: missing "%s" param') %
899 959 param)
900 960 digests[typ] = value
901 961
902 962 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
903 963
904 964 # Make sure we trigger a transaction creation
905 965 #
906 966 # The addchangegroup function will get a transaction object by itself, but
907 967 # we need to make sure we trigger the creation of a transaction object used
908 968 # for the whole processing scope.
909 969 op.gettransaction()
910 970 import exchange
911 971 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
912 972 if not isinstance(cg, changegroup.cg1unpacker):
913 973 raise util.Abort(_('%s: not a bundle version 1.0') %
914 974 util.hidepassword(raw_url))
915 975 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
916 976 op.records.add('changegroup', {'return': ret})
917 977 if op.reply is not None:
918 978 # This is definitly not the final form of this
919 979 # return. But one need to start somewhere.
920 980 part = op.reply.newpart('b2x:reply:changegroup')
921 981 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
922 982 part.addparam('return', '%i' % ret, mandatory=False)
923 983 try:
924 984 real_part.validate()
925 985 except util.Abort, e:
926 986 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
927 987 (util.hidepassword(raw_url), str(e)))
928 988 assert not inpart.read()
929 989
930 990 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
931 991 def handlereplychangegroup(op, inpart):
932 992 ret = int(inpart.params['return'])
933 993 replyto = int(inpart.params['in-reply-to'])
934 994 op.records.add('changegroup', {'return': ret}, replyto)
935 995
936 996 @parthandler('b2x:check:heads')
937 997 def handlecheckheads(op, inpart):
938 998 """check that head of the repo did not change
939 999
940 1000 This is used to detect a push race when using unbundle.
941 1001 This replaces the "heads" argument of unbundle."""
942 1002 h = inpart.read(20)
943 1003 heads = []
944 1004 while len(h) == 20:
945 1005 heads.append(h)
946 1006 h = inpart.read(20)
947 1007 assert not h
948 1008 if heads != op.repo.heads():
949 1009 raise error.PushRaced('repository changed while pushing - '
950 1010 'please try again')
951 1011
952 1012 @parthandler('b2x:output')
953 1013 def handleoutput(op, inpart):
954 1014 """forward output captured on the server to the client"""
955 1015 for line in inpart.read().splitlines():
956 1016 op.ui.write(('remote: %s\n' % line))
957 1017
958 1018 @parthandler('b2x:replycaps')
959 1019 def handlereplycaps(op, inpart):
960 1020 """Notify that a reply bundle should be created
961 1021
962 1022 The payload contains the capabilities information for the reply"""
963 1023 caps = decodecaps(inpart.read())
964 1024 if op.reply is None:
965 1025 op.reply = bundle20(op.ui, caps)
966 1026
967 1027 @parthandler('b2x:error:abort', ('message', 'hint'))
968 1028 def handlereplycaps(op, inpart):
969 1029 """Used to transmit abort error over the wire"""
970 1030 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
971 1031
972 1032 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
973 1033 def handlereplycaps(op, inpart):
974 1034 """Used to transmit unknown content error over the wire"""
975 1035 kwargs = {}
976 1036 parttype = inpart.params.get('parttype')
977 1037 if parttype is not None:
978 1038 kwargs['parttype'] = parttype
979 1039 params = inpart.params.get('params')
980 1040 if params is not None:
981 1041 kwargs['params'] = params.split('\0')
982 1042
983 1043 raise error.UnsupportedPartError(**kwargs)
984 1044
985 1045 @parthandler('b2x:error:pushraced', ('message',))
986 1046 def handlereplycaps(op, inpart):
987 1047 """Used to transmit push race error over the wire"""
988 1048 raise error.ResponseError(_('push failed:'), inpart.params['message'])
989 1049
990 1050 @parthandler('b2x:listkeys', ('namespace',))
991 1051 def handlelistkeys(op, inpart):
992 1052 """retrieve pushkey namespace content stored in a bundle2"""
993 1053 namespace = inpart.params['namespace']
994 1054 r = pushkey.decodekeys(inpart.read())
995 1055 op.records.add('listkeys', (namespace, r))
996 1056
997 1057 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
998 1058 def handlepushkey(op, inpart):
999 1059 """process a pushkey request"""
1000 1060 dec = pushkey.decode
1001 1061 namespace = dec(inpart.params['namespace'])
1002 1062 key = dec(inpart.params['key'])
1003 1063 old = dec(inpart.params['old'])
1004 1064 new = dec(inpart.params['new'])
1005 1065 ret = op.repo.pushkey(namespace, key, old, new)
1006 1066 record = {'namespace': namespace,
1007 1067 'key': key,
1008 1068 'old': old,
1009 1069 'new': new}
1010 1070 op.records.add('pushkey', record)
1011 1071 if op.reply is not None:
1012 1072 rpart = op.reply.newpart('b2x:reply:pushkey')
1013 1073 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1014 1074 rpart.addparam('return', '%i' % ret, mandatory=False)
1015 1075
1016 1076 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1017 1077 def handlepushkeyreply(op, inpart):
1018 1078 """retrieve the result of a pushkey request"""
1019 1079 ret = int(inpart.params['return'])
1020 1080 partid = int(inpart.params['in-reply-to'])
1021 1081 op.records.add('pushkey', {'return': ret}, partid)
1022 1082
1023 1083 @parthandler('b2x:obsmarkers')
1024 1084 def handleobsmarker(op, inpart):
1025 1085 """add a stream of obsmarkers to the repo"""
1026 1086 tr = op.gettransaction()
1027 1087 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1028 1088 if new:
1029 1089 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1030 1090 op.records.add('obsmarkers', {'new': new})
1031 1091 if op.reply is not None:
1032 1092 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1033 1093 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1034 1094 rpart.addparam('new', '%i' % new, mandatory=False)
1035 1095
1036 1096
1037 1097 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1038 1098 def handlepushkeyreply(op, inpart):
1039 1099 """retrieve the result of a pushkey request"""
1040 1100 ret = int(inpart.params['new'])
1041 1101 partid = int(inpart.params['in-reply-to'])
1042 1102 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now