bundle20: move magic string into the class...
Pierre-Yves David -
r24640:685639f9 default
@@ -1,1227 +1,1227 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level option MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
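The stream level parameter layout described in the docstring (an int32 size followed by a space separated, urlquoted blob) can be illustrated with a small standalone sketch. This is a Python 3 illustration written for this page, not the module's own code: the function names `encode_stream_params` and `decode_stream_params` are hypothetical, and the blob is assumed to be ASCII.

```python
import struct
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Serialize [(name, value-or-None), ...] as <int32 size><blob>."""
    blocks = []
    for name, value in params:
        item = quote(name)
        if value is not None:
            item = '%s=%s' % (item, quote(value))
        blocks.append(item)
    blob = ' '.join(blocks).encode('ascii')
    # '>i' mirrors _fstreamparamsize in the module
    return struct.pack('>i', len(blob)) + blob

def decode_stream_params(data):
    """Inverse of encode_stream_params; returns {name: value-or-None}."""
    (size,) = struct.unpack('>i', data[:4])
    if size < 0:
        raise ValueError('negative bundle param size: %i' % size)
    params = {}
    if size:
        for item in data[4:4 + size].decode('ascii').split(' '):
            pieces = [unquote(p) for p in item.split('=', 1)]
            params[pieces[0]] = pieces[1] if len(pieces) == 2 else None
    return params

encoded = encode_stream_params([('compression', 'gz'), ('e x', None)])
decoded = decode_stream_params(encoded)
```

Because names and values are urlquoted, a space inside a name cannot be confused with the separator between parameters.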
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 - _magicstring = 'HG2Y'
166 -
167 165 _fstreamparamsize = '>i'
168 166 _fpartheadersize = '>i'
169 167 _fparttypesize = '>B'
170 168 _fpartid = '>I'
171 169 _fpayloadsize = '>i'
172 170 _fpartparamcount = '>BB'
173 171
174 172 preferedchunksize = 4096
175 173
176 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
177 175
178 176 def validateparttype(parttype):
179 177 """raise ValueError if a parttype contains an invalid character"""
180 178 if _parttypeforbidden.search(parttype):
181 179 raise ValueError(parttype)
182 180
183 181 def _makefpartparamsizes(nbparams):
184 182 """return a struct format to read part parameter sizes
185 183
186 184 The number of parameters is variable so we need to build that format
187 185 dynamically.
188 186 """
189 187 return '>'+('BB'*nbparams)
190 188
191 189 parthandlermapping = {}
192 190
193 191 def parthandler(parttype, params=()):
194 192 """decorator that registers a function as a bundle2 part handler
195 193
196 194 eg::
197 195
198 196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
199 197 def myparttypehandler(...):
200 198 '''process a part of type "my part".'''
201 199 ...
202 200 """
203 201 validateparttype(parttype)
204 202 def _decorator(func):
205 203 lparttype = parttype.lower() # enforce lower case matching.
206 204 assert lparttype not in parthandlermapping
207 205 parthandlermapping[lparttype] = func
208 206 func.params = frozenset(params)
209 207 return func
210 208 return _decorator
211 209
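The routing rule from the module docstring (case insensitive matching, with any uppercase character marking the part as mandatory) can be sketched in isolation. This is a simplified Python 3 illustration. `findhandler` and the `LookupError` it raises are assumptions made for the example, whereas the module itself raises `error.UnsupportedPartError` from `_processpart`.

```python
handlers = {}

def parthandler(parttype, params=()):
    """Register a function under the lower-cased part type."""
    def decorator(func):
        key = parttype.lower()
        assert key not in handlers
        handlers[key] = func
        func.params = frozenset(params)
        return func
    return decorator

def findhandler(rawtype, mandatorykeys=()):
    """Return the handler for a part type as read from the stream.

    Returns None for an unsupported advisory part and raises for an
    unsupported mandatory one.
    """
    mandatory = rawtype != rawtype.lower()
    handler = handlers.get(rawtype.lower())
    unsupported = (handler is None
                   or frozenset(mandatorykeys) - handler.params)
    if not unsupported:
        return handler
    if mandatory:
        raise LookupError('unsupported mandatory part: %s' % rawtype)
    return None

@parthandler('test:ping', ('reason',))
def handleping(op, part):
    return 'pong'
```

A part sent as `TEST:PING` and one sent as `test:ping` reach the same handler; only the behaviour on failure differs.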
212 210 class unbundlerecords(object):
213 211 """keep record of what happens during an unbundle
214 212
215 213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
216 214 category of record and obj is an arbitrary object.
217 215
218 216 `records['cat']` will return all entries of this category 'cat'.
219 217
220 218 Iterating on the object itself will yield `('category', obj)` tuples
221 219 for all entries.
222 220
223 221 All iterations happen in chronological order.
224 222 """
225 223
226 224 def __init__(self):
227 225 self._categories = {}
228 226 self._sequences = []
229 227 self._replies = {}
230 228
231 229 def add(self, category, entry, inreplyto=None):
232 230 """add a new record of a given category.
233 231
234 232 The entry can then be retrieved in the list returned by
235 233 self['category']."""
236 234 self._categories.setdefault(category, []).append(entry)
237 235 self._sequences.append((category, entry))
238 236 if inreplyto is not None:
239 237 self.getreplies(inreplyto).add(category, entry)
240 238
241 239 def getreplies(self, partid):
242 240 """get the records that are replies to a specific part"""
243 241 return self._replies.setdefault(partid, unbundlerecords())
244 242
245 243 def __getitem__(self, cat):
246 244 return tuple(self._categories.get(cat, ()))
247 245
248 246 def __iter__(self):
249 247 return iter(self._sequences)
250 248
251 249 def __len__(self):
252 250 return len(self._sequences)
253 251
254 252 def __nonzero__(self):
255 253 return bool(self._sequences)
256 254
257 255 class bundleoperation(object):
258 256 """an object that represents a single bundling process
259 257
260 258 Its purpose is to carry unbundle-related objects and states.
261 259
262 260 A new object should be created at the beginning of each bundle processing.
263 261 The object is to be returned by the processing function.
264 262
265 263 The object has very little content now; it will ultimately contain:
266 264 * an access to the repo the bundle is applied to,
267 265 * a ui object,
268 266 * a way to retrieve a transaction to add changes to the repo,
269 267 * a way to record the result of processing each part,
270 268 * a way to construct a bundle response when applicable.
271 269 """
272 270
273 271 def __init__(self, repo, transactiongetter):
274 272 self.repo = repo
275 273 self.ui = repo.ui
276 274 self.records = unbundlerecords()
277 275 self.gettransaction = transactiongetter
278 276 self.reply = None
279 277
280 278 class TransactionUnavailable(RuntimeError):
281 279 pass
282 280
283 281 def _notransaction():
284 282 """default method to get a transaction while processing a bundle
285 283
286 284 Raise an exception to highlight the fact that no transaction was expected
287 285 to be created"""
288 286 raise TransactionUnavailable()
289 287
290 288 def processbundle(repo, unbundler, transactiongetter=None):
291 289 """This function processes a bundle and applies its effects to/from a repo
292 290
293 291 It iterates over each part then searches for and uses the proper handling
294 292 code to process the part. Parts are processed in order.
295 293
296 294 This is a very early version of this function that will be strongly reworked
297 295 before final usage.
298 296
299 297 An unknown mandatory part will abort the process.
300 298 """
301 299 if transactiongetter is None:
302 300 transactiongetter = _notransaction
303 301 op = bundleoperation(repo, transactiongetter)
304 302 # todo:
305 303 # - replace this with an init function soon.
306 304 # - exception catching
307 305 unbundler.params
308 306 iterparts = unbundler.iterparts()
309 307 part = None
310 308 try:
311 309 for part in iterparts:
312 310 _processpart(op, part)
313 311 except Exception, exc:
314 312 for part in iterparts:
315 313 # consume the bundle content
316 314 part.seek(0, 2)
317 315 # Small hack to let caller code distinguish exceptions from bundle2
318 316 # processing from processing the old format. This is mostly
319 317 # needed to handle different return codes to unbundle according to the
320 318 # type of bundle. We should probably clean up or drop this return code
321 319 # craziness in a future version.
322 320 exc.duringunbundle2 = True
323 321 raise
324 322 return op
325 323
326 324 def _processpart(op, part):
327 325 """process a single part from a bundle
328 326
329 327 The part is guaranteed to have been fully consumed when the function exits
330 328 (even if an exception is raised)."""
331 329 try:
332 330 try:
333 331 handler = parthandlermapping.get(part.type)
334 332 if handler is None:
335 333 raise error.UnsupportedPartError(parttype=part.type)
336 334 op.ui.debug('found a handler for part %r\n' % part.type)
337 335 unknownparams = part.mandatorykeys - handler.params
338 336 if unknownparams:
339 337 unknownparams = list(unknownparams)
340 338 unknownparams.sort()
341 339 raise error.UnsupportedPartError(parttype=part.type,
342 340 params=unknownparams)
343 341 except error.UnsupportedPartError, exc:
344 342 if part.mandatory: # mandatory parts
345 343 raise
346 344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
347 345 return # skip to part processing
348 346
349 347 # handler is called outside the above try block so that we don't
350 348 # risk catching KeyErrors from anything other than the
351 349 # parthandlermapping lookup (any KeyError raised by handler()
352 350 # itself represents a defect of a different variety).
353 351 output = None
354 352 if op.reply is not None:
355 353 op.ui.pushbuffer(error=True)
356 354 output = ''
357 355 try:
358 356 handler(op, part)
359 357 finally:
360 358 if output is not None:
361 359 output = op.ui.popbuffer()
362 360 if output:
363 361 outpart = op.reply.newpart('b2x:output', data=output,
364 362 mandatory=False)
365 363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
366 364 finally:
367 365 # consume the part content to not corrupt the stream.
368 366 part.seek(0, 2)
369 367
370 368
371 369 def decodecaps(blob):
372 370 """decode a bundle2 caps bytes blob into a dictionary
373 371
374 372 The blob is a list of capabilities (one per line)
375 373 Capabilities may have values using a line of the form::
376 374
377 375 capability=value1,value2,value3
378 376
379 377 The values are always a list."""
380 378 caps = {}
381 379 for line in blob.splitlines():
382 380 if not line:
383 381 continue
384 382 if '=' not in line:
385 383 key, vals = line, ()
386 384 else:
387 385 key, vals = line.split('=', 1)
388 386 vals = vals.split(',')
389 387 key = urllib.unquote(key)
390 388 vals = [urllib.unquote(v) for v in vals]
391 389 caps[key] = vals
392 390 return caps
393 391
394 392 def encodecaps(caps):
395 393 """encode a bundle2 caps dictionary into a bytes blob"""
396 394 chunks = []
397 395 for ca in sorted(caps):
398 396 vals = caps[ca]
399 397 ca = urllib.quote(ca)
400 398 vals = [urllib.quote(v) for v in vals]
401 399 if vals:
402 400 ca = "%s=%s" % (ca, ','.join(vals))
403 401 chunks.append(ca)
404 402 return '\n'.join(chunks)
405 403
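The capabilities blob format handled by `decodecaps` and `encodecaps` (one capability per line, optional comma separated values, everything urlquoted) round-trips as illustrated by the following Python 3 port. It is a sketch for illustration only: the module itself targets Python 2 and uses `urllib.quote` / `urllib.unquote`, and the sample capability names are made up.

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """Encode {name: [values]} into a newline separated text blob."""
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        entry = quote(name)
        if vals:
            entry = '%s=%s' % (entry, ','.join(vals))
        chunks.append(entry)
    return '\n'.join(chunks)

def decodecaps(blob):
    """Decode a caps blob; values are always returned as a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

# Hypothetical capability set; the comma inside 'a,b' must survive quoting.
caps = {'HG2Y': [], 'digests': ['md5', 'sha1'], 'note': ['a,b']}
blob = encodecaps(caps)
```

Quoting each value before joining is what keeps a literal comma in a value from being read as a separator on the way back.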
406 404 class bundle20(object):
407 405 """represent an outgoing bundle2 container
408 406
409 407 Use the `addparam` method to add stream level parameters and `newpart` to
410 408 populate it. Then call `getchunks` to retrieve all the binary chunks of
411 409 data that compose the bundle2 container."""
412 410
411 + _magicstring = 'HG2Y'
412 +
413 413 def __init__(self, ui, capabilities=()):
414 414 self.ui = ui
415 415 self._params = []
416 416 self._parts = []
417 417 self.capabilities = dict(capabilities)
418 418
419 419 @property
420 420 def nbparts(self):
421 421 """total number of parts added to the bundler"""
422 422 return len(self._parts)
423 423
424 424 # methods used to define the bundle2 content
425 425 def addparam(self, name, value=None):
426 426 """add a stream level parameter"""
427 427 if not name:
428 428 raise ValueError('empty parameter name')
429 429 if name[0] not in string.letters:
430 430 raise ValueError('non letter first character: %r' % name)
431 431 self._params.append((name, value))
432 432
433 433 def addpart(self, part):
434 434 """add a new part to the bundle2 container
435 435
436 436 Parts contain the actual application payload."""
437 437 assert part.id is None
438 438 part.id = len(self._parts) # very cheap counter
439 439 self._parts.append(part)
440 440
441 441 def newpart(self, typeid, *args, **kwargs):
442 442 """create a new part and add it to the container
443 443
444 444 The part is directly added to the container. For now, this means
445 445 that any failure to properly initialize the part after calling
446 446 ``newpart`` should result in a failure of the whole bundling process.
447 447
448 448 You can still fall back to manually creating and adding the part if you
449 449 need better control."""
450 450 part = bundlepart(typeid, *args, **kwargs)
451 451 self.addpart(part)
452 452 return part
453 453
454 454 # methods used to generate the bundle2 stream
455 455 def getchunks(self):
456 - self.ui.debug('start emission of %s stream\n' % _magicstring)
457 - yield _magicstring
456 + self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 + yield self._magicstring
458 458 param = self._paramchunk()
459 459 self.ui.debug('bundle parameter: %s\n' % param)
460 460 yield _pack(_fstreamparamsize, len(param))
461 461 if param:
462 462 yield param
463 463
464 464 self.ui.debug('start of parts\n')
465 465 for part in self._parts:
466 466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 467 for chunk in part.getchunks():
468 468 yield chunk
469 469 self.ui.debug('end of bundle\n')
470 470 yield _pack(_fpartheadersize, 0)
471 471
472 472 def _paramchunk(self):
473 473 """return an encoded version of all stream parameters"""
474 474 blocks = []
475 475 for par, value in self._params:
476 476 par = urllib.quote(par)
477 477 if value is not None:
478 478 value = urllib.quote(value)
479 479 par = '%s=%s' % (par, value)
480 480 blocks.append(par)
481 481 return ' '.join(blocks)
482 482
483 483 class unpackermixin(object):
484 484 """A mixin to extract bytes and struct data from a stream"""
485 485
486 486 def __init__(self, fp):
487 487 self._fp = fp
488 488 self._seekable = (util.safehasattr(fp, 'seek') and
489 489 util.safehasattr(fp, 'tell'))
490 490
491 491 def _unpack(self, format):
492 492 """unpack this struct format from the stream"""
493 493 data = self._readexact(struct.calcsize(format))
494 494 return _unpack(format, data)
495 495
496 496 def _readexact(self, size):
497 497 """read exactly <size> bytes from the stream"""
498 498 return changegroup.readexactly(self._fp, size)
499 499
500 500 def seek(self, offset, whence=0):
501 501 """move the underlying file pointer"""
502 502 if self._seekable:
503 503 return self._fp.seek(offset, whence)
504 504 else:
505 505 raise NotImplementedError(_('File pointer is not seekable'))
506 506
507 507 def tell(self):
508 508 """return the file offset, or None if file is not seekable"""
509 509 if self._seekable:
510 510 try:
511 511 return self._fp.tell()
512 512 except IOError, e:
513 513 if e.errno == errno.ESPIPE:
514 514 self._seekable = False
515 515 else:
516 516 raise
517 517 return None
518 518
519 519 def close(self):
520 520 """close underlying file"""
521 521 if util.safehasattr(self._fp, 'close'):
522 522 return self._fp.close()
523 523
524 524 class unbundle20(unpackermixin):
525 525 """interpret a bundle2 stream
526 526
527 527 This class is fed with a binary stream and yields parts through its
528 528 `iterparts` method."""
529 529
530 530 def __init__(self, ui, fp, header=None):
531 531 """If header is specified, we do not read it out of the stream."""
532 532 self.ui = ui
533 533 super(unbundle20, self).__init__(fp)
534 534 if header is None:
535 535 header = self._readexact(4)
536 536 magic, version = header[0:2], header[2:4]
537 537 if magic != 'HG':
538 538 raise util.Abort(_('not a Mercurial bundle'))
539 539 if version != '2Y':
540 540 raise util.Abort(_('unknown bundle version %s') % version)
541 541 self.ui.debug('start processing of %s stream\n' % header)
542 542
543 543 @util.propertycache
544 544 def params(self):
545 545 """dictionary of stream level parameters"""
546 546 self.ui.debug('reading bundle2 stream parameters\n')
547 547 params = {}
548 548 paramssize = self._unpack(_fstreamparamsize)[0]
549 549 if paramssize < 0:
550 550 raise error.BundleValueError('negative bundle param size: %i'
551 551 % paramssize)
552 552 if paramssize:
553 553 for p in self._readexact(paramssize).split(' '):
554 554 p = p.split('=', 1)
555 555 p = [urllib.unquote(i) for i in p]
556 556 if len(p) < 2:
557 557 p.append(None)
558 558 self._processparam(*p)
559 559 params[p[0]] = p[1]
560 560 return params
561 561
562 562 def _processparam(self, name, value):
563 563 """process a parameter, applying its effect if needed
564 564
565 565 Parameters starting with a lower case letter are advisory and will be
566 566 ignored when unknown. Those starting with an upper case letter are
567 567 mandatory and this function will raise an UnsupportedPartError when unknown.
568 568
569 569 Note: no options are currently supported. Any input will either be
570 570 ignored or fail.
571 571 """
572 572 if not name:
573 573 raise ValueError('empty parameter name')
574 574 if name[0] not in string.letters:
575 575 raise ValueError('non letter first character: %r' % name)
576 576 # Some logic will be later added here to try to process the option for
577 577 # a dict of known parameter.
578 578 if name[0].islower():
579 579 self.ui.debug("ignoring unknown parameter %r\n" % name)
580 580 else:
581 581 raise error.UnsupportedPartError(params=(name,))
582 582
583 583
584 584 def iterparts(self):
585 585 """yield all parts contained in the stream"""
586 586 # make sure param have been loaded
587 587 self.params
588 588 self.ui.debug('start extraction of bundle2 parts\n')
589 589 headerblock = self._readpartheader()
590 590 while headerblock is not None:
591 591 part = unbundlepart(self.ui, headerblock, self._fp)
592 592 yield part
593 593 part.seek(0, 2)
594 594 headerblock = self._readpartheader()
595 595 self.ui.debug('end of bundle2 stream\n')
596 596
597 597 def _readpartheader(self):
598 598 """reads a part header size and return the bytes blob
599 599
600 600 returns None if empty"""
601 601 headersize = self._unpack(_fpartheadersize)[0]
602 602 if headersize < 0:
603 603 raise error.BundleValueError('negative part header size: %i'
604 604 % headersize)
605 605 self.ui.debug('part header size: %i\n' % headersize)
606 606 if headersize:
607 607 return self._readexact(headersize)
608 608 return None
609 609
610 610 def compressed(self):
611 611 return False
612 612
613 613 class bundlepart(object):
614 614 """A bundle2 part contains application level payload
615 615
616 616 The part `type` is used to route the part to the application level
617 617 handler.
618 618
619 619 The part payload is contained in ``part.data``. It could be raw bytes or a
620 620 generator of byte chunks.
621 621
622 622 You can add parameters to the part using the ``addparam`` method.
623 623 Parameters can be either mandatory (default) or advisory. Remote side
624 624 should be able to safely ignore the advisory ones.
625 625
626 626 Both data and parameters cannot be modified after the generation has begun.
627 627 """
628 628
629 629 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
630 630 data='', mandatory=True):
631 631 validateparttype(parttype)
632 632 self.id = None
633 633 self.type = parttype
634 634 self._data = data
635 635 self._mandatoryparams = list(mandatoryparams)
636 636 self._advisoryparams = list(advisoryparams)
637 637 # checking for duplicated entries
638 638 self._seenparams = set()
639 639 for pname, __ in self._mandatoryparams + self._advisoryparams:
640 640 if pname in self._seenparams:
641 641 raise RuntimeError('duplicated params: %s' % pname)
642 642 self._seenparams.add(pname)
643 643 # status of the part's generation:
644 644 # - None: not started,
645 645 # - False: currently generated,
646 646 # - True: generation done.
647 647 self._generated = None
648 648 self.mandatory = mandatory
649 649
650 650 # methods used to define the part content
651 651 def __setdata(self, data):
652 652 if self._generated is not None:
653 653 raise error.ReadOnlyPartError('part is being generated')
654 654 self._data = data
655 655 def __getdata(self):
656 656 return self._data
657 657 data = property(__getdata, __setdata)
658 658
659 659 @property
660 660 def mandatoryparams(self):
661 661 # make it an immutable tuple to force people through ``addparam``
662 662 return tuple(self._mandatoryparams)
663 663
664 664 @property
665 665 def advisoryparams(self):
666 666 # make it an immutable tuple to force people through ``addparam``
667 667 return tuple(self._advisoryparams)
668 668
669 669 def addparam(self, name, value='', mandatory=True):
670 670 if self._generated is not None:
671 671 raise error.ReadOnlyPartError('part is being generated')
672 672 if name in self._seenparams:
673 673 raise ValueError('duplicated params: %s' % name)
674 674 self._seenparams.add(name)
675 675 params = self._advisoryparams
676 676 if mandatory:
677 677 params = self._mandatoryparams
678 678 params.append((name, value))
679 679
680 680 # methods used to generate the bundle2 stream
681 681 def getchunks(self):
682 682 if self._generated is not None:
683 683 raise RuntimeError('part can only be consumed once')
684 684 self._generated = False
685 685 #### header
686 686 if self.mandatory:
687 687 parttype = self.type.upper()
688 688 else:
689 689 parttype = self.type.lower()
690 690 ## parttype
691 691 header = [_pack(_fparttypesize, len(parttype)),
692 692 parttype, _pack(_fpartid, self.id),
693 693 ]
694 694 ## parameters
695 695 # count
696 696 manpar = self.mandatoryparams
697 697 advpar = self.advisoryparams
698 698 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
699 699 # size
700 700 parsizes = []
701 701 for key, value in manpar:
702 702 parsizes.append(len(key))
703 703 parsizes.append(len(value))
704 704 for key, value in advpar:
705 705 parsizes.append(len(key))
706 706 parsizes.append(len(value))
707 707 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
708 708 header.append(paramsizes)
709 709 # key, value
710 710 for key, value in manpar:
711 711 header.append(key)
712 712 header.append(value)
713 713 for key, value in advpar:
714 714 header.append(key)
715 715 header.append(value)
716 716 ## finalize header
717 717 headerchunk = ''.join(header)
718 718 yield _pack(_fpartheadersize, len(headerchunk))
719 719 yield headerchunk
720 720 ## payload
721 721 try:
722 722 for chunk in self._payloadchunks():
723 723 yield _pack(_fpayloadsize, len(chunk))
724 724 yield chunk
725 725 except Exception, exc:
726 726 # backup exception data for later
727 727 exc_info = sys.exc_info()
728 728 msg = 'unexpected error: %s' % exc
729 729 interpart = bundlepart('b2x:error:abort', [('message', msg)],
730 730 mandatory=False)
731 731 interpart.id = 0
732 732 yield _pack(_fpayloadsize, -1)
733 733 for chunk in interpart.getchunks():
734 734 yield chunk
735 735 # abort current part payload
736 736 yield _pack(_fpayloadsize, 0)
737 737 raise exc_info[0], exc_info[1], exc_info[2]
738 738 # end of payload
739 739 yield _pack(_fpayloadsize, 0)
740 740 self._generated = True
741 741
742 742 def _payloadchunks(self):
743 743 """yield chunks of the part payload
744 744
745 745 Exists to handle the different methods to provide data to a part."""
746 746 # we only support fixed size data now.
747 747 # This will be improved in the future.
748 748 if util.safehasattr(self.data, 'next'):
749 749 buff = util.chunkbuffer(self.data)
750 750 chunk = buff.read(preferedchunksize)
751 751 while chunk:
752 752 yield chunk
753 753 chunk = buff.read(preferedchunksize)
754 754 elif len(self.data):
755 755 yield self.data
756 756
757 757
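The payload framing used by `getchunks` and `_payloadchunks` (a series of `<chunksize><chunkdata>` closed by a zero size chunk) can be sketched as follows. This Python 3 example is an illustration only: `frame_payload` and `read_payload` are hypothetical names, and the reader rejects every negative size, whereas the module reserves -1 to signal an interruption.

```python
import io
import struct

def frame_payload(data, chunksize=4096):
    """Yield <int32 size><bytes> frames for data, then a zero size frame."""
    for start in range(0, len(data), chunksize):
        chunk = data[start:start + chunksize]
        yield struct.pack('>i', len(chunk))
        yield chunk
    yield struct.pack('>i', 0)  # end of payload marker

def read_payload(fp):
    """Read frames from a binary file object until the zero size frame."""
    chunks = []
    while True:
        (size,) = struct.unpack('>i', fp.read(4))
        if size == 0:
            return b''.join(chunks)
        if size < 0:
            # simplification: no special case processing in this sketch
            raise ValueError('negative payload chunk size: %i' % size)
        chunks.append(fp.read(size))

data = b'x' * 10000
stream = b''.join(frame_payload(data))
restored = read_payload(io.BytesIO(stream))
```

An empty payload still produces the four byte terminator, so a reader always knows where the part ends.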
758 758 flaginterrupt = -1
759 759
760 760 class interrupthandler(unpackermixin):
761 761 """read one part and process it with restricted capability
762 762
763 763 This allows transmitting exceptions raised on the producer side during part
764 764 iteration while the consumer is reading a part.
765 765
766 766 Parts processed in this manner only have access to a ui object."""
767 767
768 768 def __init__(self, ui, fp):
769 769 super(interrupthandler, self).__init__(fp)
770 770 self.ui = ui
771 771
772 772 def _readpartheader(self):
773 773 """reads a part header size and return the bytes blob
774 774
775 775 returns None if empty"""
776 776 headersize = self._unpack(_fpartheadersize)[0]
777 777 if headersize < 0:
778 778 raise error.BundleValueError('negative part header size: %i'
779 779 % headersize)
780 780 self.ui.debug('part header size: %i\n' % headersize)
781 781 if headersize:
782 782 return self._readexact(headersize)
783 783 return None
784 784
785 785 def __call__(self):
786 786 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
787 787 headerblock = self._readpartheader()
788 788 if headerblock is None:
789 789 self.ui.debug('no part found during interruption.\n')
790 790 return
791 791 part = unbundlepart(self.ui, headerblock, self._fp)
792 792 op = interruptoperation(self.ui)
793 793 _processpart(op, part)
794 794
795 795 class interruptoperation(object):
796 796 """A limited operation to be used by part handlers during interruption
797 797
798 798 It only has access to a ui object.
799 799 """
800 800
801 801 def __init__(self, ui):
802 802 self.ui = ui
803 803 self.reply = None
804 804
805 805 @property
806 806 def repo(self):
807 807 raise RuntimeError('no repo access from stream interruption')
808 808
809 809 def gettransaction(self):
810 810 raise TransactionUnavailable('no repo access from stream interruption')
811 811
812 812 class unbundlepart(unpackermixin):
813 813 """a bundle part read from a bundle"""
814 814
815 815 def __init__(self, ui, header, fp):
816 816 super(unbundlepart, self).__init__(fp)
817 817 self.ui = ui
818 818 # unbundle state attr
819 819 self._headerdata = header
820 820 self._headeroffset = 0
821 821 self._initialized = False
822 822 self.consumed = False
823 823 # part data
824 824 self.id = None
825 825 self.type = None
826 826 self.mandatoryparams = None
827 827 self.advisoryparams = None
828 828 self.params = None
829 829 self.mandatorykeys = ()
830 830 self._payloadstream = None
831 831 self._readheader()
832 832 self._mandatory = None
833 833 self._chunkindex = [] #(payload, file) position tuples for chunk starts
834 834 self._pos = 0
835 835
836 836 def _fromheader(self, size):
837 837 """return the next <size> bytes from the header"""
838 838 offset = self._headeroffset
839 839 data = self._headerdata[offset:(offset + size)]
840 840 self._headeroffset = offset + size
841 841 return data
842 842
843 843 def _unpackheader(self, format):
844 844 """read given format from header
845 845
846 846 This automatically computes the size of the format to read."""
847 847 data = self._fromheader(struct.calcsize(format))
848 848 return _unpack(format, data)
849 849
850 850 def _initparams(self, mandatoryparams, advisoryparams):
851 851 """internal function to setup all logic related parameters"""
852 852 # make it read only to prevent people touching it by mistake.
853 853 self.mandatoryparams = tuple(mandatoryparams)
854 854 self.advisoryparams = tuple(advisoryparams)
855 855 # user friendly UI
856 856 self.params = dict(self.mandatoryparams)
857 857 self.params.update(dict(self.advisoryparams))
858 858 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
859 859
860 860 def _payloadchunks(self, chunknum=0):
861 861 '''seek to specified chunk and start yielding data'''
862 862 if len(self._chunkindex) == 0:
863 863 assert chunknum == 0, 'Must start with chunk 0'
864 864 self._chunkindex.append((0, super(unbundlepart, self).tell()))
865 865 else:
866 866 assert chunknum < len(self._chunkindex), \
867 867 'Unknown chunk %d' % chunknum
868 868 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
869 869
870 870 pos = self._chunkindex[chunknum][0]
871 871 payloadsize = self._unpack(_fpayloadsize)[0]
872 872 self.ui.debug('payload chunk size: %i\n' % payloadsize)
873 873 while payloadsize:
874 874 if payloadsize == flaginterrupt:
875 875 # interruption detection, the handler will now read a
876 876 # single part and process it.
877 877 interrupthandler(self.ui, self._fp)()
878 878 elif payloadsize < 0:
879 879 msg = 'negative payload chunk size: %i' % payloadsize
880 880 raise error.BundleValueError(msg)
881 881 else:
882 882 result = self._readexact(payloadsize)
883 883 chunknum += 1
884 884 pos += payloadsize
885 885 if chunknum == len(self._chunkindex):
886 886 self._chunkindex.append((pos,
887 887 super(unbundlepart, self).tell()))
888 888 yield result
889 889 payloadsize = self._unpack(_fpayloadsize)[0]
890 890 self.ui.debug('payload chunk size: %i\n' % payloadsize)
891 891
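The framing that `_payloadchunks` walks through can be sketched on an in-memory stream. This is a simplified Python 3 illustration, not the module's code: it assumes each chunk is preceded by a signed big-endian 32-bit size (`_fpayloadsize` is defined outside this excerpt, so `'>i'` is an assumption), it treats a zero size as the end of the payload, and it does not model the interrupt flag or the chunk index. The names `readexact` and `iterchunks` are hypothetical.

```python
import io
import struct

SIZEFORMAT = '>i'  # assumption: stands in for _fpayloadsize

def readexact(fp, size):
    """Read exactly `size` bytes or fail loudly."""
    data = fp.read(size)
    if len(data) != size:
        raise ValueError('stream ended unexpectedly')
    return data

def iterchunks(fp):
    """Yield payload chunks until a zero-size chunk is found."""
    headersize = struct.calcsize(SIZEFORMAT)
    size = struct.unpack(SIZEFORMAT, readexact(fp, headersize))[0]
    while size:
        if size < 0:
            raise ValueError('negative payload chunk size: %i' % size)
        yield readexact(fp, size)
        size = struct.unpack(SIZEFORMAT, readexact(fp, headersize))[0]

# two chunks followed by the zero-size terminator
stream = io.BytesIO(
    struct.pack('>i', 3) + b'abc' +
    struct.pack('>i', 2) + b'de' +
    struct.pack('>i', 0))
chunks = list(iterchunks(stream))
```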
892 892 def _findchunk(self, pos):
893 893 '''for a given payload position, return a chunk number and offset'''
894 894 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
895 895 if ppos == pos:
896 896 return chunk, 0
897 897 elif ppos > pos:
898 898 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
899 899 raise ValueError('Unknown chunk')
900 900
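To make the lookup in `_findchunk` concrete, here is the same logic as a free function run against a hand-built index. The index values are invented for illustration: each entry is a `(payload position, file position)` pair, and the last entry marks the end of the payload read so far.

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset inside that chunk) for a payload position."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# hypothetical index: chunks of 100 and 150 payload bytes,
# each preceded by a 4-byte size field in the file
index = [(0, 10), (100, 114), (250, 268)]

start = findchunk(index, 0)
inside = findchunk(index, 120)
end = findchunk(index, 250)
```

A position past the last recorded entry raises `ValueError`, which is why `seek` reads ahead before it calls `_findchunk`.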
901 901 def _readheader(self):
902 902 """read the header and setup the object"""
903 903 typesize = self._unpackheader(_fparttypesize)[0]
904 904 self.type = self._fromheader(typesize)
905 905 self.ui.debug('part type: "%s"\n' % self.type)
906 906 self.id = self._unpackheader(_fpartid)[0]
907 907 self.ui.debug('part id: "%s"\n' % self.id)
908 908 # extract mandatory bit from type
909 909 self.mandatory = (self.type != self.type.lower())
910 910 self.type = self.type.lower()
911 911 ## reading parameters
912 912 # param count
913 913 mancount, advcount = self._unpackheader(_fpartparamcount)
914 914 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
915 915 # param size
916 916 fparamsizes = _makefpartparamsizes(mancount + advcount)
917 917 paramsizes = self._unpackheader(fparamsizes)
918 918 # make it a list of (key size, value size) pairs again
919 919 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
920 920 # split mandatory from advisory
921 921 mansizes = paramsizes[:mancount]
922 922 advsizes = paramsizes[mancount:]
923 923 # retrieve param value
924 924 manparams = []
925 925 for key, value in mansizes:
926 926 manparams.append((self._fromheader(key), self._fromheader(value)))
927 927 advparams = []
928 928 for key, value in advsizes:
929 929 advparams.append((self._fromheader(key), self._fromheader(value)))
930 930 self._initparams(manparams, advparams)
931 931 ## part payload
932 932 self._payloadstream = util.chunkbuffer(self._payloadchunks())
933 933 # the header has been read, record it
934 934 self._initialized = True
935 935
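The header walk in `_readheader` can be reproduced on a byte string. The sketch below is a Python 3 approximation under stated assumptions: the struct formats `'>B'` (type size), `'>I'` (part id) and `'>BB'` (parameter counts and each key/value size pair) stand in for `_fparttypesize`, `_fpartid`, `_fpartparamcount` and `_makefpartparamsizes`, which are defined outside this excerpt. `parsepartheader` and the sample header are hypothetical.

```python
import struct

def parsepartheader(header):
    """Parse a part header laid out the way _readheader consumes it."""
    offset = 0

    def take(size):
        # counterpart of _fromheader: slice, then advance the offset
        nonlocal offset
        data = header[offset:offset + size]
        offset += size
        return data

    def unpack(fmt):
        # counterpart of _unpackheader: size is derived from the format
        return struct.unpack(fmt, take(struct.calcsize(fmt)))

    typesize = unpack('>B')[0]
    parttype = take(typesize).decode('ascii')
    partid = unpack('>I')[0]
    mancount, advcount = unpack('>BB')
    sizes = unpack('>' + 'BB' * (mancount + advcount))
    # zip() is lazy in Python 3, so build a list before slicing
    pairs = list(zip(sizes[::2], sizes[1::2]))
    params = [(take(ksize), take(vsize)) for ksize, vsize in pairs]
    return {
        'type': parttype.lower(),
        # any upper-case character in the type marks the part as mandatory
        'mandatory': parttype != parttype.lower(),
        'id': partid,
        'mandatoryparams': params[:mancount],
        'advisoryparams': params[mancount:],
    }

# hypothetical header: mandatory changegroup part, id 7, one mandatory param
parttype = b'B2X:CHANGEGROUP'
key, value = b'version', b'02'
header = (struct.pack('>B', len(parttype)) + parttype +
          struct.pack('>I', 7) +
          struct.pack('>BB', 1, 0) +
          struct.pack('>BB', len(key), len(value)) +
          key + value)
parsed = parsepartheader(header)
```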
936 936 def read(self, size=None):
937 937 """read payload data"""
938 938 if not self._initialized:
939 939 self._readheader()
940 940 if size is None:
941 941 data = self._payloadstream.read()
942 942 else:
943 943 data = self._payloadstream.read(size)
944 944 if size is None or len(data) < size:
945 945 self.consumed = True
946 946 self._pos += len(data)
947 947 return data
948 948
949 949 def tell(self):
950 950 return self._pos
951 951
952 952 def seek(self, offset, whence=0):
953 953 if whence == 0:
954 954 newpos = offset
955 955 elif whence == 1:
956 956 newpos = self._pos + offset
957 957 elif whence == 2:
958 958 if not self.consumed:
959 959 self.read()
960 960 newpos = self._chunkindex[-1][0] - offset
961 961 else:
962 962 raise ValueError('Unknown whence value: %r' % (whence,))
963 963
964 964 if newpos > self._chunkindex[-1][0] and not self.consumed:
965 965 self.read()
966 966 if not 0 <= newpos <= self._chunkindex[-1][0]:
967 967 raise ValueError('Offset out of range')
968 968
969 969 if self._pos != newpos:
970 970 chunk, internaloffset = self._findchunk(newpos)
971 971 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
972 972 adjust = self.read(internaloffset)
973 973 if len(adjust) != internaloffset:
974 974 raise util.Abort(_('Seek failed\n'))
975 975 self._pos = newpos
976 976
977 977 capabilities = {'HG2Y': (),
978 978 'b2x:listkeys': (),
979 979 'b2x:pushkey': (),
980 980 'digests': tuple(sorted(util.DIGESTS.keys())),
981 981 'b2x:remote-changegroup': ('http', 'https'),
982 982 }
983 983
984 984 def getrepocaps(repo, allowpushback=False):
985 985 """return the bundle2 capabilities for a given repo
986 986
987 987 Exists to allow extensions (like evolution) to mutate the capabilities.
988 988 """
989 989 caps = capabilities.copy()
990 990 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
991 991 if obsolete.isenabled(repo, obsolete.exchangeopt):
992 992 supportedformat = tuple('V%i' % v for v in obsolete.formats)
993 993 caps['b2x:obsmarkers'] = supportedformat
994 994 if allowpushback:
995 995 caps['b2x:pushback'] = ()
996 996 return caps
997 997
998 998 def bundle2caps(remote):
999 999 """return the bundle capabilities of a peer as dict"""
1000 1000 raw = remote.capable('bundle2-exp')
1001 1001 if not raw and raw != '':
1002 1002 return {}
1003 1003 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1004 1004 return decodecaps(capsblob)
1005 1005
1006 1006 def obsmarkersversion(caps):
1007 1007 """extract the list of supported obsmarkers versions from a bundle2caps dict
1008 1008 """
1009 1009 obscaps = caps.get('b2x:obsmarkers', ())
1010 1010 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1011 1011
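As a quick illustration, `obsmarkersversion` can be exercised on a plain dict. The function body is copied from the listing above; the capability dicts are made-up samples.

```python
def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a caps dict"""
    obscaps = caps.get('b2x:obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

# hypothetical capabilities, shaped like the output of getrepocaps
withmarkers = obsmarkersversion({'b2x:obsmarkers': ('V0', 'V1')})
withoutmarkers = obsmarkersversion({'HG2Y': ()})
```

Entries that do not start with `V` are skipped, and a peer that does not advertise `b2x:obsmarkers` yields an empty list.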
1012 1012 @parthandler('b2x:changegroup', ('version',))
1013 1013 def handlechangegroup(op, inpart):
1014 1014 """apply a changegroup part on the repo
1015 1015
1016 1016 This is a very early implementation that will need massive rework before being
1017 1017 inflicted on any end-user.
1018 1018 """
1019 1019 # Make sure we trigger a transaction creation
1020 1020 #
1021 1021 # The addchangegroup function will get a transaction object by itself, but
1022 1022 # we need to make sure we trigger the creation of a transaction object used
1023 1023 # for the whole processing scope.
1024 1024 op.gettransaction()
1025 1025 unpackerversion = inpart.params.get('version', '01')
1026 1026 # We should raise an appropriate exception here
1027 1027 unpacker = changegroup.packermap[unpackerversion][1]
1028 1028 cg = unpacker(inpart, 'UN')
1029 1029 # the source and url passed here are overwritten by the ones contained in
1030 1030 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1031 1031 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1032 1032 op.records.add('changegroup', {'return': ret})
1033 1033 if op.reply is not None:
1034 1034 # This is definitely not the final form of this
1035 1035 # return. But one needs to start somewhere.
1036 1036 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1037 1037 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1038 1038 part.addparam('return', '%i' % ret, mandatory=False)
1039 1039 assert not inpart.read()
1040 1040
1041 1041 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1042 1042 ['digest:%s' % k for k in util.DIGESTS.keys()])
1043 1043 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1044 1044 def handleremotechangegroup(op, inpart):
1045 1045 """apply a bundle10 on the repo, given a URL and validation information
1046 1046
1047 1047 All the information about the remote bundle to import is given as
1048 1048 parameters. The parameters include:
1049 1049 - url: the URL of the bundle10.
1050 1050 - size: the bundle10 file size. It is used to validate that what was
1051 1051 retrieved by the client matches the server's knowledge of the bundle.
1052 1052 - digests: a space separated list of the digest types provided as
1053 1053 parameters.
1054 1054 - digest:<digest-type>: the hexadecimal representation of the digest with
1055 1055 that name. Like the size, it is used to validate that what was retrieved
1056 1056 by the client matches what the server knows about the bundle.
1057 1057
1058 1058 When multiple digest types are given, all of them are checked.
1059 1059 """
1060 1060 try:
1061 1061 raw_url = inpart.params['url']
1062 1062 except KeyError:
1063 1063 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1064 1064 parsed_url = util.url(raw_url)
1065 1065 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1066 1066 raise util.Abort(_('remote-changegroup does not support %s urls') %
1067 1067 parsed_url.scheme)
1068 1068
1069 1069 try:
1070 1070 size = int(inpart.params['size'])
1071 1071 except ValueError:
1072 1072 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1073 1073 % 'size')
1074 1074 except KeyError:
1075 1075 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1076 1076
1077 1077 digests = {}
1078 1078 for typ in inpart.params.get('digests', '').split():
1079 1079 param = 'digest:%s' % typ
1080 1080 try:
1081 1081 value = inpart.params[param]
1082 1082 except KeyError:
1083 1083 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1084 1084 param)
1085 1085 digests[typ] = value
1086 1086
1087 1087 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1088 1088
1089 1089 # Make sure we trigger a transaction creation
1090 1090 #
1091 1091 # The addchangegroup function will get a transaction object by itself, but
1092 1092 # we need to make sure we trigger the creation of a transaction object used
1093 1093 # for the whole processing scope.
1094 1094 op.gettransaction()
1095 1095 import exchange
1096 1096 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1097 1097 if not isinstance(cg, changegroup.cg1unpacker):
1098 1098 raise util.Abort(_('%s: not a bundle version 1.0') %
1099 1099 util.hidepassword(raw_url))
1100 1100 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1101 1101 op.records.add('changegroup', {'return': ret})
1102 1102 if op.reply is not None:
1103 1103 # This is definitely not the final form of this
1104 1104 # return. But one needs to start somewhere.
1105 1105 part = op.reply.newpart('b2x:reply:changegroup')
1106 1106 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1107 1107 part.addparam('return', '%i' % ret, mandatory=False)
1108 1108 try:
1109 1109 real_part.validate()
1110 1110 except util.Abort, e:
1111 1111 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1112 1112 (util.hidepassword(raw_url), str(e)))
1113 1113 assert not inpart.read()
1114 1114
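The size and digest checks described in the docstring above can be approximated with `hashlib`. This is not `util.digestchecker`, whose implementation is outside this excerpt. It is a hypothetical `validatebundle` helper that works on data already held in memory, assuming digest type names that `hashlib.new` accepts.

```python
import hashlib

def validatebundle(data, size, digests):
    """Check retrieved bytes against the advertised size and digests."""
    if len(data) != size:
        raise ValueError('size mismatch: expected %d, got %d'
                         % (size, len(data)))
    # when several digest types are given, all of them are checked
    for typ, expected in digests.items():
        actual = hashlib.new(typ, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch: expected %s, got %s'
                             % (typ, expected, actual))

# hypothetical payload with digests computed on the sender side
data = b'not a real bundle'
digests = {'sha1': hashlib.sha1(data).hexdigest(),
           'md5': hashlib.md5(data).hexdigest()}
validatebundle(data, len(data), digests)
```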
1115 1115 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1116 1116 def handlereplychangegroup(op, inpart):
1117 1117 ret = int(inpart.params['return'])
1118 1118 replyto = int(inpart.params['in-reply-to'])
1119 1119 op.records.add('changegroup', {'return': ret}, replyto)
1120 1120
1121 1121 @parthandler('b2x:check:heads')
1122 1122 def handlecheckheads(op, inpart):
1123 1123 """check that the heads of the repo did not change
1124 1124
1125 1125 This is used to detect a push race when using unbundle.
1126 1126 This replaces the "heads" argument of unbundle."""
1127 1127 h = inpart.read(20)
1128 1128 heads = []
1129 1129 while len(h) == 20:
1130 1130 heads.append(h)
1131 1131 h = inpart.read(20)
1132 1132 assert not h
1133 1133 if heads != op.repo.heads():
1134 1134 raise error.PushRaced('repository changed while pushing - '
1135 1135 'please try again')
1136 1136
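The payload of the check-heads part is read in fixed 20-byte records, one per head. A minimal Python 3 sketch of that loop on an in-memory stream follows. The name `readheads` and the sample node ids are hypothetical, and the sketch raises `ValueError` on trailing bytes where the handler above uses `assert`.

```python
import io

def readheads(stream, nodelen=20):
    """Split a payload into fixed-size binary node ids."""
    heads = []
    h = stream.read(nodelen)
    while len(h) == nodelen:
        heads.append(h)
        h = stream.read(nodelen)
    if h:
        # a short final record means the payload was malformed
        raise ValueError('trailing bytes in heads payload')
    return heads

# two made-up 20-byte node ids
payload = io.BytesIO(b'\x11' * 20 + b'\x22' * 20)
heads = readheads(payload)
```

The handler then compares the resulting list with `op.repo.heads()` and raises `error.PushRaced` when they differ.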
1137 1137 @parthandler('b2x:output')
1138 1138 def handleoutput(op, inpart):
1139 1139 """forward output captured on the server to the client"""
1140 1140 for line in inpart.read().splitlines():
1141 1141 op.ui.write(('remote: %s\n' % line))
1142 1142
1143 1143 @parthandler('b2x:replycaps')
1144 1144 def handlereplycaps(op, inpart):
1145 1145 """Notify that a reply bundle should be created
1146 1146
1147 1147 The payload contains the capabilities information for the reply"""
1148 1148 caps = decodecaps(inpart.read())
1149 1149 if op.reply is None:
1150 1150 op.reply = bundle20(op.ui, caps)
1151 1151
1152 1152 @parthandler('b2x:error:abort', ('message', 'hint'))
1153 1153 def handlereplycaps(op, inpart):
1154 1154 """Used to transmit abort error over the wire"""
1155 1155 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1156 1156
1157 1157 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1158 1158 def handlereplycaps(op, inpart):
1159 1159 """Used to transmit unknown content error over the wire"""
1160 1160 kwargs = {}
1161 1161 parttype = inpart.params.get('parttype')
1162 1162 if parttype is not None:
1163 1163 kwargs['parttype'] = parttype
1164 1164 params = inpart.params.get('params')
1165 1165 if params is not None:
1166 1166 kwargs['params'] = params.split('\0')
1167 1167
1168 1168 raise error.UnsupportedPartError(**kwargs)
1169 1169
1170 1170 @parthandler('b2x:error:pushraced', ('message',))
1171 1171 def handlereplycaps(op, inpart):
1172 1172 """Used to transmit push race error over the wire"""
1173 1173 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1174 1174
1175 1175 @parthandler('b2x:listkeys', ('namespace',))
1176 1176 def handlelistkeys(op, inpart):
1177 1177 """retrieve pushkey namespace content stored in a bundle2"""
1178 1178 namespace = inpart.params['namespace']
1179 1179 r = pushkey.decodekeys(inpart.read())
1180 1180 op.records.add('listkeys', (namespace, r))
1181 1181
1182 1182 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1183 1183 def handlepushkey(op, inpart):
1184 1184 """process a pushkey request"""
1185 1185 dec = pushkey.decode
1186 1186 namespace = dec(inpart.params['namespace'])
1187 1187 key = dec(inpart.params['key'])
1188 1188 old = dec(inpart.params['old'])
1189 1189 new = dec(inpart.params['new'])
1190 1190 ret = op.repo.pushkey(namespace, key, old, new)
1191 1191 record = {'namespace': namespace,
1192 1192 'key': key,
1193 1193 'old': old,
1194 1194 'new': new}
1195 1195 op.records.add('pushkey', record)
1196 1196 if op.reply is not None:
1197 1197 rpart = op.reply.newpart('b2x:reply:pushkey')
1198 1198 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1199 1199 rpart.addparam('return', '%i' % ret, mandatory=False)
1200 1200
1201 1201 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1202 1202 def handlepushkeyreply(op, inpart):
1203 1203 """retrieve the result of a pushkey request"""
1204 1204 ret = int(inpart.params['return'])
1205 1205 partid = int(inpart.params['in-reply-to'])
1206 1206 op.records.add('pushkey', {'return': ret}, partid)
1207 1207
1208 1208 @parthandler('b2x:obsmarkers')
1209 1209 def handleobsmarker(op, inpart):
1210 1210 """add a stream of obsmarkers to the repo"""
1211 1211 tr = op.gettransaction()
1212 1212 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1213 1213 if new:
1214 1214 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1215 1215 op.records.add('obsmarkers', {'new': new})
1216 1216 if op.reply is not None:
1217 1217 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1218 1218 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1219 1219 rpart.addparam('new', '%i' % new, mandatory=False)
1220 1220
1221 1221
1222 1222 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1223 1223 def handlepushkeyreply(op, inpart):
1224 1224 """retrieve the result of an obsmarkers request"""
1225 1225 ret = int(inpart.params['new'])
1226 1226 partid = int(inpart.params['in-reply-to'])
1227 1227 op.records.add('obsmarkers', {'new': ret}, partid)