##// END OF EJS Templates
bundle2: flush output in a part in all cases...
Pierre-Yves David -
r24743:a2ef1dc3 default
parent child Browse files
Show More
@@ -1,1239 +1,1239 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def validateparttype(parttype):
177 177 """raise ValueError if a parttype contains invalid character"""
178 178 if _parttypeforbidden.search(parttype):
179 179 raise ValueError(parttype)
180 180
181 181 def _makefpartparamsizes(nbparams):
182 182 """return a struct format to read part parameter sizes
183 183
184 184 The number parameters is variable so we need to build that format
185 185 dynamically.
186 186 """
187 187 return '>'+('BB'*nbparams)
188 188
189 189 parthandlermapping = {}
190 190
191 191 def parthandler(parttype, params=()):
192 192 """decorator that register a function as a bundle2 part handler
193 193
194 194 eg::
195 195
196 196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 197 def myparttypehandler(...):
198 198 '''process a part of type "my part".'''
199 199 ...
200 200 """
201 201 validateparttype(parttype)
202 202 def _decorator(func):
203 203 lparttype = parttype.lower() # enforce lower case matching.
204 204 assert lparttype not in parthandlermapping
205 205 parthandlermapping[lparttype] = func
206 206 func.params = frozenset(params)
207 207 return func
208 208 return _decorator
209 209
210 210 class unbundlerecords(object):
211 211 """keep record of what happens during and unbundle
212 212
213 213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 214 category of record and obj is an arbitrary object.
215 215
216 216 `records['cat']` will return all entries of this category 'cat'.
217 217
218 218 Iterating on the object itself will yield `('category', obj)` tuples
219 219 for all entries.
220 220
221 221 All iterations happens in chronological order.
222 222 """
223 223
224 224 def __init__(self):
225 225 self._categories = {}
226 226 self._sequences = []
227 227 self._replies = {}
228 228
229 229 def add(self, category, entry, inreplyto=None):
230 230 """add a new record of a given category.
231 231
232 232 The entry can then be retrieved in the list returned by
233 233 self['category']."""
234 234 self._categories.setdefault(category, []).append(entry)
235 235 self._sequences.append((category, entry))
236 236 if inreplyto is not None:
237 237 self.getreplies(inreplyto).add(category, entry)
238 238
239 239 def getreplies(self, partid):
240 240 """get the records that are replies to a specific part"""
241 241 return self._replies.setdefault(partid, unbundlerecords())
242 242
243 243 def __getitem__(self, cat):
244 244 return tuple(self._categories.get(cat, ()))
245 245
246 246 def __iter__(self):
247 247 return iter(self._sequences)
248 248
249 249 def __len__(self):
250 250 return len(self._sequences)
251 251
252 252 def __nonzero__(self):
253 253 return bool(self._sequences)
254 254
255 255 class bundleoperation(object):
256 256 """an object that represents a single bundling process
257 257
258 258 Its purpose is to carry unbundle-related objects and states.
259 259
260 260 A new object should be created at the beginning of each bundle processing.
261 261 The object is to be returned by the processing function.
262 262
263 263 The object has very little content now it will ultimately contain:
264 264 * an access to the repo the bundle is applied to,
265 265 * a ui object,
266 266 * a way to retrieve a transaction to add changes to the repo,
267 267 * a way to record the result of processing each part,
268 268 * a way to construct a bundle response when applicable.
269 269 """
270 270
271 271 def __init__(self, repo, transactiongetter):
272 272 self.repo = repo
273 273 self.ui = repo.ui
274 274 self.records = unbundlerecords()
275 275 self.gettransaction = transactiongetter
276 276 self.reply = None
277 277
278 278 class TransactionUnavailable(RuntimeError):
279 279 pass
280 280
281 281 def _notransaction():
282 282 """default method to get a transaction while processing a bundle
283 283
284 284 Raise an exception to highlight the fact that no transaction was expected
285 285 to be created"""
286 286 raise TransactionUnavailable()
287 287
288 288 def processbundle(repo, unbundler, transactiongetter=None):
289 289 """This function process a bundle, apply effect to/from a repo
290 290
291 291 It iterates over each part then searches for and uses the proper handling
292 292 code to process the part. Parts are processed in order.
293 293
294 294 This is very early version of this function that will be strongly reworked
295 295 before final usage.
296 296
297 297 Unknown Mandatory part will abort the process.
298 298 """
299 299 if transactiongetter is None:
300 300 transactiongetter = _notransaction
301 301 op = bundleoperation(repo, transactiongetter)
302 302 # todo:
303 303 # - replace this is a init function soon.
304 304 # - exception catching
305 305 unbundler.params
306 306 iterparts = unbundler.iterparts()
307 307 part = None
308 308 try:
309 309 for part in iterparts:
310 310 _processpart(op, part)
311 311 except Exception, exc:
312 312 for part in iterparts:
313 313 # consume the bundle content
314 314 part.seek(0, 2)
315 315 # Small hack to let caller code distinguish exceptions from bundle2
316 316 # processing from processing the old format. This is mostly
317 317 # needed to handle different return codes to unbundle according to the
318 318 # type of bundle. We should probably clean up or drop this return code
319 319 # craziness in a future version.
320 320 exc.duringunbundle2 = True
321 321 raise
322 322 return op
323 323
324 324 def _processpart(op, part):
325 325 """process a single part from a bundle
326 326
327 327 The part is guaranteed to have been fully consumed when the function exits
328 328 (even if an exception is raised)."""
329 329 try:
330 330 try:
331 331 handler = parthandlermapping.get(part.type)
332 332 if handler is None:
333 333 raise error.UnsupportedPartError(parttype=part.type)
334 334 op.ui.debug('found a handler for part %r\n' % part.type)
335 335 unknownparams = part.mandatorykeys - handler.params
336 336 if unknownparams:
337 337 unknownparams = list(unknownparams)
338 338 unknownparams.sort()
339 339 raise error.UnsupportedPartError(parttype=part.type,
340 340 params=unknownparams)
341 341 except error.UnsupportedPartError, exc:
342 342 if part.mandatory: # mandatory parts
343 343 raise
344 344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
345 345 return # skip to part processing
346 346
347 347 # handler is called outside the above try block so that we don't
348 348 # risk catching KeyErrors from anything other than the
349 349 # parthandlermapping lookup (any KeyError raised by handler()
350 350 # itself represents a defect of a different variety).
351 351 output = None
352 352 if op.reply is not None:
353 353 op.ui.pushbuffer(error=True)
354 354 output = ''
355 355 try:
356 356 handler(op, part)
357 357 finally:
358 358 if output is not None:
359 359 output = op.ui.popbuffer()
360 if output:
361 outpart = op.reply.newpart('output', data=output,
362 mandatory=False)
363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
360 if output:
361 outpart = op.reply.newpart('output', data=output,
362 mandatory=False)
363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
364 364 finally:
365 365 # consume the part content to not corrupt the stream.
366 366 part.seek(0, 2)
367 367
368 368
369 369 def decodecaps(blob):
370 370 """decode a bundle2 caps bytes blob into a dictionary
371 371
372 372 The blob is a list of capabilities (one per line)
373 373 Capabilities may have values using a line of the form::
374 374
375 375 capability=value1,value2,value3
376 376
377 377 The values are always a list."""
378 378 caps = {}
379 379 for line in blob.splitlines():
380 380 if not line:
381 381 continue
382 382 if '=' not in line:
383 383 key, vals = line, ()
384 384 else:
385 385 key, vals = line.split('=', 1)
386 386 vals = vals.split(',')
387 387 key = urllib.unquote(key)
388 388 vals = [urllib.unquote(v) for v in vals]
389 389 caps[key] = vals
390 390 return caps
391 391
392 392 def encodecaps(caps):
393 393 """encode a bundle2 caps dictionary into a bytes blob"""
394 394 chunks = []
395 395 for ca in sorted(caps):
396 396 vals = caps[ca]
397 397 ca = urllib.quote(ca)
398 398 vals = [urllib.quote(v) for v in vals]
399 399 if vals:
400 400 ca = "%s=%s" % (ca, ','.join(vals))
401 401 chunks.append(ca)
402 402 return '\n'.join(chunks)
403 403
404 404 class bundle20(object):
405 405 """represent an outgoing bundle2 container
406 406
407 407 Use the `addparam` method to add stream level parameter. and `newpart` to
408 408 populate it. Then call `getchunks` to retrieve all the binary chunks of
409 409 data that compose the bundle2 container."""
410 410
411 411 _magicstring = 'HG20'
412 412
413 413 def __init__(self, ui, capabilities=()):
414 414 self.ui = ui
415 415 self._params = []
416 416 self._parts = []
417 417 self.capabilities = dict(capabilities)
418 418
419 419 @property
420 420 def nbparts(self):
421 421 """total number of parts added to the bundler"""
422 422 return len(self._parts)
423 423
424 424 # methods used to defines the bundle2 content
425 425 def addparam(self, name, value=None):
426 426 """add a stream level parameter"""
427 427 if not name:
428 428 raise ValueError('empty parameter name')
429 429 if name[0] not in string.letters:
430 430 raise ValueError('non letter first character: %r' % name)
431 431 self._params.append((name, value))
432 432
433 433 def addpart(self, part):
434 434 """add a new part to the bundle2 container
435 435
436 436 Parts contains the actual applicative payload."""
437 437 assert part.id is None
438 438 part.id = len(self._parts) # very cheap counter
439 439 self._parts.append(part)
440 440
441 441 def newpart(self, typeid, *args, **kwargs):
442 442 """create a new part and add it to the containers
443 443
444 444 As the part is directly added to the containers. For now, this means
445 445 that any failure to properly initialize the part after calling
446 446 ``newpart`` should result in a failure of the whole bundling process.
447 447
448 448 You can still fall back to manually create and add if you need better
449 449 control."""
450 450 part = bundlepart(typeid, *args, **kwargs)
451 451 self.addpart(part)
452 452 return part
453 453
454 454 # methods used to generate the bundle2 stream
455 455 def getchunks(self):
456 456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 457 yield self._magicstring
458 458 param = self._paramchunk()
459 459 self.ui.debug('bundle parameter: %s\n' % param)
460 460 yield _pack(_fstreamparamsize, len(param))
461 461 if param:
462 462 yield param
463 463
464 464 self.ui.debug('start of parts\n')
465 465 for part in self._parts:
466 466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 467 for chunk in part.getchunks():
468 468 yield chunk
469 469 self.ui.debug('end of bundle\n')
470 470 yield _pack(_fpartheadersize, 0)
471 471
472 472 def _paramchunk(self):
473 473 """return a encoded version of all stream parameters"""
474 474 blocks = []
475 475 for par, value in self._params:
476 476 par = urllib.quote(par)
477 477 if value is not None:
478 478 value = urllib.quote(value)
479 479 par = '%s=%s' % (par, value)
480 480 blocks.append(par)
481 481 return ' '.join(blocks)
482 482
483 483 class unpackermixin(object):
484 484 """A mixin to extract bytes and struct data from a stream"""
485 485
486 486 def __init__(self, fp):
487 487 self._fp = fp
488 488 self._seekable = (util.safehasattr(fp, 'seek') and
489 489 util.safehasattr(fp, 'tell'))
490 490
491 491 def _unpack(self, format):
492 492 """unpack this struct format from the stream"""
493 493 data = self._readexact(struct.calcsize(format))
494 494 return _unpack(format, data)
495 495
496 496 def _readexact(self, size):
497 497 """read exactly <size> bytes from the stream"""
498 498 return changegroup.readexactly(self._fp, size)
499 499
500 500 def seek(self, offset, whence=0):
501 501 """move the underlying file pointer"""
502 502 if self._seekable:
503 503 return self._fp.seek(offset, whence)
504 504 else:
505 505 raise NotImplementedError(_('File pointer is not seekable'))
506 506
507 507 def tell(self):
508 508 """return the file offset, or None if file is not seekable"""
509 509 if self._seekable:
510 510 try:
511 511 return self._fp.tell()
512 512 except IOError, e:
513 513 if e.errno == errno.ESPIPE:
514 514 self._seekable = False
515 515 else:
516 516 raise
517 517 return None
518 518
519 519 def close(self):
520 520 """close underlying file"""
521 521 if util.safehasattr(self._fp, 'close'):
522 522 return self._fp.close()
523 523
524 524 def getunbundler(ui, fp, header=None):
525 525 """return a valid unbundler object for a given header"""
526 526 if header is None:
527 527 header = changegroup.readexactly(fp, 4)
528 528 magic, version = header[0:2], header[2:4]
529 529 if magic != 'HG':
530 530 raise util.Abort(_('not a Mercurial bundle'))
531 531 unbundlerclass = formatmap.get(version)
532 532 if unbundlerclass is None:
533 533 raise util.Abort(_('unknown bundle version %s') % version)
534 534 unbundler = unbundlerclass(ui, fp)
535 535 ui.debug('start processing of %s stream\n' % header)
536 536 return unbundler
537 537
538 538 class unbundle20(unpackermixin):
539 539 """interpret a bundle2 stream
540 540
541 541 This class is fed with a binary stream and yields parts through its
542 542 `iterparts` methods."""
543 543
544 544 def __init__(self, ui, fp):
545 545 """If header is specified, we do not read it out of the stream."""
546 546 self.ui = ui
547 547 super(unbundle20, self).__init__(fp)
548 548
549 549 @util.propertycache
550 550 def params(self):
551 551 """dictionary of stream level parameters"""
552 552 self.ui.debug('reading bundle2 stream parameters\n')
553 553 params = {}
554 554 paramssize = self._unpack(_fstreamparamsize)[0]
555 555 if paramssize < 0:
556 556 raise error.BundleValueError('negative bundle param size: %i'
557 557 % paramssize)
558 558 if paramssize:
559 559 for p in self._readexact(paramssize).split(' '):
560 560 p = p.split('=', 1)
561 561 p = [urllib.unquote(i) for i in p]
562 562 if len(p) < 2:
563 563 p.append(None)
564 564 self._processparam(*p)
565 565 params[p[0]] = p[1]
566 566 return params
567 567
568 568 def _processparam(self, name, value):
569 569 """process a parameter, applying its effect if needed
570 570
571 571 Parameter starting with a lower case letter are advisory and will be
572 572 ignored when unknown. Those starting with an upper case letter are
573 573 mandatory and will this function will raise a KeyError when unknown.
574 574
575 575 Note: no option are currently supported. Any input will be either
576 576 ignored or failing.
577 577 """
578 578 if not name:
579 579 raise ValueError('empty parameter name')
580 580 if name[0] not in string.letters:
581 581 raise ValueError('non letter first character: %r' % name)
582 582 # Some logic will be later added here to try to process the option for
583 583 # a dict of known parameter.
584 584 if name[0].islower():
585 585 self.ui.debug("ignoring unknown parameter %r\n" % name)
586 586 else:
587 587 raise error.UnsupportedPartError(params=(name,))
588 588
589 589
590 590 def iterparts(self):
591 591 """yield all parts contained in the stream"""
592 592 # make sure param have been loaded
593 593 self.params
594 594 self.ui.debug('start extraction of bundle2 parts\n')
595 595 headerblock = self._readpartheader()
596 596 while headerblock is not None:
597 597 part = unbundlepart(self.ui, headerblock, self._fp)
598 598 yield part
599 599 part.seek(0, 2)
600 600 headerblock = self._readpartheader()
601 601 self.ui.debug('end of bundle2 stream\n')
602 602
603 603 def _readpartheader(self):
604 604 """reads a part header size and return the bytes blob
605 605
606 606 returns None if empty"""
607 607 headersize = self._unpack(_fpartheadersize)[0]
608 608 if headersize < 0:
609 609 raise error.BundleValueError('negative part header size: %i'
610 610 % headersize)
611 611 self.ui.debug('part header size: %i\n' % headersize)
612 612 if headersize:
613 613 return self._readexact(headersize)
614 614 return None
615 615
616 616 def compressed(self):
617 617 return False
618 618
619 619 formatmap = {'20': unbundle20}
620 620
621 621 class bundlepart(object):
622 622 """A bundle2 part contains application level payload
623 623
624 624 The part `type` is used to route the part to the application level
625 625 handler.
626 626
627 627 The part payload is contained in ``part.data``. It could be raw bytes or a
628 628 generator of byte chunks.
629 629
630 630 You can add parameters to the part using the ``addparam`` method.
631 631 Parameters can be either mandatory (default) or advisory. Remote side
632 632 should be able to safely ignore the advisory ones.
633 633
634 634 Both data and parameters cannot be modified after the generation has begun.
635 635 """
636 636
637 637 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
638 638 data='', mandatory=True):
639 639 validateparttype(parttype)
640 640 self.id = None
641 641 self.type = parttype
642 642 self._data = data
643 643 self._mandatoryparams = list(mandatoryparams)
644 644 self._advisoryparams = list(advisoryparams)
645 645 # checking for duplicated entries
646 646 self._seenparams = set()
647 647 for pname, __ in self._mandatoryparams + self._advisoryparams:
648 648 if pname in self._seenparams:
649 649 raise RuntimeError('duplicated params: %s' % pname)
650 650 self._seenparams.add(pname)
651 651 # status of the part's generation:
652 652 # - None: not started,
653 653 # - False: currently generated,
654 654 # - True: generation done.
655 655 self._generated = None
656 656 self.mandatory = mandatory
657 657
658 658 # methods used to defines the part content
659 659 def __setdata(self, data):
660 660 if self._generated is not None:
661 661 raise error.ReadOnlyPartError('part is being generated')
662 662 self._data = data
663 663 def __getdata(self):
664 664 return self._data
665 665 data = property(__getdata, __setdata)
666 666
667 667 @property
668 668 def mandatoryparams(self):
669 669 # make it an immutable tuple to force people through ``addparam``
670 670 return tuple(self._mandatoryparams)
671 671
672 672 @property
673 673 def advisoryparams(self):
674 674 # make it an immutable tuple to force people through ``addparam``
675 675 return tuple(self._advisoryparams)
676 676
677 677 def addparam(self, name, value='', mandatory=True):
678 678 if self._generated is not None:
679 679 raise error.ReadOnlyPartError('part is being generated')
680 680 if name in self._seenparams:
681 681 raise ValueError('duplicated params: %s' % name)
682 682 self._seenparams.add(name)
683 683 params = self._advisoryparams
684 684 if mandatory:
685 685 params = self._mandatoryparams
686 686 params.append((name, value))
687 687
688 688 # methods used to generates the bundle2 stream
689 689 def getchunks(self):
690 690 if self._generated is not None:
691 691 raise RuntimeError('part can only be consumed once')
692 692 self._generated = False
693 693 #### header
694 694 if self.mandatory:
695 695 parttype = self.type.upper()
696 696 else:
697 697 parttype = self.type.lower()
698 698 ## parttype
699 699 header = [_pack(_fparttypesize, len(parttype)),
700 700 parttype, _pack(_fpartid, self.id),
701 701 ]
702 702 ## parameters
703 703 # count
704 704 manpar = self.mandatoryparams
705 705 advpar = self.advisoryparams
706 706 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
707 707 # size
708 708 parsizes = []
709 709 for key, value in manpar:
710 710 parsizes.append(len(key))
711 711 parsizes.append(len(value))
712 712 for key, value in advpar:
713 713 parsizes.append(len(key))
714 714 parsizes.append(len(value))
715 715 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
716 716 header.append(paramsizes)
717 717 # key, value
718 718 for key, value in manpar:
719 719 header.append(key)
720 720 header.append(value)
721 721 for key, value in advpar:
722 722 header.append(key)
723 723 header.append(value)
724 724 ## finalize header
725 725 headerchunk = ''.join(header)
726 726 yield _pack(_fpartheadersize, len(headerchunk))
727 727 yield headerchunk
728 728 ## payload
729 729 try:
730 730 for chunk in self._payloadchunks():
731 731 yield _pack(_fpayloadsize, len(chunk))
732 732 yield chunk
733 733 except Exception, exc:
734 734 # backup exception data for later
735 735 exc_info = sys.exc_info()
736 736 msg = 'unexpected error: %s' % exc
737 737 interpart = bundlepart('error:abort', [('message', msg)],
738 738 mandatory=False)
739 739 interpart.id = 0
740 740 yield _pack(_fpayloadsize, -1)
741 741 for chunk in interpart.getchunks():
742 742 yield chunk
743 743 # abort current part payload
744 744 yield _pack(_fpayloadsize, 0)
745 745 raise exc_info[0], exc_info[1], exc_info[2]
746 746 # end of payload
747 747 yield _pack(_fpayloadsize, 0)
748 748 self._generated = True
749 749
750 750 def _payloadchunks(self):
751 751 """yield chunks of a the part payload
752 752
753 753 Exists to handle the different methods to provide data to a part."""
754 754 # we only support fixed size data now.
755 755 # This will be improved in the future.
756 756 if util.safehasattr(self.data, 'next'):
757 757 buff = util.chunkbuffer(self.data)
758 758 chunk = buff.read(preferedchunksize)
759 759 while chunk:
760 760 yield chunk
761 761 chunk = buff.read(preferedchunksize)
762 762 elif len(self.data):
763 763 yield self.data
764 764
765 765
766 766 flaginterrupt = -1
767 767
768 768 class interrupthandler(unpackermixin):
769 769 """read one part and process it with restricted capability
770 770
771 771 This allows to transmit exception raised on the producer size during part
772 772 iteration while the consumer is reading a part.
773 773
774 774 Part processed in this manner only have access to a ui object,"""
775 775
776 776 def __init__(self, ui, fp):
777 777 super(interrupthandler, self).__init__(fp)
778 778 self.ui = ui
779 779
780 780 def _readpartheader(self):
781 781 """reads a part header size and return the bytes blob
782 782
783 783 returns None if empty"""
784 784 headersize = self._unpack(_fpartheadersize)[0]
785 785 if headersize < 0:
786 786 raise error.BundleValueError('negative part header size: %i'
787 787 % headersize)
788 788 self.ui.debug('part header size: %i\n' % headersize)
789 789 if headersize:
790 790 return self._readexact(headersize)
791 791 return None
792 792
793 793 def __call__(self):
794 794 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
795 795 headerblock = self._readpartheader()
796 796 if headerblock is None:
797 797 self.ui.debug('no part found during interruption.\n')
798 798 return
799 799 part = unbundlepart(self.ui, headerblock, self._fp)
800 800 op = interruptoperation(self.ui)
801 801 _processpart(op, part)
802 802
803 803 class interruptoperation(object):
804 804 """A limited operation to be use by part handler during interruption
805 805
806 806 It only have access to an ui object.
807 807 """
808 808
809 809 def __init__(self, ui):
810 810 self.ui = ui
811 811 self.reply = None
812 812
813 813 @property
814 814 def repo(self):
815 815 raise RuntimeError('no repo access from stream interruption')
816 816
817 817 def gettransaction(self):
818 818 raise TransactionUnavailable('no repo access from stream interruption')
819 819
820 820 class unbundlepart(unpackermixin):
821 821 """a bundle part read from a bundle"""
822 822
823 823 def __init__(self, ui, header, fp):
824 824 super(unbundlepart, self).__init__(fp)
825 825 self.ui = ui
826 826 # unbundle state attr
827 827 self._headerdata = header
828 828 self._headeroffset = 0
829 829 self._initialized = False
830 830 self.consumed = False
831 831 # part data
832 832 self.id = None
833 833 self.type = None
834 834 self.mandatoryparams = None
835 835 self.advisoryparams = None
836 836 self.params = None
837 837 self.mandatorykeys = ()
838 838 self._payloadstream = None
839 839 self._readheader()
840 840 self._mandatory = None
841 841 self._chunkindex = [] #(payload, file) position tuples for chunk starts
842 842 self._pos = 0
843 843
844 844 def _fromheader(self, size):
845 845 """return the next <size> byte from the header"""
846 846 offset = self._headeroffset
847 847 data = self._headerdata[offset:(offset + size)]
848 848 self._headeroffset = offset + size
849 849 return data
850 850
851 851 def _unpackheader(self, format):
852 852 """read given format from header
853 853
854 854 This automatically compute the size of the format to read."""
855 855 data = self._fromheader(struct.calcsize(format))
856 856 return _unpack(format, data)
857 857
858 858 def _initparams(self, mandatoryparams, advisoryparams):
859 859 """internal function to setup all logic related parameters"""
860 860 # make it read only to prevent people touching it by mistake.
861 861 self.mandatoryparams = tuple(mandatoryparams)
862 862 self.advisoryparams = tuple(advisoryparams)
863 863 # user friendly UI
864 864 self.params = dict(self.mandatoryparams)
865 865 self.params.update(dict(self.advisoryparams))
866 866 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
867 867
868 868 def _payloadchunks(self, chunknum=0):
869 869 '''seek to specified chunk and start yielding data'''
870 870 if len(self._chunkindex) == 0:
871 871 assert chunknum == 0, 'Must start with chunk 0'
872 872 self._chunkindex.append((0, super(unbundlepart, self).tell()))
873 873 else:
874 874 assert chunknum < len(self._chunkindex), \
875 875 'Unknown chunk %d' % chunknum
876 876 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
877 877
878 878 pos = self._chunkindex[chunknum][0]
879 879 payloadsize = self._unpack(_fpayloadsize)[0]
880 880 self.ui.debug('payload chunk size: %i\n' % payloadsize)
881 881 while payloadsize:
882 882 if payloadsize == flaginterrupt:
883 883 # interruption detection, the handler will now read a
884 884 # single part and process it.
885 885 interrupthandler(self.ui, self._fp)()
886 886 elif payloadsize < 0:
887 887 msg = 'negative payload chunk size: %i' % payloadsize
888 888 raise error.BundleValueError(msg)
889 889 else:
890 890 result = self._readexact(payloadsize)
891 891 chunknum += 1
892 892 pos += payloadsize
893 893 if chunknum == len(self._chunkindex):
894 894 self._chunkindex.append((pos,
895 895 super(unbundlepart, self).tell()))
896 896 yield result
897 897 payloadsize = self._unpack(_fpayloadsize)[0]
898 898 self.ui.debug('payload chunk size: %i\n' % payloadsize)
899 899
900 900 def _findchunk(self, pos):
901 901 '''for a given payload position, return a chunk number and offset'''
902 902 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
903 903 if ppos == pos:
904 904 return chunk, 0
905 905 elif ppos > pos:
906 906 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
907 907 raise ValueError('Unknown chunk')
908 908
909 909 def _readheader(self):
910 910 """read the header and setup the object"""
911 911 typesize = self._unpackheader(_fparttypesize)[0]
912 912 self.type = self._fromheader(typesize)
913 913 self.ui.debug('part type: "%s"\n' % self.type)
914 914 self.id = self._unpackheader(_fpartid)[0]
915 915 self.ui.debug('part id: "%s"\n' % self.id)
916 916 # extract mandatory bit from type
917 917 self.mandatory = (self.type != self.type.lower())
918 918 self.type = self.type.lower()
919 919 ## reading parameters
920 920 # param count
921 921 mancount, advcount = self._unpackheader(_fpartparamcount)
922 922 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
923 923 # param size
924 924 fparamsizes = _makefpartparamsizes(mancount + advcount)
925 925 paramsizes = self._unpackheader(fparamsizes)
926 926 # make it a list of couple again
927 927 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
928 928 # split mandatory from advisory
929 929 mansizes = paramsizes[:mancount]
930 930 advsizes = paramsizes[mancount:]
931 931 # retrieve param value
932 932 manparams = []
933 933 for key, value in mansizes:
934 934 manparams.append((self._fromheader(key), self._fromheader(value)))
935 935 advparams = []
936 936 for key, value in advsizes:
937 937 advparams.append((self._fromheader(key), self._fromheader(value)))
938 938 self._initparams(manparams, advparams)
939 939 ## part payload
940 940 self._payloadstream = util.chunkbuffer(self._payloadchunks())
941 941 # we read the data, tell it
942 942 self._initialized = True
943 943
944 944 def read(self, size=None):
945 945 """read payload data"""
946 946 if not self._initialized:
947 947 self._readheader()
948 948 if size is None:
949 949 data = self._payloadstream.read()
950 950 else:
951 951 data = self._payloadstream.read(size)
952 952 if size is None or len(data) < size:
953 953 self.consumed = True
954 954 self._pos += len(data)
955 955 return data
956 956
957 957 def tell(self):
958 958 return self._pos
959 959
960 960 def seek(self, offset, whence=0):
961 961 if whence == 0:
962 962 newpos = offset
963 963 elif whence == 1:
964 964 newpos = self._pos + offset
965 965 elif whence == 2:
966 966 if not self.consumed:
967 967 self.read()
968 968 newpos = self._chunkindex[-1][0] - offset
969 969 else:
970 970 raise ValueError('Unknown whence value: %r' % (whence,))
971 971
972 972 if newpos > self._chunkindex[-1][0] and not self.consumed:
973 973 self.read()
974 974 if not 0 <= newpos <= self._chunkindex[-1][0]:
975 975 raise ValueError('Offset out of range')
976 976
977 977 if self._pos != newpos:
978 978 chunk, internaloffset = self._findchunk(newpos)
979 979 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
980 980 adjust = self.read(internaloffset)
981 981 if len(adjust) != internaloffset:
982 982 raise util.Abort(_('Seek failed\n'))
983 983 self._pos = newpos
984 984
985 985 capabilities = {'HG20': (),
986 986 'listkeys': (),
987 987 'pushkey': (),
988 988 'digests': tuple(sorted(util.DIGESTS.keys())),
989 989 'remote-changegroup': ('http', 'https'),
990 990 }
991 991
992 992 def getrepocaps(repo, allowpushback=False):
993 993 """return the bundle2 capabilities for a given repo
994 994
995 995 Exists to allow extensions (like evolution) to mutate the capabilities.
996 996 """
997 997 caps = capabilities.copy()
998 998 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
999 999 if obsolete.isenabled(repo, obsolete.exchangeopt):
1000 1000 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1001 1001 caps['obsmarkers'] = supportedformat
1002 1002 if allowpushback:
1003 1003 caps['pushback'] = ()
1004 1004 return caps
1005 1005
1006 1006 def bundle2caps(remote):
1007 1007 """return the bundle capabilities of a peer as dict"""
1008 1008 raw = remote.capable('bundle2')
1009 1009 if not raw and raw != '':
1010 1010 return {}
1011 1011 capsblob = urllib.unquote(remote.capable('bundle2'))
1012 1012 return decodecaps(capsblob)
1013 1013
1014 1014 def obsmarkersversion(caps):
1015 1015 """extract the list of supported obsmarkers versions from a bundle2caps dict
1016 1016 """
1017 1017 obscaps = caps.get('obsmarkers', ())
1018 1018 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1019 1019
1020 1020 @parthandler('changegroup', ('version',))
1021 1021 def handlechangegroup(op, inpart):
1022 1022 """apply a changegroup part on the repo
1023 1023
1024 1024 This is a very early implementation that will massive rework before being
1025 1025 inflicted to any end-user.
1026 1026 """
1027 1027 # Make sure we trigger a transaction creation
1028 1028 #
1029 1029 # The addchangegroup function will get a transaction object by itself, but
1030 1030 # we need to make sure we trigger the creation of a transaction object used
1031 1031 # for the whole processing scope.
1032 1032 op.gettransaction()
1033 1033 unpackerversion = inpart.params.get('version', '01')
1034 1034 # We should raise an appropriate exception here
1035 1035 unpacker = changegroup.packermap[unpackerversion][1]
1036 1036 cg = unpacker(inpart, 'UN')
1037 1037 # the source and url passed here are overwritten by the one contained in
1038 1038 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1039 1039 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1040 1040 op.records.add('changegroup', {'return': ret})
1041 1041 if op.reply is not None:
1042 1042 # This is definitely not the final form of this
1043 1043 # return. But one need to start somewhere.
1044 1044 part = op.reply.newpart('reply:changegroup', mandatory=False)
1045 1045 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1046 1046 part.addparam('return', '%i' % ret, mandatory=False)
1047 1047 assert not inpart.read()
1048 1048
1049 1049 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1050 1050 ['digest:%s' % k for k in util.DIGESTS.keys()])
1051 1051 @parthandler('remote-changegroup', _remotechangegroupparams)
1052 1052 def handleremotechangegroup(op, inpart):
1053 1053 """apply a bundle10 on the repo, given an url and validation information
1054 1054
1055 1055 All the information about the remote bundle to import are given as
1056 1056 parameters. The parameters include:
1057 1057 - url: the url to the bundle10.
1058 1058 - size: the bundle10 file size. It is used to validate what was
1059 1059 retrieved by the client matches the server knowledge about the bundle.
1060 1060 - digests: a space separated list of the digest types provided as
1061 1061 parameters.
1062 1062 - digest:<digest-type>: the hexadecimal representation of the digest with
1063 1063 that name. Like the size, it is used to validate what was retrieved by
1064 1064 the client matches what the server knows about the bundle.
1065 1065
1066 1066 When multiple digest types are given, all of them are checked.
1067 1067 """
1068 1068 try:
1069 1069 raw_url = inpart.params['url']
1070 1070 except KeyError:
1071 1071 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1072 1072 parsed_url = util.url(raw_url)
1073 1073 if parsed_url.scheme not in capabilities['remote-changegroup']:
1074 1074 raise util.Abort(_('remote-changegroup does not support %s urls') %
1075 1075 parsed_url.scheme)
1076 1076
1077 1077 try:
1078 1078 size = int(inpart.params['size'])
1079 1079 except ValueError:
1080 1080 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1081 1081 % 'size')
1082 1082 except KeyError:
1083 1083 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1084 1084
1085 1085 digests = {}
1086 1086 for typ in inpart.params.get('digests', '').split():
1087 1087 param = 'digest:%s' % typ
1088 1088 try:
1089 1089 value = inpart.params[param]
1090 1090 except KeyError:
1091 1091 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1092 1092 param)
1093 1093 digests[typ] = value
1094 1094
1095 1095 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1096 1096
1097 1097 # Make sure we trigger a transaction creation
1098 1098 #
1099 1099 # The addchangegroup function will get a transaction object by itself, but
1100 1100 # we need to make sure we trigger the creation of a transaction object used
1101 1101 # for the whole processing scope.
1102 1102 op.gettransaction()
1103 1103 import exchange
1104 1104 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1105 1105 if not isinstance(cg, changegroup.cg1unpacker):
1106 1106 raise util.Abort(_('%s: not a bundle version 1.0') %
1107 1107 util.hidepassword(raw_url))
1108 1108 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1109 1109 op.records.add('changegroup', {'return': ret})
1110 1110 if op.reply is not None:
1111 1111 # This is definitely not the final form of this
1112 1112 # return. But one need to start somewhere.
1113 1113 part = op.reply.newpart('reply:changegroup')
1114 1114 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1115 1115 part.addparam('return', '%i' % ret, mandatory=False)
1116 1116 try:
1117 1117 real_part.validate()
1118 1118 except util.Abort, e:
1119 1119 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1120 1120 (util.hidepassword(raw_url), str(e)))
1121 1121 assert not inpart.read()
1122 1122
1123 1123 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1124 1124 def handlereplychangegroup(op, inpart):
1125 1125 ret = int(inpart.params['return'])
1126 1126 replyto = int(inpart.params['in-reply-to'])
1127 1127 op.records.add('changegroup', {'return': ret}, replyto)
1128 1128
1129 1129 @parthandler('check:heads')
1130 1130 def handlecheckheads(op, inpart):
1131 1131 """check that head of the repo did not change
1132 1132
1133 1133 This is used to detect a push race when using unbundle.
1134 1134 This replaces the "heads" argument of unbundle."""
1135 1135 h = inpart.read(20)
1136 1136 heads = []
1137 1137 while len(h) == 20:
1138 1138 heads.append(h)
1139 1139 h = inpart.read(20)
1140 1140 assert not h
1141 1141 if heads != op.repo.heads():
1142 1142 raise error.PushRaced('repository changed while pushing - '
1143 1143 'please try again')
1144 1144
1145 1145 @parthandler('output')
1146 1146 def handleoutput(op, inpart):
1147 1147 """forward output captured on the server to the client"""
1148 1148 for line in inpart.read().splitlines():
1149 1149 op.ui.write(('remote: %s\n' % line))
1150 1150
1151 1151 @parthandler('replycaps')
1152 1152 def handlereplycaps(op, inpart):
1153 1153 """Notify that a reply bundle should be created
1154 1154
1155 1155 The payload contains the capabilities information for the reply"""
1156 1156 caps = decodecaps(inpart.read())
1157 1157 if op.reply is None:
1158 1158 op.reply = bundle20(op.ui, caps)
1159 1159
1160 1160 @parthandler('error:abort', ('message', 'hint'))
1161 1161 def handleerrorabort(op, inpart):
1162 1162 """Used to transmit abort error over the wire"""
1163 1163 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1164 1164
1165 1165 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1166 1166 def handleerrorunsupportedcontent(op, inpart):
1167 1167 """Used to transmit unknown content error over the wire"""
1168 1168 kwargs = {}
1169 1169 parttype = inpart.params.get('parttype')
1170 1170 if parttype is not None:
1171 1171 kwargs['parttype'] = parttype
1172 1172 params = inpart.params.get('params')
1173 1173 if params is not None:
1174 1174 kwargs['params'] = params.split('\0')
1175 1175
1176 1176 raise error.UnsupportedPartError(**kwargs)
1177 1177
1178 1178 @parthandler('error:pushraced', ('message',))
1179 1179 def handleerrorpushraced(op, inpart):
1180 1180 """Used to transmit push race error over the wire"""
1181 1181 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1182 1182
1183 1183 @parthandler('listkeys', ('namespace',))
1184 1184 def handlelistkeys(op, inpart):
1185 1185 """retrieve pushkey namespace content stored in a bundle2"""
1186 1186 namespace = inpart.params['namespace']
1187 1187 r = pushkey.decodekeys(inpart.read())
1188 1188 op.records.add('listkeys', (namespace, r))
1189 1189
1190 1190 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1191 1191 def handlepushkey(op, inpart):
1192 1192 """process a pushkey request"""
1193 1193 dec = pushkey.decode
1194 1194 namespace = dec(inpart.params['namespace'])
1195 1195 key = dec(inpart.params['key'])
1196 1196 old = dec(inpart.params['old'])
1197 1197 new = dec(inpart.params['new'])
1198 1198 ret = op.repo.pushkey(namespace, key, old, new)
1199 1199 record = {'namespace': namespace,
1200 1200 'key': key,
1201 1201 'old': old,
1202 1202 'new': new}
1203 1203 op.records.add('pushkey', record)
1204 1204 if op.reply is not None:
1205 1205 rpart = op.reply.newpart('reply:pushkey')
1206 1206 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1207 1207 rpart.addparam('return', '%i' % ret, mandatory=False)
1208 1208
1209 1209 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1210 1210 def handlepushkeyreply(op, inpart):
1211 1211 """retrieve the result of a pushkey request"""
1212 1212 ret = int(inpart.params['return'])
1213 1213 partid = int(inpart.params['in-reply-to'])
1214 1214 op.records.add('pushkey', {'return': ret}, partid)
1215 1215
1216 1216 @parthandler('obsmarkers')
1217 1217 def handleobsmarker(op, inpart):
1218 1218 """add a stream of obsmarkers to the repo"""
1219 1219 tr = op.gettransaction()
1220 1220 markerdata = inpart.read()
1221 1221 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1222 1222 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1223 1223 % len(markerdata))
1224 1224 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1225 1225 if new:
1226 1226 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1227 1227 op.records.add('obsmarkers', {'new': new})
1228 1228 if op.reply is not None:
1229 1229 rpart = op.reply.newpart('reply:obsmarkers')
1230 1230 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1231 1231 rpart.addparam('new', '%i' % new, mandatory=False)
1232 1232
1233 1233
1234 1234 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1235 1235 def handlepushkeyreply(op, inpart):
1236 1236 """retrieve the result of a pushkey request"""
1237 1237 ret = int(inpart.params['new'])
1238 1238 partid = int(inpart.params['in-reply-to'])
1239 1239 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now