bundle2: add a 'copy' method on parts...
Pierre-Yves David -
r24793:2ec89458 default
@@ -1,1239 +1,1248 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. The only
129 129 such case handled so far is -1, which announces an interruption part.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable, but none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
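Reviewer note: the part header layout described in the docstring above can be checked with a small standalone round trip. This is only a sketch, written for Python 3 with byte strings (the module itself is Python 2); `pack_part_header` and `parse_part_header` are hypothetical helper names that do not exist in bundle2.py.

```python
import struct

def pack_part_header(parttype, partid, mandatory=(), advisory=()):
    """Build <header size><header> following the documented layout (sketch)."""
    params = list(mandatory) + list(advisory)  # mandatory parameters come first
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(mandatory), len(advisory))]
    sizes = []
    for key, value in params:
        sizes.extend((len(key), len(value)))
    # one (key size, value size) byte pair per parameter
    header.append(struct.pack('>' + 'BB' * len(params), *sizes))
    for key, value in params:
        header.extend((key, value))
    blob = b''.join(header)
    return struct.pack('>i', len(blob)) + blob

def parse_part_header(data):
    """Inverse of pack_part_header; returns (type, id, mandatory, advisory)."""
    offset = 4  # skip the int32 header size
    (typesize,) = struct.unpack_from('>B', data, offset)
    offset += 1
    parttype = data[offset:offset + typesize]
    offset += typesize
    (partid,) = struct.unpack_from('>I', data, offset)
    offset += 4
    mancount, advcount = struct.unpack_from('>BB', data, offset)
    offset += 2
    total = mancount + advcount
    sizes = struct.unpack_from('>' + 'BB' * total, data, offset)
    offset += 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    return parttype, partid, params[:mancount], params[mancount:]

blob = pack_part_header(b'OUTPUT', 3, [(b'a', b'1')], [(b'in-reply-to', b'0')])
parsed = parse_part_header(blob)
```

An upper-case type such as `OUTPUT` marks the part as mandatory on the wire; the sketch keeps the type as-is and leaves case handling to the caller.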
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def validateparttype(parttype):
177 177 """raise ValueError if a parttype contains invalid character"""
178 178 if _parttypeforbidden.search(parttype):
179 179 raise ValueError(parttype)
180 180
181 181 def _makefpartparamsizes(nbparams):
182 182 """return a struct format to read part parameter sizes
183 183
184 184 The number parameters is variable so we need to build that format
185 185 dynamically.
186 186 """
187 187 return '>'+('BB'*nbparams)
188 188
189 189 parthandlermapping = {}
190 190
191 191 def parthandler(parttype, params=()):
192 192 """decorator that registers a function as a bundle2 part handler
193 193
194 194 eg::
195 195
196 196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 197 def myparttypehandler(...):
198 198 '''process a part of type "my part".'''
199 199 ...
200 200 """
201 201 validateparttype(parttype)
202 202 def _decorator(func):
203 203 lparttype = parttype.lower() # enforce lower case matching.
204 204 assert lparttype not in parthandlermapping
205 205 parthandlermapping[lparttype] = func
206 206 func.params = frozenset(params)
207 207 return func
208 208 return _decorator
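Reviewer note: a minimal standalone sketch of how this registry combines with the case rule from the module docstring (matching is case insensitive, any upper-case character makes the part mandatory). It is not the module's code: `handlers` stands in for `parthandlermapping`, and `findhandler` and `handleecho` are hypothetical names used only for illustration.

```python
import re

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
handlers = {}  # hypothetical stand-in for parthandlermapping

def parthandler(parttype, params=()):
    """register a function as the handler for one part type"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)
    def _decorator(func):
        key = parttype.lower()  # handlers are stored under the lower-case type
        assert key not in handlers
        handlers[key] = func
        func.params = frozenset(params)
        return func
    return _decorator

def findhandler(rawtype):
    """return (handler or None, mandatory flag) for a type as read from the wire"""
    mandatory = rawtype != rawtype.lower()
    return handlers.get(rawtype.lower()), mandatory

@parthandler('example:echo', ('message',))
def handleecho(op, part):
    return part
```

With this in place, `findhandler('EXAMPLE:ECHO')` and `findhandler('example:echo')` resolve to the same function and differ only in the mandatory flag.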
209 209
210 210 class unbundlerecords(object):
211 211 """keep record of what happens during an unbundle
212 212
213 213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 214 category of record and obj is an arbitrary object.
215 215
216 216 `records['cat']` will return all entries of this category 'cat'.
217 217
218 218 Iterating on the object itself will yield `('category', obj)` tuples
219 219 for all entries.
220 220
221 221 All iterations happen in chronological order.
222 222 """
223 223
224 224 def __init__(self):
225 225 self._categories = {}
226 226 self._sequences = []
227 227 self._replies = {}
228 228
229 229 def add(self, category, entry, inreplyto=None):
230 230 """add a new record of a given category.
231 231
232 232 The entry can then be retrieved in the list returned by
233 233 self['category']."""
234 234 self._categories.setdefault(category, []).append(entry)
235 235 self._sequences.append((category, entry))
236 236 if inreplyto is not None:
237 237 self.getreplies(inreplyto).add(category, entry)
238 238
239 239 def getreplies(self, partid):
240 240 """get the records that are replies to a specific part"""
241 241 return self._replies.setdefault(partid, unbundlerecords())
242 242
243 243 def __getitem__(self, cat):
244 244 return tuple(self._categories.get(cat, ()))
245 245
246 246 def __iter__(self):
247 247 return iter(self._sequences)
248 248
249 249 def __len__(self):
250 250 return len(self._sequences)
251 251
252 252 def __nonzero__(self):
253 253 return bool(self._sequences)
254 254
255 255 class bundleoperation(object):
256 256 """an object that represents a single bundling process
257 257
258 258 Its purpose is to carry unbundle-related objects and states.
259 259
260 260 A new object should be created at the beginning of each bundle processing.
261 261 The object is to be returned by the processing function.
262 262
263 263 The object has very little content now it will ultimately contain:
264 264 * an access to the repo the bundle is applied to,
265 265 * a ui object,
266 266 * a way to retrieve a transaction to add changes to the repo,
267 267 * a way to record the result of processing each part,
268 268 * a way to construct a bundle response when applicable.
269 269 """
270 270
271 271 def __init__(self, repo, transactiongetter):
272 272 self.repo = repo
273 273 self.ui = repo.ui
274 274 self.records = unbundlerecords()
275 275 self.gettransaction = transactiongetter
276 276 self.reply = None
277 277
278 278 class TransactionUnavailable(RuntimeError):
279 279 pass
280 280
281 281 def _notransaction():
282 282 """default method to get a transaction while processing a bundle
283 283
284 284 Raise an exception to highlight the fact that no transaction was expected
285 285 to be created"""
286 286 raise TransactionUnavailable()
287 287
288 288 def processbundle(repo, unbundler, transactiongetter=None):
289 289 """This function processes a bundle, applying its effects to/from a repo
290 290
291 291 It iterates over each part then searches for and uses the proper handling
292 292 code to process the part. Parts are processed in order.
293 293
294 294 This is a very early version of this function that will be strongly reworked
295 295 before final usage.
296 296
297 297 Unknown Mandatory part will abort the process.
298 298 """
299 299 if transactiongetter is None:
300 300 transactiongetter = _notransaction
301 301 op = bundleoperation(repo, transactiongetter)
302 302 # todo:
303 303 # - replace this with an init function soon.
304 304 # - exception catching
305 305 unbundler.params
306 306 iterparts = unbundler.iterparts()
307 307 part = None
308 308 try:
309 309 for part in iterparts:
310 310 _processpart(op, part)
311 311 except Exception, exc:
312 312 for part in iterparts:
313 313 # consume the bundle content
314 314 part.seek(0, 2)
315 315 # Small hack to let caller code distinguish exceptions from bundle2
316 316 # processing from processing the old format. This is mostly
317 317 # needed to handle different return codes to unbundle according to the
318 318 # type of bundle. We should probably clean up or drop this return code
319 319 # craziness in a future version.
320 320 exc.duringunbundle2 = True
321 321 raise
322 322 return op
323 323
324 324 def _processpart(op, part):
325 325 """process a single part from a bundle
326 326
327 327 The part is guaranteed to have been fully consumed when the function exits
328 328 (even if an exception is raised)."""
329 329 try:
330 330 try:
331 331 handler = parthandlermapping.get(part.type)
332 332 if handler is None:
333 333 raise error.UnsupportedPartError(parttype=part.type)
334 334 op.ui.debug('found a handler for part %r\n' % part.type)
335 335 unknownparams = part.mandatorykeys - handler.params
336 336 if unknownparams:
337 337 unknownparams = list(unknownparams)
338 338 unknownparams.sort()
339 339 raise error.UnsupportedPartError(parttype=part.type,
340 340 params=unknownparams)
341 341 except error.UnsupportedPartError, exc:
342 342 if part.mandatory: # mandatory parts
343 343 raise
344 344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
345 345 return # skip part processing
346 346
347 347 # handler is called outside the above try block so that we don't
348 348 # risk catching KeyErrors from anything other than the
349 349 # parthandlermapping lookup (any KeyError raised by handler()
350 350 # itself represents a defect of a different variety).
351 351 output = None
352 352 if op.reply is not None:
353 353 op.ui.pushbuffer(error=True)
354 354 output = ''
355 355 try:
356 356 handler(op, part)
357 357 finally:
358 358 if output is not None:
359 359 output = op.ui.popbuffer()
360 360 if output:
361 361 outpart = op.reply.newpart('output', data=output,
362 362 mandatory=False)
363 363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
364 364 finally:
365 365 # consume the part content to not corrupt the stream.
366 366 part.seek(0, 2)
367 367
368 368
369 369 def decodecaps(blob):
370 370 """decode a bundle2 caps bytes blob into a dictionary
371 371
372 372 The blob is a list of capabilities (one per line)
373 373 Capabilities may have values using a line of the form::
374 374
375 375 capability=value1,value2,value3
376 376
377 377 The values are always a list."""
378 378 caps = {}
379 379 for line in blob.splitlines():
380 380 if not line:
381 381 continue
382 382 if '=' not in line:
383 383 key, vals = line, ()
384 384 else:
385 385 key, vals = line.split('=', 1)
386 386 vals = vals.split(',')
387 387 key = urllib.unquote(key)
388 388 vals = [urllib.unquote(v) for v in vals]
389 389 caps[key] = vals
390 390 return caps
391 391
392 392 def encodecaps(caps):
393 393 """encode a bundle2 caps dictionary into a bytes blob"""
394 394 chunks = []
395 395 for ca in sorted(caps):
396 396 vals = caps[ca]
397 397 ca = urllib.quote(ca)
398 398 vals = [urllib.quote(v) for v in vals]
399 399 if vals:
400 400 ca = "%s=%s" % (ca, ','.join(vals))
401 401 chunks.append(ca)
402 402 return '\n'.join(chunks)
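Reviewer note: the capabilities blob format handled by `decodecaps` and `encodecaps` can be exercised in isolation. The sketch below is a Python 3 port using `urllib.parse` (the module itself calls the Python 2 `urllib.quote` and `urllib.unquote`); the capability names in the sample dictionary are made up for illustration.

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """encode a caps dictionary into a text blob, one capability per line"""
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        name = quote(name)
        if vals:
            name = '%s=%s' % (name, ','.join(vals))
        chunks.append(name)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode a caps blob back into a dictionary of value lists"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

# sample capabilities, names chosen only for the example
sample = {'HG20': [], 'changegroup': ['01', '02'], 'odd name': ['a,b', 'c=d']}
roundtrip = decodecaps(encodecaps(sample))
```

Because both names and values are quoted, separators such as `,` and `=` inside a value survive the round trip.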
403 403
404 404 class bundle20(object):
405 405 """represent an outgoing bundle2 container
406 406
407 407 Use the `addparam` method to add stream level parameters and `newpart` to
408 408 populate it. Then call `getchunks` to retrieve all the binary chunks of
409 409 data that compose the bundle2 container."""
410 410
411 411 _magicstring = 'HG20'
412 412
413 413 def __init__(self, ui, capabilities=()):
414 414 self.ui = ui
415 415 self._params = []
416 416 self._parts = []
417 417 self.capabilities = dict(capabilities)
418 418
419 419 @property
420 420 def nbparts(self):
421 421 """total number of parts added to the bundler"""
422 422 return len(self._parts)
423 423
424 424 # methods used to defines the bundle2 content
425 425 def addparam(self, name, value=None):
426 426 """add a stream level parameter"""
427 427 if not name:
428 428 raise ValueError('empty parameter name')
429 429 if name[0] not in string.letters:
430 430 raise ValueError('non letter first character: %r' % name)
431 431 self._params.append((name, value))
432 432
433 433 def addpart(self, part):
434 434 """add a new part to the bundle2 container
435 435
436 436 Parts contain the actual application payload."""
437 437 assert part.id is None
438 438 part.id = len(self._parts) # very cheap counter
439 439 self._parts.append(part)
440 440
441 441 def newpart(self, typeid, *args, **kwargs):
442 442 """create a new part and add it to the container
443 443
444 444 The part is directly added to the container. For now, this means
445 445 that any failure to properly initialize the part after calling
446 446 ``newpart`` should result in a failure of the whole bundling process.
447 447
448 448 You can still fall back to manually create and add if you need better
449 449 control."""
450 450 part = bundlepart(typeid, *args, **kwargs)
451 451 self.addpart(part)
452 452 return part
453 453
454 454 # methods used to generate the bundle2 stream
455 455 def getchunks(self):
456 456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 457 yield self._magicstring
458 458 param = self._paramchunk()
459 459 self.ui.debug('bundle parameter: %s\n' % param)
460 460 yield _pack(_fstreamparamsize, len(param))
461 461 if param:
462 462 yield param
463 463
464 464 self.ui.debug('start of parts\n')
465 465 for part in self._parts:
466 466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 467 for chunk in part.getchunks():
468 468 yield chunk
469 469 self.ui.debug('end of bundle\n')
470 470 yield _pack(_fpartheadersize, 0)
471 471
472 472 def _paramchunk(self):
473 473 """return an encoded version of all stream parameters"""
474 474 blocks = []
475 475 for par, value in self._params:
476 476 par = urllib.quote(par)
477 477 if value is not None:
478 478 value = urllib.quote(value)
479 479 par = '%s=%s' % (par, value)
480 480 blocks.append(par)
481 481 return ' '.join(blocks)
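Reviewer note: the stream level parameter blob is a space separated list of url-quoted `<name>` or `<name>=<value>` items, so it can be sketched as a pair of standalone functions. This is a Python 3 port for illustration only; `encodeparams` and `decodeparams` are hypothetical names, and the mandatory/advisory check done by `_processparam` is deliberately left out.

```python
from urllib.parse import quote, unquote

def encodeparams(params):
    """encode (name, value) pairs; a value of None means 'no value'"""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks)

def decodeparams(blob):
    """decode a parameter blob into a {name: value or None} dictionary"""
    params = {}
    if blob:
        for item in blob.split(' '):
            pieces = [unquote(i) for i in item.split('=', 1)]
            if len(pieces) < 2:
                pieces.append(None)
            params[pieces[0]] = pieces[1]
    return params

# parameter names and values below are invented for the example
wire = encodeparams([('simple', None), ('Compression', 'GZ')])
tricky = [('e|! 7/', 'babar%#==tutu'), ('simple', None)]
decoded = decodeparams(encodeparams(tricky))
```

Quoting is what keeps the format unambiguous: spaces and `=` inside a name or value never appear raw in the blob.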
482 482
483 483 class unpackermixin(object):
484 484 """A mixin to extract bytes and struct data from a stream"""
485 485
486 486 def __init__(self, fp):
487 487 self._fp = fp
488 488 self._seekable = (util.safehasattr(fp, 'seek') and
489 489 util.safehasattr(fp, 'tell'))
490 490
491 491 def _unpack(self, format):
492 492 """unpack this struct format from the stream"""
493 493 data = self._readexact(struct.calcsize(format))
494 494 return _unpack(format, data)
495 495
496 496 def _readexact(self, size):
497 497 """read exactly <size> bytes from the stream"""
498 498 return changegroup.readexactly(self._fp, size)
499 499
500 500 def seek(self, offset, whence=0):
501 501 """move the underlying file pointer"""
502 502 if self._seekable:
503 503 return self._fp.seek(offset, whence)
504 504 else:
505 505 raise NotImplementedError(_('File pointer is not seekable'))
506 506
507 507 def tell(self):
508 508 """return the file offset, or None if file is not seekable"""
509 509 if self._seekable:
510 510 try:
511 511 return self._fp.tell()
512 512 except IOError, e:
513 513 if e.errno == errno.ESPIPE:
514 514 self._seekable = False
515 515 else:
516 516 raise
517 517 return None
518 518
519 519 def close(self):
520 520 """close underlying file"""
521 521 if util.safehasattr(self._fp, 'close'):
522 522 return self._fp.close()
523 523
524 524 def getunbundler(ui, fp, header=None):
525 525 """return a valid unbundler object for a given header"""
526 526 if header is None:
527 527 header = changegroup.readexactly(fp, 4)
528 528 magic, version = header[0:2], header[2:4]
529 529 if magic != 'HG':
530 530 raise util.Abort(_('not a Mercurial bundle'))
531 531 unbundlerclass = formatmap.get(version)
532 532 if unbundlerclass is None:
533 533 raise util.Abort(_('unknown bundle version %s') % version)
534 534 unbundler = unbundlerclass(ui, fp)
535 535 ui.debug('start processing of %s stream\n' % header)
536 536 return unbundler
537 537
538 538 class unbundle20(unpackermixin):
539 539 """interpret a bundle2 stream
540 540
541 541 This class is fed with a binary stream and yields parts through its
542 542 `iterparts` methods."""
543 543
544 544 def __init__(self, ui, fp):
545 545 """If header is specified, we do not read it out of the stream."""
546 546 self.ui = ui
547 547 super(unbundle20, self).__init__(fp)
548 548
549 549 @util.propertycache
550 550 def params(self):
551 551 """dictionary of stream level parameters"""
552 552 self.ui.debug('reading bundle2 stream parameters\n')
553 553 params = {}
554 554 paramssize = self._unpack(_fstreamparamsize)[0]
555 555 if paramssize < 0:
556 556 raise error.BundleValueError('negative bundle param size: %i'
557 557 % paramssize)
558 558 if paramssize:
559 559 for p in self._readexact(paramssize).split(' '):
560 560 p = p.split('=', 1)
561 561 p = [urllib.unquote(i) for i in p]
562 562 if len(p) < 2:
563 563 p.append(None)
564 564 self._processparam(*p)
565 565 params[p[0]] = p[1]
566 566 return params
567 567
568 568 def _processparam(self, name, value):
569 569 """process a parameter, applying its effect if needed
570 570
571 571 Parameters starting with a lower case letter are advisory and will be
572 572 ignored when unknown. Those starting with an upper case letter are
573 573 mandatory and this function will raise an UnsupportedPartError when unknown.
574 574
575 575 Note: no options are currently supported. Any input will either be
576 576 ignored or fail.
577 577 """
578 578 if not name:
579 579 raise ValueError('empty parameter name')
580 580 if name[0] not in string.letters:
581 581 raise ValueError('non letter first character: %r' % name)
582 582 # Some logic will be later added here to try to process the option for
583 583 # a dict of known parameter.
584 584 if name[0].islower():
585 585 self.ui.debug("ignoring unknown parameter %r\n" % name)
586 586 else:
587 587 raise error.UnsupportedPartError(params=(name,))
588 588
589 589
590 590 def iterparts(self):
591 591 """yield all parts contained in the stream"""
592 592 # make sure param have been loaded
593 593 self.params
594 594 self.ui.debug('start extraction of bundle2 parts\n')
595 595 headerblock = self._readpartheader()
596 596 while headerblock is not None:
597 597 part = unbundlepart(self.ui, headerblock, self._fp)
598 598 yield part
599 599 part.seek(0, 2)
600 600 headerblock = self._readpartheader()
601 601 self.ui.debug('end of bundle2 stream\n')
602 602
603 603 def _readpartheader(self):
604 604 """reads a part header size and return the bytes blob
605 605
606 606 returns None if empty"""
607 607 headersize = self._unpack(_fpartheadersize)[0]
608 608 if headersize < 0:
609 609 raise error.BundleValueError('negative part header size: %i'
610 610 % headersize)
611 611 self.ui.debug('part header size: %i\n' % headersize)
612 612 if headersize:
613 613 return self._readexact(headersize)
614 614 return None
615 615
616 616 def compressed(self):
617 617 return False
618 618
619 619 formatmap = {'20': unbundle20}
620 620
621 621 class bundlepart(object):
622 622 """A bundle2 part contains application level payload
623 623
624 624 The part `type` is used to route the part to the application level
625 625 handler.
626 626
627 627 The part payload is contained in ``part.data``. It could be raw bytes or a
628 628 generator of byte chunks.
629 629
630 630 You can add parameters to the part using the ``addparam`` method.
631 631 Parameters can be either mandatory (default) or advisory. Remote side
632 632 should be able to safely ignore the advisory ones.
633 633
634 634 Both data and parameters cannot be modified after the generation has begun.
635 635 """
636 636
637 637 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
638 638 data='', mandatory=True):
639 639 validateparttype(parttype)
640 640 self.id = None
641 641 self.type = parttype
642 642 self._data = data
643 643 self._mandatoryparams = list(mandatoryparams)
644 644 self._advisoryparams = list(advisoryparams)
645 645 # checking for duplicated entries
646 646 self._seenparams = set()
647 647 for pname, __ in self._mandatoryparams + self._advisoryparams:
648 648 if pname in self._seenparams:
649 649 raise RuntimeError('duplicated params: %s' % pname)
650 650 self._seenparams.add(pname)
651 651 # status of the part's generation:
652 652 # - None: not started,
653 653 # - False: currently generated,
654 654 # - True: generation done.
655 655 self._generated = None
656 656 self.mandatory = mandatory
657 657
658 def copy(self):
659 """return a copy of the part
660
661 The new part has the very same content but no partid assigned yet.
662 Parts with generated data cannot be copied."""
663 assert not util.safehasattr(self.data, 'next')
664 return self.__class__(self.type, self._mandatoryparams,
665 self._advisoryparams, self._data, self.mandatory)
666
658 667 # methods used to defines the part content
659 668 def __setdata(self, data):
660 669 if self._generated is not None:
661 670 raise error.ReadOnlyPartError('part is being generated')
662 671 self._data = data
663 672 def __getdata(self):
664 673 return self._data
665 674 data = property(__getdata, __setdata)
666 675
667 676 @property
668 677 def mandatoryparams(self):
669 678 # make it an immutable tuple to force people through ``addparam``
670 679 return tuple(self._mandatoryparams)
671 680
672 681 @property
673 682 def advisoryparams(self):
674 683 # make it an immutable tuple to force people through ``addparam``
675 684 return tuple(self._advisoryparams)
676 685
677 686 def addparam(self, name, value='', mandatory=True):
678 687 if self._generated is not None:
679 688 raise error.ReadOnlyPartError('part is being generated')
680 689 if name in self._seenparams:
681 690 raise ValueError('duplicated params: %s' % name)
682 691 self._seenparams.add(name)
683 692 params = self._advisoryparams
684 693 if mandatory:
685 694 params = self._mandatoryparams
686 695 params.append((name, value))
687 696
688 697 # methods used to generates the bundle2 stream
689 698 def getchunks(self):
690 699 if self._generated is not None:
691 700 raise RuntimeError('part can only be consumed once')
692 701 self._generated = False
693 702 #### header
694 703 if self.mandatory:
695 704 parttype = self.type.upper()
696 705 else:
697 706 parttype = self.type.lower()
698 707 ## parttype
699 708 header = [_pack(_fparttypesize, len(parttype)),
700 709 parttype, _pack(_fpartid, self.id),
701 710 ]
702 711 ## parameters
703 712 # count
704 713 manpar = self.mandatoryparams
705 714 advpar = self.advisoryparams
706 715 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
707 716 # size
708 717 parsizes = []
709 718 for key, value in manpar:
710 719 parsizes.append(len(key))
711 720 parsizes.append(len(value))
712 721 for key, value in advpar:
713 722 parsizes.append(len(key))
714 723 parsizes.append(len(value))
715 724 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
716 725 header.append(paramsizes)
717 726 # key, value
718 727 for key, value in manpar:
719 728 header.append(key)
720 729 header.append(value)
721 730 for key, value in advpar:
722 731 header.append(key)
723 732 header.append(value)
724 733 ## finalize header
725 734 headerchunk = ''.join(header)
726 735 yield _pack(_fpartheadersize, len(headerchunk))
727 736 yield headerchunk
728 737 ## payload
729 738 try:
730 739 for chunk in self._payloadchunks():
731 740 yield _pack(_fpayloadsize, len(chunk))
732 741 yield chunk
733 742 except Exception, exc:
734 743 # backup exception data for later
735 744 exc_info = sys.exc_info()
736 745 msg = 'unexpected error: %s' % exc
737 746 interpart = bundlepart('error:abort', [('message', msg)],
738 747 mandatory=False)
739 748 interpart.id = 0
740 749 yield _pack(_fpayloadsize, -1)
741 750 for chunk in interpart.getchunks():
742 751 yield chunk
743 752 # abort current part payload
744 753 yield _pack(_fpayloadsize, 0)
745 754 raise exc_info[0], exc_info[1], exc_info[2]
746 755 # end of payload
747 756 yield _pack(_fpayloadsize, 0)
748 757 self._generated = True
749 758
750 759 def _payloadchunks(self):
751 760 """yield chunks of the part payload
752 761
753 762 Exists to handle the different methods to provide data to a part."""
754 763 # we only support fixed size data now.
755 764 # This will be improved in the future.
756 765 if util.safehasattr(self.data, 'next'):
757 766 buff = util.chunkbuffer(self.data)
758 767 chunk = buff.read(preferedchunksize)
759 768 while chunk:
760 769 yield chunk
761 770 chunk = buff.read(preferedchunksize)
762 771 elif len(self.data):
763 772 yield self.data
764 773
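Reviewer note: the payload framing used by `getchunks` and `_payloadchunks` is a series of `<chunksize><chunkdata>` records closed by a zero size chunk. The following Python 3 sketch shows that framing on its own; `frame_payload` and `read_payload` are hypothetical names, and the -1 interruption case is reported as an error here instead of being handled.

```python
import io
import struct

def frame_payload(chunks):
    """yield the wire form of a payload: sized chunks, then a zero size"""
    for chunk in chunks:
        if chunk:  # an empty chunk would be read as the end marker
            yield struct.pack('>i', len(chunk))
            yield chunk
    yield struct.pack('>i', 0)

def read_payload(fp):
    """yield chunk data from a file-like object until the zero size chunk"""
    while True:
        (size,) = struct.unpack('>i', fp.read(4))
        if size == 0:
            return
        if size < 0:
            # the real reader treats -1 as an interruption; not sketched here
            raise ValueError('negative payload chunk size: %i' % size)
        yield fp.read(size)

stream = io.BytesIO(b''.join(frame_payload([b'abc', b'', b'defgh'])))
chunks = list(read_payload(stream))
```

Skipping empty chunks on the producer side mirrors the original loop, which stops as soon as a read returns no data.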
765 774
766 775 flaginterrupt = -1
767 776
768 777 class interrupthandler(unpackermixin):
769 778 """read one part and process it with restricted capability
770 779
771 780 This allows transmitting exceptions raised on the producer side during part
772 781 iteration while the consumer is reading a part.
773 782
774 783 Parts processed in this manner only have access to a ui object."""
775 784
776 785 def __init__(self, ui, fp):
777 786 super(interrupthandler, self).__init__(fp)
778 787 self.ui = ui
779 788
780 789 def _readpartheader(self):
781 790 """reads a part header size and return the bytes blob
782 791
783 792 returns None if empty"""
784 793 headersize = self._unpack(_fpartheadersize)[0]
785 794 if headersize < 0:
786 795 raise error.BundleValueError('negative part header size: %i'
787 796 % headersize)
788 797 self.ui.debug('part header size: %i\n' % headersize)
789 798 if headersize:
790 799 return self._readexact(headersize)
791 800 return None
792 801
793 802 def __call__(self):
794 803 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
795 804 headerblock = self._readpartheader()
796 805 if headerblock is None:
797 806 self.ui.debug('no part found during interruption.\n')
798 807 return
799 808 part = unbundlepart(self.ui, headerblock, self._fp)
800 809 op = interruptoperation(self.ui)
801 810 _processpart(op, part)
802 811
803 812 class interruptoperation(object):
804 813 """A limited operation to be used by part handlers during interruption
805 814
806 815 It only has access to a ui object.
807 816 """
808 817
809 818 def __init__(self, ui):
810 819 self.ui = ui
811 820 self.reply = None
812 821
813 822 @property
814 823 def repo(self):
815 824 raise RuntimeError('no repo access from stream interruption')
816 825
817 826 def gettransaction(self):
818 827 raise TransactionUnavailable('no repo access from stream interruption')
819 828
820 829 class unbundlepart(unpackermixin):
821 830 """a bundle part read from a bundle"""
822 831
823 832 def __init__(self, ui, header, fp):
824 833 super(unbundlepart, self).__init__(fp)
825 834 self.ui = ui
826 835 # unbundle state attr
827 836 self._headerdata = header
828 837 self._headeroffset = 0
829 838 self._initialized = False
830 839 self.consumed = False
831 840 # part data
832 841 self.id = None
833 842 self.type = None
834 843 self.mandatoryparams = None
835 844 self.advisoryparams = None
836 845 self.params = None
837 846 self.mandatorykeys = ()
838 847 self._payloadstream = None
839 848 self._readheader()
840 849 self._mandatory = None
841 850 self._chunkindex = [] #(payload, file) position tuples for chunk starts
842 851 self._pos = 0
843 852
844 853 def _fromheader(self, size):
845 854 """return the next <size> bytes from the header"""
846 855 offset = self._headeroffset
847 856 data = self._headerdata[offset:(offset + size)]
848 857 self._headeroffset = offset + size
849 858 return data
850 859
851 860 def _unpackheader(self, format):
852 861 """read given format from header
853 862
854 863 This automatically computes the size of the format to read."""
855 864 data = self._fromheader(struct.calcsize(format))
856 865 return _unpack(format, data)
857 866
858 867 def _initparams(self, mandatoryparams, advisoryparams):
859 868 """internal function to setup all logic related parameters"""
860 869 # make it read only to prevent people touching it by mistake.
861 870 self.mandatoryparams = tuple(mandatoryparams)
862 871 self.advisoryparams = tuple(advisoryparams)
863 872 # user friendly UI
864 873 self.params = dict(self.mandatoryparams)
865 874 self.params.update(dict(self.advisoryparams))
866 875 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
867 876
868 877 def _payloadchunks(self, chunknum=0):
869 878 '''seek to specified chunk and start yielding data'''
870 879 if len(self._chunkindex) == 0:
871 880 assert chunknum == 0, 'Must start with chunk 0'
872 881 self._chunkindex.append((0, super(unbundlepart, self).tell()))
873 882 else:
874 883 assert chunknum < len(self._chunkindex), \
875 884 'Unknown chunk %d' % chunknum
876 885 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
877 886
878 887 pos = self._chunkindex[chunknum][0]
879 888 payloadsize = self._unpack(_fpayloadsize)[0]
880 889 self.ui.debug('payload chunk size: %i\n' % payloadsize)
881 890 while payloadsize:
882 891 if payloadsize == flaginterrupt:
883 892 # interruption detection, the handler will now read a
884 893 # single part and process it.
885 894 interrupthandler(self.ui, self._fp)()
886 895 elif payloadsize < 0:
887 896 msg = 'negative payload chunk size: %i' % payloadsize
888 897 raise error.BundleValueError(msg)
889 898 else:
890 899 result = self._readexact(payloadsize)
891 900 chunknum += 1
892 901 pos += payloadsize
893 902 if chunknum == len(self._chunkindex):
894 903 self._chunkindex.append((pos,
895 904 super(unbundlepart, self).tell()))
896 905 yield result
897 906 payloadsize = self._unpack(_fpayloadsize)[0]
898 907 self.ui.debug('payload chunk size: %i\n' % payloadsize)
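Review note: the loop above reads a size field, then that many payload bytes, and stops at a zero size. A minimal standalone sketch of that framing follows. It assumes `_fpayloadsize` is a signed 32-bit big-endian integer (`'>i'`), which is not shown in this section, and it leaves out the interrupt handling and the chunk index; `iterchunks` is a hypothetical name.

```python
import io
import struct

# Assumption: stands in for _fpayloadsize, whose definition is not shown here.
PAYLOADSIZE = '>i'

def iterchunks(fp):
    """Yield payload chunks until a zero-size chunk ends the part."""
    while True:
        header = fp.read(struct.calcsize(PAYLOADSIZE))
        size = struct.unpack(PAYLOADSIZE, header)[0]
        if size == 0:
            # a zero size marks the end of the payload
            return
        if size < 0:
            # the real code first checks for the interrupt flag; this sketch
            # treats every negative size as an error
            raise ValueError('negative payload chunk size: %i' % size)
        data = fp.read(size)
        if len(data) != size:
            raise ValueError('stream ended unexpectedly')
        yield data

# Two chunks followed by the terminating zero-size chunk.
stream = io.BytesIO(struct.pack('>i', 3) + b'abc' +
                    struct.pack('>i', 2) + b'de' +
                    struct.pack('>i', 0))
chunks = list(iterchunks(stream))
```

Here `chunks` holds the two payload pieces in order; the terminating chunk itself is never yielded.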
899 908
900 909 def _findchunk(self, pos):
901 910 '''for a given payload position, return a chunk number and offset'''
902 911 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
903 912 if ppos == pos:
904 913 return chunk, 0
905 914 elif ppos > pos:
906 915 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
907 916 raise ValueError('Unknown chunk')
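Review note: the lookup above can be tried in isolation. The sketch below copies its logic into a free function and runs it against a made-up index; the index values are hypothetical and only illustrate the `(payload position, file position)` layout.

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset inside that chunk) for a payload position."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# Hypothetical index for three chunks of 100, 150 and 50 payload bytes, each
# preceded by a 4-byte size field, plus the entry recorded after the last chunk.
index = [(0, 0), (100, 104), (250, 258), (300, 312)]

start_of_second = findchunk(index, 100)   # exact chunk start
inside_second = findchunk(index, 120)     # 20 bytes into chunk 1
inside_last = findchunk(index, 299)       # 49 bytes into chunk 2
```

A position past the last recorded entry raises `ValueError`, which is presumably why `seek` reads further into the payload before calling `_findchunk`.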
908 917
909 918 def _readheader(self):
910 919 """read the header and set up the object"""
911 920 typesize = self._unpackheader(_fparttypesize)[0]
912 921 self.type = self._fromheader(typesize)
913 922 self.ui.debug('part type: "%s"\n' % self.type)
914 923 self.id = self._unpackheader(_fpartid)[0]
915 924 self.ui.debug('part id: "%s"\n' % self.id)
916 925 # extract mandatory bit from type
917 926 self.mandatory = (self.type != self.type.lower())
918 927 self.type = self.type.lower()
919 928 ## reading parameters
920 929 # param count
921 930 mancount, advcount = self._unpackheader(_fpartparamcount)
922 931 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
923 932 # param size
924 933 fparamsizes = _makefpartparamsizes(mancount + advcount)
925 934 paramsizes = self._unpackheader(fparamsizes)
926 935 # make it a list of pairs again
927 936 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
928 937 # split mandatory from advisory
929 938 mansizes = paramsizes[:mancount]
930 939 advsizes = paramsizes[mancount:]
931 940 # retrieve param value
932 941 manparams = []
933 942 for key, value in mansizes:
934 943 manparams.append((self._fromheader(key), self._fromheader(value)))
935 944 advparams = []
936 945 for key, value in advsizes:
937 946 advparams.append((self._fromheader(key), self._fromheader(value)))
938 947 self._initparams(manparams, advparams)
939 948 ## part payload
940 949 self._payloadstream = util.chunkbuffer(self._payloadchunks())
941 950 # we read the data, tell it
942 951 self._initialized = True
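Review note: a rough standalone sketch of the header parsing above, for readers who want to try the layout on sample bytes. The struct formats (`'>B'`, `'>I'`, `'>BB'`, and one `'BB'` pair per parameter) are assumptions standing in for `_fparttypesize`, `_fpartid`, `_fpartparamcount` and `_makefpartparamsizes`, none of which are defined in this section. `parse_part_header` is a hypothetical name.

```python
import struct

def parse_part_header(header):
    """Parse a part header following the same steps as _readheader."""
    offset = 0

    def take(size):
        # same idea as _fromheader: slice and advance
        nonlocal offset
        data = header[offset:offset + size]
        offset += size
        return data

    def unpack(fmt):
        # same idea as _unpackheader: size is derived from the format
        return struct.unpack(fmt, take(struct.calcsize(fmt)))

    typesize = unpack('>B')[0]
    parttype = take(typesize).decode('ascii')
    partid = unpack('>I')[0]
    # any upper-case letter in the type marks the part as mandatory
    mandatory = parttype != parttype.lower()
    mancount, advcount = unpack('>BB')
    sizes = unpack('>' + 'BB' * (mancount + advcount))
    # flat (keysize, valuesize, ...) tuple back into pairs
    pairs = list(zip(sizes[::2], sizes[1::2]))
    params = [(take(k), take(v)) for k, v in pairs]
    return (parttype.lower(), partid, mandatory,
            params[:mancount], params[mancount:])

# Sample header: one mandatory and one advisory parameter.
sample = (struct.pack('>B', 11) + b'CHANGEGROUP' +
          struct.pack('>I', 7) +
          struct.pack('>BB', 1, 1) +
          struct.pack('>BBBB', 7, 2, 3, 1) +
          b'version' + b'02' + b'foo' + b'x')
parsed = parse_part_header(sample)
```

The sketch wraps `zip` in `list` because it targets Python 3, where a `zip` object cannot be sliced; the reviewed code slices the result directly, which relies on Python 2 behaviour.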
943 952
944 953 def read(self, size=None):
945 954 """read payload data"""
946 955 if not self._initialized:
947 956 self._readheader()
948 957 if size is None:
949 958 data = self._payloadstream.read()
950 959 else:
951 960 data = self._payloadstream.read(size)
952 961 if size is None or len(data) < size:
953 962 self.consumed = True
954 963 self._pos += len(data)
955 964 return data
956 965
957 966 def tell(self):
958 967 return self._pos
959 968
960 969 def seek(self, offset, whence=0):
961 970 if whence == 0:
962 971 newpos = offset
963 972 elif whence == 1:
964 973 newpos = self._pos + offset
965 974 elif whence == 2:
966 975 if not self.consumed:
967 976 self.read()
968 977 newpos = self._chunkindex[-1][0] - offset
969 978 else:
970 979 raise ValueError('Unknown whence value: %r' % (whence,))
971 980
972 981 if newpos > self._chunkindex[-1][0] and not self.consumed:
973 982 self.read()
974 983 if not 0 <= newpos <= self._chunkindex[-1][0]:
975 984 raise ValueError('Offset out of range')
976 985
977 986 if self._pos != newpos:
978 987 chunk, internaloffset = self._findchunk(newpos)
979 988 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
980 989 adjust = self.read(internaloffset)
981 990 if len(adjust) != internaloffset:
982 991 raise util.Abort(_('Seek failed\n'))
983 992 self._pos = newpos
984 993
985 994 capabilities = {'HG20': (),
986 995 'listkeys': (),
987 996 'pushkey': (),
988 997 'digests': tuple(sorted(util.DIGESTS.keys())),
989 998 'remote-changegroup': ('http', 'https'),
990 999 }
991 1000
992 1001 def getrepocaps(repo, allowpushback=False):
993 1002 """return the bundle2 capabilities for a given repo
994 1003
995 1004 Exists to allow extensions (like evolution) to mutate the capabilities.
996 1005 """
997 1006 caps = capabilities.copy()
998 1007 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
999 1008 if obsolete.isenabled(repo, obsolete.exchangeopt):
1000 1009 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1001 1010 caps['obsmarkers'] = supportedformat
1002 1011 if allowpushback:
1003 1012 caps['pushback'] = ()
1004 1013 return caps
1005 1014
1006 1015 def bundle2caps(remote):
1007 1016 """return the bundle capabilities of a peer as dict"""
1008 1017 raw = remote.capable('bundle2')
1009 1018 if not raw and raw != '':
1010 1019 return {}
1011 1020 capsblob = urllib.unquote(remote.capable('bundle2'))
1012 1021 return decodecaps(capsblob)
1013 1022
1014 1023 def obsmarkersversion(caps):
1015 1024 """extract the list of supported obsmarkers versions from a bundle2caps dict
1016 1025 """
1017 1026 obscaps = caps.get('obsmarkers', ())
1018 1027 return [int(c[1:]) for c in obscaps if c.startswith('V')]
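Review note: `getrepocaps` advertises obsmarkers formats as `'V<n>'` strings and `obsmarkersversion` turns them back into integers. A small round-trip sketch follows; the format numbers `(0, 1)` are made up for illustration.

```python
def obsmarkersversion(caps):
    """Extract supported obsmarkers versions from a capabilities dict."""
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

# Hypothetical list of supported formats, encoded the way getrepocaps does.
formats = (0, 1)
caps = {'HG20': (), 'obsmarkers': tuple('V%i' % v for v in formats)}

versions = obsmarkersversion(caps)
none_advertised = obsmarkersversion({'HG20': ()})
```

Entries that do not start with `V` are skipped rather than rejected, so unknown capability values are ignored.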
1019 1028
1020 1029 @parthandler('changegroup', ('version',))
1021 1030 def handlechangegroup(op, inpart):
1022 1031 """apply a changegroup part on the repo
1023 1032
1024 1033 This is a very early implementation that will need massive rework before
1025 1034 being inflicted on any end user.
1026 1035 """
1027 1036 # Make sure we trigger a transaction creation
1028 1037 #
1029 1038 # The addchangegroup function will get a transaction object by itself, but
1030 1039 # we need to make sure we trigger the creation of a transaction object used
1031 1040 # for the whole processing scope.
1032 1041 op.gettransaction()
1033 1042 unpackerversion = inpart.params.get('version', '01')
1034 1043 # We should raise an appropriate exception here
1035 1044 unpacker = changegroup.packermap[unpackerversion][1]
1036 1045 cg = unpacker(inpart, 'UN')
1037 1046 # the source and url passed here are overwritten by the one contained in
1038 1047 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1039 1048 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1040 1049 op.records.add('changegroup', {'return': ret})
1041 1050 if op.reply is not None:
1042 1051 # This is definitely not the final form of this
1043 1052 # return. But one needs to start somewhere.
1044 1053 part = op.reply.newpart('reply:changegroup', mandatory=False)
1045 1054 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1046 1055 part.addparam('return', '%i' % ret, mandatory=False)
1047 1056 assert not inpart.read()
1048 1057
1049 1058 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1050 1059 ['digest:%s' % k for k in util.DIGESTS.keys()])
1051 1060 @parthandler('remote-changegroup', _remotechangegroupparams)
1052 1061 def handleremotechangegroup(op, inpart):
1053 1062 """apply a bundle10 on the repo, given an url and validation information
1054 1063
1055 1064 All the information about the remote bundle to import is given as
1056 1065 parameters. The parameters include:
1057 1066 - url: the url to the bundle10.
1058 1067 - size: the bundle10 file size. It is used to validate that what was
1059 1068 retrieved by the client matches the server's knowledge about the bundle.
1060 1069 - digests: a space separated list of the digest types provided as
1061 1070 parameters.
1062 1071 - digest:<digest-type>: the hexadecimal representation of the digest with
1063 1072 that name. Like the size, it is used to validate that what was retrieved
1064 1073 by the client matches what the server knows about the bundle.
1065 1074
1066 1075 When multiple digest types are given, all of them are checked.
1067 1076 """
1068 1077 try:
1069 1078 raw_url = inpart.params['url']
1070 1079 except KeyError:
1071 1080 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1072 1081 parsed_url = util.url(raw_url)
1073 1082 if parsed_url.scheme not in capabilities['remote-changegroup']:
1074 1083 raise util.Abort(_('remote-changegroup does not support %s urls') %
1075 1084 parsed_url.scheme)
1076 1085
1077 1086 try:
1078 1087 size = int(inpart.params['size'])
1079 1088 except ValueError:
1080 1089 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1081 1090 % 'size')
1082 1091 except KeyError:
1083 1092 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1084 1093
1085 1094 digests = {}
1086 1095 for typ in inpart.params.get('digests', '').split():
1087 1096 param = 'digest:%s' % typ
1088 1097 try:
1089 1098 value = inpart.params[param]
1090 1099 except KeyError:
1091 1100 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1092 1101 param)
1093 1102 digests[typ] = value
1094 1103
1095 1104 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1096 1105
1097 1106 # Make sure we trigger a transaction creation
1098 1107 #
1099 1108 # The addchangegroup function will get a transaction object by itself, but
1100 1109 # we need to make sure we trigger the creation of a transaction object used
1101 1110 # for the whole processing scope.
1102 1111 op.gettransaction()
1103 1112 import exchange
1104 1113 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1105 1114 if not isinstance(cg, changegroup.cg1unpacker):
1106 1115 raise util.Abort(_('%s: not a bundle version 1.0') %
1107 1116 util.hidepassword(raw_url))
1108 1117 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1109 1118 op.records.add('changegroup', {'return': ret})
1110 1119 if op.reply is not None:
1111 1120 # This is definitely not the final form of this
1112 1121 # return. But one needs to start somewhere.
1113 1122 part = op.reply.newpart('reply:changegroup')
1114 1123 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1115 1124 part.addparam('return', '%i' % ret, mandatory=False)
1116 1125 try:
1117 1126 real_part.validate()
1118 1127 except util.Abort, e:
1119 1128 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1120 1129 (util.hidepassword(raw_url), str(e)))
1121 1130 assert not inpart.read()
1122 1131
1123 1132 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1124 1133 def handlereplychangegroup(op, inpart):
1125 1134 ret = int(inpart.params['return'])
1126 1135 replyto = int(inpart.params['in-reply-to'])
1127 1136 op.records.add('changegroup', {'return': ret}, replyto)
1128 1137
1129 1138 @parthandler('check:heads')
1130 1139 def handlecheckheads(op, inpart):
1131 1140 """check that the heads of the repo did not change
1132 1141
1133 1142 This is used to detect a push race when using unbundle.
1134 1143 This replaces the "heads" argument of unbundle."""
1135 1144 h = inpart.read(20)
1136 1145 heads = []
1137 1146 while len(h) == 20:
1138 1147 heads.append(h)
1139 1148 h = inpart.read(20)
1140 1149 assert not h
1141 1150 if heads != op.repo.heads():
1142 1151 raise error.PushRaced('repository changed while pushing - '
1143 1152 'please try again')
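Review note: the payload of `check:heads` is read as a sequence of fixed-size 20-byte nodes. The sketch below shows that reading loop on an in-memory stream. `readheads` is a hypothetical helper, and it raises `ValueError` on a truncated payload where the handler uses `assert`.

```python
import io

def readheads(fp, nodelen=20):
    """Read fixed-size binary nodes from fp until the stream is exhausted."""
    heads = []
    h = fp.read(nodelen)
    while len(h) == nodelen:
        heads.append(h)
        h = fp.read(nodelen)
    if h:
        # leftover bytes that do not form a full node
        raise ValueError('truncated head list')
    return heads

payload = io.BytesIO(b'a' * 20 + b'b' * 20)
heads = readheads(payload)
```

The handler then compares the resulting list with `op.repo.heads()` using `!=`, which is a list comparison, so the order of the heads matters as well as their values.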
1144 1153
1145 1154 @parthandler('output')
1146 1155 def handleoutput(op, inpart):
1147 1156 """forward output captured on the server to the client"""
1148 1157 for line in inpart.read().splitlines():
1149 1158 op.ui.write(('remote: %s\n' % line))
1150 1159
1151 1160 @parthandler('replycaps')
1152 1161 def handlereplycaps(op, inpart):
1153 1162 """Notify that a reply bundle should be created
1154 1163
1155 1164 The payload contains the capabilities information for the reply"""
1156 1165 caps = decodecaps(inpart.read())
1157 1166 if op.reply is None:
1158 1167 op.reply = bundle20(op.ui, caps)
1159 1168
1160 1169 @parthandler('error:abort', ('message', 'hint'))
1161 1170 def handleerrorabort(op, inpart):
1162 1171 """Used to transmit abort error over the wire"""
1163 1172 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1164 1173
1165 1174 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1166 1175 def handleerrorunsupportedcontent(op, inpart):
1167 1176 """Used to transmit unknown content error over the wire"""
1168 1177 kwargs = {}
1169 1178 parttype = inpart.params.get('parttype')
1170 1179 if parttype is not None:
1171 1180 kwargs['parttype'] = parttype
1172 1181 params = inpart.params.get('params')
1173 1182 if params is not None:
1174 1183 kwargs['params'] = params.split('\0')
1175 1184
1176 1185 raise error.UnsupportedPartError(**kwargs)
1177 1186
1178 1187 @parthandler('error:pushraced', ('message',))
1179 1188 def handleerrorpushraced(op, inpart):
1180 1189 """Used to transmit push race error over the wire"""
1181 1190 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1182 1191
1183 1192 @parthandler('listkeys', ('namespace',))
1184 1193 def handlelistkeys(op, inpart):
1185 1194 """retrieve pushkey namespace content stored in a bundle2"""
1186 1195 namespace = inpart.params['namespace']
1187 1196 r = pushkey.decodekeys(inpart.read())
1188 1197 op.records.add('listkeys', (namespace, r))
1189 1198
1190 1199 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1191 1200 def handlepushkey(op, inpart):
1192 1201 """process a pushkey request"""
1193 1202 dec = pushkey.decode
1194 1203 namespace = dec(inpart.params['namespace'])
1195 1204 key = dec(inpart.params['key'])
1196 1205 old = dec(inpart.params['old'])
1197 1206 new = dec(inpart.params['new'])
1198 1207 ret = op.repo.pushkey(namespace, key, old, new)
1199 1208 record = {'namespace': namespace,
1200 1209 'key': key,
1201 1210 'old': old,
1202 1211 'new': new}
1203 1212 op.records.add('pushkey', record)
1204 1213 if op.reply is not None:
1205 1214 rpart = op.reply.newpart('reply:pushkey')
1206 1215 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1207 1216 rpart.addparam('return', '%i' % ret, mandatory=False)
1208 1217
1209 1218 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1210 1219 def handlepushkeyreply(op, inpart):
1211 1220 """retrieve the result of a pushkey request"""
1212 1221 ret = int(inpart.params['return'])
1213 1222 partid = int(inpart.params['in-reply-to'])
1214 1223 op.records.add('pushkey', {'return': ret}, partid)
1215 1224
1216 1225 @parthandler('obsmarkers')
1217 1226 def handleobsmarker(op, inpart):
1218 1227 """add a stream of obsmarkers to the repo"""
1219 1228 tr = op.gettransaction()
1220 1229 markerdata = inpart.read()
1221 1230 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1222 1231 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1223 1232 % len(markerdata))
1224 1233 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1225 1234 if new:
1226 1235 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1227 1236 op.records.add('obsmarkers', {'new': new})
1228 1237 if op.reply is not None:
1229 1238 rpart = op.reply.newpart('reply:obsmarkers')
1230 1239 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1231 1240 rpart.addparam('new', '%i' % new, mandatory=False)
1232 1241
1233 1242
1234 1243 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1235 1244 def handleobsmarkerreply(op, inpart):
1236 1245 """retrieve the result of an obsmarkers request"""
1237 1246 ret = int(inpart.params['new'])
1238 1247 partid = int(inpart.params['in-reply-to'])
1239 1248 op.records.add('obsmarkers', {'new': ret}, partid)