unbundle20: move header parsing into the 'getunbundler' function...
Pierre-Yves David -
r24642:54e5c239 default
@@ -1,1231 +1,1232 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
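The part header layout described in the docstring above can be illustrated with a small round trip. This is a minimal Python 3 sketch, not the code under review: the helper names `pack_part_header` and `unpack_part_header` are hypothetical, and only the field order and struct formats (`>B`, `>I`, `>BB`) are taken from the module.

```python
import struct


def pack_part_header(parttype, partid, mandatory=(), advisory=()):
    """Serialize a part header: type size, type, id, counts, sizes, data.

    Hypothetical helper; parameters are (key, value) pairs of bytes.
    """
    params = list(mandatory) + list(advisory)  # mandatory ones come first
    out = [
        struct.pack('>B', len(parttype)),
        parttype,
        struct.pack('>I', partid),
        struct.pack('>BB', len(mandatory), len(advisory)),
    ]
    sizes = []
    for key, value in params:
        sizes.extend((len(key), len(value)))
    # one (key size, value size) pair of bytes per parameter
    out.append(struct.pack('>' + 'BB' * len(params), *sizes))
    for key, value in params:
        out.extend((key, value))
    return b''.join(out)


def unpack_part_header(blob):
    """Inverse of pack_part_header (hypothetical helper)."""
    offset = 0
    (typesize,) = struct.unpack_from('>B', blob, offset)
    offset += 1
    parttype = blob[offset:offset + typesize]
    offset += typesize
    (partid,) = struct.unpack_from('>I', blob, offset)
    offset += 4
    mancount, advcount = struct.unpack_from('>BB', blob, offset)
    offset += 2
    total = mancount + advcount
    sizes = struct.unpack_from('>' + 'BB' * total, blob, offset)
    offset += 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    return parttype, partid, params[:mancount], params[mancount:]
```

Because every size is stored in a single unsigned byte, a part type, key, or value in this layout cannot exceed 255 bytes.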
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def validateparttype(parttype):
177 177 """raise ValueError if a parttype contains invalid character"""
178 178 if _parttypeforbidden.search(parttype):
179 179 raise ValueError(parttype)
180 180
181 181 def _makefpartparamsizes(nbparams):
182 182 """return a struct format to read part parameter sizes
183 183
184 184 The number of parameters is variable so we need to build that format
185 185 dynamically.
186 186 """
187 187 return '>'+('BB'*nbparams)
188 188
189 189 parthandlermapping = {}
190 190
191 191 def parthandler(parttype, params=()):
192 192 """decorator that registers a function as a bundle2 part handler
193 193
194 194 eg::
195 195
196 196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 197 def myparttypehandler(...):
198 198 '''process a part of type "my part".'''
199 199 ...
200 200 """
201 201 validateparttype(parttype)
202 202 def _decorator(func):
203 203 lparttype = parttype.lower() # enforce lower case matching.
204 204 assert lparttype not in parthandlermapping
205 205 parthandlermapping[lparttype] = func
206 206 func.params = frozenset(params)
207 207 return func
208 208 return _decorator
209 209
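The registration and lookup rules (lower-cased keys, with the case of the transmitted part type carrying the mandatory bit) can be sketched in isolation. This is a simplified Python 3 illustration: `handlers`, `dispatch`, and the `test:echo` part type are hypothetical names, and `LookupError` stands in for the module's `UnsupportedPartError`.

```python
handlers = {}  # hypothetical stand-in for parthandlermapping


def parthandler(parttype, params=()):
    """Register a handler under the lower-cased part type."""
    def decorator(func):
        key = parttype.lower()
        assert key not in handlers
        handlers[key] = func
        func.params = frozenset(params)
        return func
    return decorator


def dispatch(parttype, payload):
    """Route a payload to its handler, honoring the mandatory bit."""
    # any uppercase character marks the part as mandatory
    mandatory = parttype != parttype.lower()
    handler = handlers.get(parttype.lower())
    if handler is None:
        if mandatory:
            raise LookupError('no handler for mandatory part %r' % parttype)
        return None  # unknown advisory parts are silently ignored
    return handler(payload)


@parthandler('test:echo')
def echo(payload):
    return payload
```

With this registry, `dispatch('TEST:ECHO', data)` and `dispatch('test:echo', data)` reach the same handler; only the behavior for unknown types differs.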
210 210 class unbundlerecords(object):
211 211 """keep record of what happens during an unbundle
212 212
213 213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 214 category of record and obj is an arbitrary object.
215 215
216 216 `records['cat']` will return all entries of this category 'cat'.
217 217
218 218 Iterating on the object itself will yield `('category', obj)` tuples
219 219 for all entries.
220 220
221 221 All iterations happen in chronological order.
222 222 """
223 223
224 224 def __init__(self):
225 225 self._categories = {}
226 226 self._sequences = []
227 227 self._replies = {}
228 228
229 229 def add(self, category, entry, inreplyto=None):
230 230 """add a new record of a given category.
231 231
232 232 The entry can then be retrieved in the list returned by
233 233 self['category']."""
234 234 self._categories.setdefault(category, []).append(entry)
235 235 self._sequences.append((category, entry))
236 236 if inreplyto is not None:
237 237 self.getreplies(inreplyto).add(category, entry)
238 238
239 239 def getreplies(self, partid):
240 240 """get the records that are replies to a specific part"""
241 241 return self._replies.setdefault(partid, unbundlerecords())
242 242
243 243 def __getitem__(self, cat):
244 244 return tuple(self._categories.get(cat, ()))
245 245
246 246 def __iter__(self):
247 247 return iter(self._sequences)
248 248
249 249 def __len__(self):
250 250 return len(self._sequences)
251 251
252 252 def __nonzero__(self):
253 253 return bool(self._sequences)
254 254
255 255 class bundleoperation(object):
256 256 """an object that represents a single bundling process
257 257
258 258 Its purpose is to carry unbundle-related objects and states.
259 259
260 260 A new object should be created at the beginning of each bundle processing.
261 261 The object is to be returned by the processing function.
262 262
263 263 The object has very little content now; it will ultimately contain:
264 264 * an access to the repo the bundle is applied to,
265 265 * a ui object,
266 266 * a way to retrieve a transaction to add changes to the repo,
267 267 * a way to record the result of processing each part,
268 268 * a way to construct a bundle response when applicable.
269 269 """
270 270
271 271 def __init__(self, repo, transactiongetter):
272 272 self.repo = repo
273 273 self.ui = repo.ui
274 274 self.records = unbundlerecords()
275 275 self.gettransaction = transactiongetter
276 276 self.reply = None
277 277
278 278 class TransactionUnavailable(RuntimeError):
279 279 pass
280 280
281 281 def _notransaction():
282 282 """default method to get a transaction while processing a bundle
283 283
284 284 Raise an exception to highlight the fact that no transaction was expected
285 285 to be created"""
286 286 raise TransactionUnavailable()
287 287
288 288 def processbundle(repo, unbundler, transactiongetter=None):
289 289 """This function processes a bundle, applying effects to/from a repo
290 290
291 291 It iterates over each part then searches for and uses the proper handling
292 292 code to process the part. Parts are processed in order.
293 293
294 294 This is a very early version of this function that will be strongly reworked
295 295 before final usage.
296 296
297 297 Unknown Mandatory part will abort the process.
298 298 """
299 299 if transactiongetter is None:
300 300 transactiongetter = _notransaction
301 301 op = bundleoperation(repo, transactiongetter)
302 302 # todo:
303 303 # - replace this with an init function soon.
304 304 # - exception catching
305 305 unbundler.params
306 306 iterparts = unbundler.iterparts()
307 307 part = None
308 308 try:
309 309 for part in iterparts:
310 310 _processpart(op, part)
311 311 except Exception, exc:
312 312 for part in iterparts:
313 313 # consume the bundle content
314 314 part.seek(0, 2)
315 315 # Small hack to let caller code distinguish exceptions from bundle2
316 316 # processing from processing the old format. This is mostly
317 317 # needed to handle different return codes to unbundle according to the
318 318 # type of bundle. We should probably clean up or drop this return code
319 319 # craziness in a future version.
320 320 exc.duringunbundle2 = True
321 321 raise
322 322 return op
323 323
324 324 def _processpart(op, part):
325 325 """process a single part from a bundle
326 326
327 327 The part is guaranteed to have been fully consumed when the function exits
328 328 (even if an exception is raised)."""
329 329 try:
330 330 try:
331 331 handler = parthandlermapping.get(part.type)
332 332 if handler is None:
333 333 raise error.UnsupportedPartError(parttype=part.type)
334 334 op.ui.debug('found a handler for part %r\n' % part.type)
335 335 unknownparams = part.mandatorykeys - handler.params
336 336 if unknownparams:
337 337 unknownparams = list(unknownparams)
338 338 unknownparams.sort()
339 339 raise error.UnsupportedPartError(parttype=part.type,
340 340 params=unknownparams)
341 341 except error.UnsupportedPartError, exc:
342 342 if part.mandatory: # mandatory parts
343 343 raise
344 344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
345 345 return # skip to part processing
346 346
347 347 # handler is called outside the above try block so that we don't
348 348 # risk catching KeyErrors from anything other than the
349 349 # parthandlermapping lookup (any KeyError raised by handler()
350 350 # itself represents a defect of a different variety).
351 351 output = None
352 352 if op.reply is not None:
353 353 op.ui.pushbuffer(error=True)
354 354 output = ''
355 355 try:
356 356 handler(op, part)
357 357 finally:
358 358 if output is not None:
359 359 output = op.ui.popbuffer()
360 360 if output:
361 361 outpart = op.reply.newpart('b2x:output', data=output,
362 362 mandatory=False)
363 363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
364 364 finally:
365 365 # consume the part content to not corrupt the stream.
366 366 part.seek(0, 2)
367 367
368 368
369 369 def decodecaps(blob):
370 370 """decode a bundle2 caps bytes blob into a dictionary
371 371
372 372 The blob is a list of capabilities (one per line)
373 373 Capabilities may have values using a line of the form::
374 374
375 375 capability=value1,value2,value3
376 376
377 377 The values are always a list."""
378 378 caps = {}
379 379 for line in blob.splitlines():
380 380 if not line:
381 381 continue
382 382 if '=' not in line:
383 383 key, vals = line, ()
384 384 else:
385 385 key, vals = line.split('=', 1)
386 386 vals = vals.split(',')
387 387 key = urllib.unquote(key)
388 388 vals = [urllib.unquote(v) for v in vals]
389 389 caps[key] = vals
390 390 return caps
391 391
392 392 def encodecaps(caps):
393 393 """encode a bundle2 caps dictionary into a bytes blob"""
394 394 chunks = []
395 395 for ca in sorted(caps):
396 396 vals = caps[ca]
397 397 ca = urllib.quote(ca)
398 398 vals = [urllib.quote(v) for v in vals]
399 399 if vals:
400 400 ca = "%s=%s" % (ca, ','.join(vals))
401 401 chunks.append(ca)
402 402 return '\n'.join(chunks)
403 403
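A round trip through the capabilities format shows how the two functions above mirror each other. The following is a Python 3 transcription for illustration only: it substitutes `urllib.parse.quote` and `unquote` for the Python 2 `urllib` calls used in the module and works on `str` rather than bytes.

```python
from urllib.parse import quote, unquote


def encodecaps(caps):
    """Encode a {name: [values]} dictionary, one capability per line."""
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        line = quote(name)
        if vals:
            line = '%s=%s' % (line, ','.join(vals))
        chunks.append(line)
    return '\n'.join(chunks)


def decodecaps(blob):
    """Decode the text produced by encodecaps back into a dictionary."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

Quoting both names and values is what keeps `,`, `=`, and newlines inside a value from being mistaken for separators.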
404 404 class bundle20(object):
405 405 """represent an outgoing bundle2 container
406 406
407 407 Use the `addparam` method to add stream level parameters and `newpart` to
408 408 populate it. Then call `getchunks` to retrieve all the binary chunks of
409 409 data that compose the bundle2 container."""
410 410
411 411 _magicstring = 'HG2Y'
412 412
413 413 def __init__(self, ui, capabilities=()):
414 414 self.ui = ui
415 415 self._params = []
416 416 self._parts = []
417 417 self.capabilities = dict(capabilities)
418 418
419 419 @property
420 420 def nbparts(self):
421 421 """total number of parts added to the bundler"""
422 422 return len(self._parts)
423 423
424 424 # methods used to define the bundle2 content
425 425 def addparam(self, name, value=None):
426 426 """add a stream level parameter"""
427 427 if not name:
428 428 raise ValueError('empty parameter name')
429 429 if name[0] not in string.letters:
430 430 raise ValueError('non letter first character: %r' % name)
431 431 self._params.append((name, value))
432 432
433 433 def addpart(self, part):
434 434 """add a new part to the bundle2 container
435 435
436 436 Parts contain the actual application payload."""
437 437 assert part.id is None
438 438 part.id = len(self._parts) # very cheap counter
439 439 self._parts.append(part)
440 440
441 441 def newpart(self, typeid, *args, **kwargs):
442 442 """create a new part and add it to the container
443 443
444 444 The part is directly added to the container. For now, this means
445 445 that any failure to properly initialize the part after calling
446 446 ``newpart`` should result in a failure of the whole bundling process.
447 447
448 448 You can still fall back to creating and adding the part manually if you need better
449 449 control."""
450 450 part = bundlepart(typeid, *args, **kwargs)
451 451 self.addpart(part)
452 452 return part
453 453
454 454 # methods used to generate the bundle2 stream
455 455 def getchunks(self):
456 456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 457 yield self._magicstring
458 458 param = self._paramchunk()
459 459 self.ui.debug('bundle parameter: %s\n' % param)
460 460 yield _pack(_fstreamparamsize, len(param))
461 461 if param:
462 462 yield param
463 463
464 464 self.ui.debug('start of parts\n')
465 465 for part in self._parts:
466 466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 467 for chunk in part.getchunks():
468 468 yield chunk
469 469 self.ui.debug('end of bundle\n')
470 470 yield _pack(_fpartheadersize, 0)
471 471
472 472 def _paramchunk(self):
473 473 """return an encoded version of all stream parameters"""
474 474 blocks = []
475 475 for par, value in self._params:
476 476 par = urllib.quote(par)
477 477 if value is not None:
478 478 value = urllib.quote(value)
479 479 par = '%s=%s' % (par, value)
480 480 blocks.append(par)
481 481 return ' '.join(blocks)
482 482
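The stream level parameter block (an int32 size followed by space separated, urlquoted `name=value` items) can be exercised end to end. This is a Python 3 sketch under assumptions: `encode_stream_params` and `decode_stream_params` are hypothetical names, and the decoder deliberately skips the mandatory/advisory check that `_processparam` performs.

```python
import struct
from urllib.parse import quote, unquote


def encode_stream_params(params):
    """Return <int32 size><blob> for a list of (name, value-or-None)."""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    blob = ' '.join(blocks).encode('ascii')
    return struct.pack('>i', len(blob)) + blob


def decode_stream_params(data):
    """Parse the output of encode_stream_params into a dictionary."""
    (size,) = struct.unpack_from('>i', data)
    if size < 0:
        raise ValueError('negative bundle param size: %i' % size)
    params = {}
    if size:
        blob = data[4:4 + size].decode('ascii')
        for item in blob.split(' '):
            pieces = [unquote(i) for i in item.split('=', 1)]
            if len(pieces) < 2:
                pieces.append(None)  # parameter without a value
            params[pieces[0]] = pieces[1]
    return params
```

Since spaces and `=` inside names or values are percent-encoded, splitting on a literal space and on the first `=` is unambiguous.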
483 483 class unpackermixin(object):
484 484 """A mixin to extract bytes and struct data from a stream"""
485 485
486 486 def __init__(self, fp):
487 487 self._fp = fp
488 488 self._seekable = (util.safehasattr(fp, 'seek') and
489 489 util.safehasattr(fp, 'tell'))
490 490
491 491 def _unpack(self, format):
492 492 """unpack this struct format from the stream"""
493 493 data = self._readexact(struct.calcsize(format))
494 494 return _unpack(format, data)
495 495
496 496 def _readexact(self, size):
497 497 """read exactly <size> bytes from the stream"""
498 498 return changegroup.readexactly(self._fp, size)
499 499
500 500 def seek(self, offset, whence=0):
501 501 """move the underlying file pointer"""
502 502 if self._seekable:
503 503 return self._fp.seek(offset, whence)
504 504 else:
505 505 raise NotImplementedError(_('File pointer is not seekable'))
506 506
507 507 def tell(self):
508 508 """return the file offset, or None if file is not seekable"""
509 509 if self._seekable:
510 510 try:
511 511 return self._fp.tell()
512 512 except IOError, e:
513 513 if e.errno == errno.ESPIPE:
514 514 self._seekable = False
515 515 else:
516 516 raise
517 517 return None
518 518
519 519 def close(self):
520 520 """close underlying file"""
521 521 if util.safehasattr(self._fp, 'close'):
522 522 return self._fp.close()
523 523
524 524 def getunbundler(ui, fp, header=None):
525 525 """return a valid unbundler object for a given header"""
526 - return unbundle20(ui, fp, header)
526 + if header is None:
527 + header = changegroup.readexactly(fp, 4)
528 + magic, version = header[0:2], header[2:4]
529 + if magic != 'HG':
530 + raise util.Abort(_('not a Mercurial bundle'))
531 + if version != '2Y':
532 + raise util.Abort(_('unknown bundle version %s') % version)
533 + unbundler = unbundle20(ui, fp)
534 + ui.debug('start processing of %s stream\n' % header)
535 + return unbundler
527 536
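The header check that this changeset moves into `getunbundler` can be tried against an in-memory stream. This is a standalone Python 3 sketch, assuming a hypothetical `BundleHeaderError` in place of `util.Abort` and a plain `fp.read` in place of `changegroup.readexactly`.

```python
import io


class BundleHeaderError(Exception):
    """Hypothetical stand-in for util.Abort."""


def read_bundle_header(fp, header=None):
    """Validate the 4-byte magic, reading it from fp unless already given."""
    if header is None:
        header = fp.read(4)
        if len(header) < 4:
            raise BundleHeaderError('stream ended while reading header')
    magic, version = header[0:2], header[2:4]
    if magic != b'HG':
        raise BundleHeaderError('not a Mercurial bundle')
    if version != b'2Y':
        raise BundleHeaderError('unknown bundle version %r' % version)
    return header


stream = io.BytesIO(b'HG2Y\x00\x00\x00\x00')
header = read_bundle_header(stream)
```

Doing the check before constructing the unbundler leaves room to pick a different class per version later; whether that is the motivation behind the change is an assumption, as the visible commit title only describes the move.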
528 537 class unbundle20(unpackermixin):
529 538 """interpret a bundle2 stream
530 539
531 540 This class is fed with a binary stream and yields parts through its
532 541 `iterparts` methods."""
533 542
534 - def __init__(self, ui, fp, header=None):
543 + def __init__(self, ui, fp):
535 544 """If header is specified, we do not read it out of the stream."""
536 545 self.ui = ui
537 546 super(unbundle20, self).__init__(fp)
538 - if header is None:
539 - header = self._readexact(4)
540 - magic, version = header[0:2], header[2:4]
541 - if magic != 'HG':
542 - raise util.Abort(_('not a Mercurial bundle'))
543 - if version != '2Y':
544 - raise util.Abort(_('unknown bundle version %s') % version)
545 - self.ui.debug('start processing of %s stream\n' % header)
546 547
547 548 @util.propertycache
548 549 def params(self):
549 550 """dictionary of stream level parameters"""
550 551 self.ui.debug('reading bundle2 stream parameters\n')
551 552 params = {}
552 553 paramssize = self._unpack(_fstreamparamsize)[0]
553 554 if paramssize < 0:
554 555 raise error.BundleValueError('negative bundle param size: %i'
555 556 % paramssize)
556 557 if paramssize:
557 558 for p in self._readexact(paramssize).split(' '):
558 559 p = p.split('=', 1)
559 560 p = [urllib.unquote(i) for i in p]
560 561 if len(p) < 2:
561 562 p.append(None)
562 563 self._processparam(*p)
563 564 params[p[0]] = p[1]
564 565 return params
565 566
566 567 def _processparam(self, name, value):
567 568 """process a parameter, applying its effect if needed
568 569
569 570 Parameters starting with a lower case letter are advisory and will be
570 571 ignored when unknown. Those starting with an upper case letter are
571 572 mandatory and this function will raise an UnsupportedPartError when unknown.
572 573
573 574 Note: no options are currently supported. Any input will either be
574 575 ignored or fail.
575 576 """
576 577 if not name:
577 578 raise ValueError('empty parameter name')
578 579 if name[0] not in string.letters:
579 580 raise ValueError('non letter first character: %r' % name)
580 581 # Some logic will be later added here to try to process the option for
581 582 # a dict of known parameter.
582 583 if name[0].islower():
583 584 self.ui.debug("ignoring unknown parameter %r\n" % name)
584 585 else:
585 586 raise error.UnsupportedPartError(params=(name,))
586 587
587 588
588 589 def iterparts(self):
589 590 """yield all parts contained in the stream"""
590 591 # make sure param have been loaded
591 592 self.params
592 593 self.ui.debug('start extraction of bundle2 parts\n')
593 594 headerblock = self._readpartheader()
594 595 while headerblock is not None:
595 596 part = unbundlepart(self.ui, headerblock, self._fp)
596 597 yield part
597 598 part.seek(0, 2)
598 599 headerblock = self._readpartheader()
599 600 self.ui.debug('end of bundle2 stream\n')
600 601
601 602 def _readpartheader(self):
602 603 """reads a part header size and return the bytes blob
603 604
604 605 returns None if empty"""
605 606 headersize = self._unpack(_fpartheadersize)[0]
606 607 if headersize < 0:
607 608 raise error.BundleValueError('negative part header size: %i'
608 609 % headersize)
609 610 self.ui.debug('part header size: %i\n' % headersize)
610 611 if headersize:
611 612 return self._readexact(headersize)
612 613 return None
613 614
614 615 def compressed(self):
615 616 return False
616 617
617 618 class bundlepart(object):
618 619 """A bundle2 part contains application level payload
619 620
620 621 The part `type` is used to route the part to the application level
621 622 handler.
622 623
623 624 The part payload is contained in ``part.data``. It could be raw bytes or a
624 625 generator of byte chunks.
625 626
626 627 You can add parameters to the part using the ``addparam`` method.
627 628 Parameters can be either mandatory (default) or advisory. Remote side
628 629 should be able to safely ignore the advisory ones.
629 630
630 631 Both data and parameters cannot be modified after the generation has begun.
631 632 """
632 633
633 634 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
634 635 data='', mandatory=True):
635 636 validateparttype(parttype)
636 637 self.id = None
637 638 self.type = parttype
638 639 self._data = data
639 640 self._mandatoryparams = list(mandatoryparams)
640 641 self._advisoryparams = list(advisoryparams)
641 642 # checking for duplicated entries
642 643 self._seenparams = set()
643 644 for pname, __ in self._mandatoryparams + self._advisoryparams:
644 645 if pname in self._seenparams:
645 646 raise RuntimeError('duplicated params: %s' % pname)
646 647 self._seenparams.add(pname)
647 648 # status of the part's generation:
648 649 # - None: not started,
649 650 # - False: currently generated,
650 651 # - True: generation done.
651 652 self._generated = None
652 653 self.mandatory = mandatory
653 654
654 655 # methods used to define the part content
655 656 def __setdata(self, data):
656 657 if self._generated is not None:
657 658 raise error.ReadOnlyPartError('part is being generated')
658 659 self._data = data
659 660 def __getdata(self):
660 661 return self._data
661 662 data = property(__getdata, __setdata)
662 663
663 664 @property
664 665 def mandatoryparams(self):
665 666 # make it an immutable tuple to force people through ``addparam``
666 667 return tuple(self._mandatoryparams)
667 668
668 669 @property
669 670 def advisoryparams(self):
670 671 # make it an immutable tuple to force people through ``addparam``
671 672 return tuple(self._advisoryparams)
672 673
673 674 def addparam(self, name, value='', mandatory=True):
674 675 if self._generated is not None:
675 676 raise error.ReadOnlyPartError('part is being generated')
676 677 if name in self._seenparams:
677 678 raise ValueError('duplicated params: %s' % name)
678 679 self._seenparams.add(name)
679 680 params = self._advisoryparams
680 681 if mandatory:
681 682 params = self._mandatoryparams
682 683 params.append((name, value))
683 684
684 685 # methods used to generate the bundle2 stream
685 686 def getchunks(self):
686 687 if self._generated is not None:
687 688 raise RuntimeError('part can only be consumed once')
688 689 self._generated = False
689 690 #### header
690 691 if self.mandatory:
691 692 parttype = self.type.upper()
692 693 else:
693 694 parttype = self.type.lower()
694 695 ## parttype
695 696 header = [_pack(_fparttypesize, len(parttype)),
696 697 parttype, _pack(_fpartid, self.id),
697 698 ]
698 699 ## parameters
699 700 # count
700 701 manpar = self.mandatoryparams
701 702 advpar = self.advisoryparams
702 703 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
703 704 # size
704 705 parsizes = []
705 706 for key, value in manpar:
706 707 parsizes.append(len(key))
707 708 parsizes.append(len(value))
708 709 for key, value in advpar:
709 710 parsizes.append(len(key))
710 711 parsizes.append(len(value))
711 712 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
712 713 header.append(paramsizes)
713 714 # key, value
714 715 for key, value in manpar:
715 716 header.append(key)
716 717 header.append(value)
717 718 for key, value in advpar:
718 719 header.append(key)
719 720 header.append(value)
720 721 ## finalize header
721 722 headerchunk = ''.join(header)
722 723 yield _pack(_fpartheadersize, len(headerchunk))
723 724 yield headerchunk
724 725 ## payload
725 726 try:
726 727 for chunk in self._payloadchunks():
727 728 yield _pack(_fpayloadsize, len(chunk))
728 729 yield chunk
729 730 except Exception, exc:
730 731 # backup exception data for later
731 732 exc_info = sys.exc_info()
732 733 msg = 'unexpected error: %s' % exc
733 734 interpart = bundlepart('b2x:error:abort', [('message', msg)],
734 735 mandatory=False)
735 736 interpart.id = 0
736 737 yield _pack(_fpayloadsize, -1)
737 738 for chunk in interpart.getchunks():
738 739 yield chunk
739 740 # abort current part payload
740 741 yield _pack(_fpayloadsize, 0)
741 742 raise exc_info[0], exc_info[1], exc_info[2]
742 743 # end of payload
743 744 yield _pack(_fpayloadsize, 0)
744 745 self._generated = True
745 746
746 747 def _payloadchunks(self):
747 748 """yield chunks of the part payload
748 749
749 750 Exists to handle the different methods to provide data to a part."""
750 751 # we only support fixed size data now.
751 752 # This will be improved in the future.
752 753 if util.safehasattr(self.data, 'next'):
753 754 buff = util.chunkbuffer(self.data)
754 755 chunk = buff.read(preferedchunksize)
755 756 while chunk:
756 757 yield chunk
757 758 chunk = buff.read(preferedchunksize)
758 759 elif len(self.data):
759 760 yield self.data
760 761
761 762
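The payload framing (a series of `<chunksize><chunkdata>` closed by a zero size chunk) can be sketched as a pair of generators. This is an illustrative Python 3 example with hypothetical names `frame_payload` and `read_payload`; unlike the module, it treats every negative size as an error instead of handling the `-1` interrupt flag.

```python
import io
import struct


def frame_payload(chunks):
    """Yield the framed byte strings for an iterable of payload chunks."""
    for chunk in chunks:
        if chunk:  # an empty chunk would be read as the end marker
            yield struct.pack('>i', len(chunk))
            yield chunk
    yield struct.pack('>i', 0)  # zero size chunk concludes the payload


def read_payload(fp):
    """Yield payload chunks from fp until the zero size chunk."""
    while True:
        (size,) = struct.unpack('>i', fp.read(4))
        if size == 0:
            return
        if size < 0:
            raise ValueError('negative payload chunk size: %i' % size)
        yield fp.read(size)


stream = b''.join(frame_payload([b'abc', b'', b'defgh']))
chunks = list(read_payload(io.BytesIO(stream)))
```

The explicit terminator lets a reader skip an unknown part without understanding its content: it only has to follow the size fields until it sees zero.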
762 763 flaginterrupt = -1
763 764
764 765 class interrupthandler(unpackermixin):
765 766 """read one part and process it with restricted capability
766 767
767 768 This allows transmitting exceptions raised on the producer side during part
768 769 iteration while the consumer is reading a part.
769 770
770 771 Parts processed in this manner only have access to a ui object."""
771 772
772 773 def __init__(self, ui, fp):
773 774 super(interrupthandler, self).__init__(fp)
774 775 self.ui = ui
775 776
776 777 def _readpartheader(self):
777 778 """reads a part header size and return the bytes blob
778 779
779 780 returns None if empty"""
780 781 headersize = self._unpack(_fpartheadersize)[0]
781 782 if headersize < 0:
782 783 raise error.BundleValueError('negative part header size: %i'
783 784 % headersize)
784 785 self.ui.debug('part header size: %i\n' % headersize)
785 786 if headersize:
786 787 return self._readexact(headersize)
787 788 return None
788 789
789 790 def __call__(self):
790 791 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
791 792 headerblock = self._readpartheader()
792 793 if headerblock is None:
793 794 self.ui.debug('no part found during interruption.\n')
794 795 return
795 796 part = unbundlepart(self.ui, headerblock, self._fp)
796 797 op = interruptoperation(self.ui)
797 798 _processpart(op, part)
798 799
799 800 class interruptoperation(object):
800 801 """A limited operation to be used by part handlers during interruption
801 802
802 803 It only has access to a ui object.
803 804 """
804 805
805 806 def __init__(self, ui):
806 807 self.ui = ui
807 808 self.reply = None
808 809
809 810 @property
810 811 def repo(self):
811 812 raise RuntimeError('no repo access from stream interruption')
812 813
813 814 def gettransaction(self):
814 815 raise TransactionUnavailable('no repo access from stream interruption')
815 816
816 817 class unbundlepart(unpackermixin):
817 818 """a bundle part read from a bundle"""
818 819
819 820 def __init__(self, ui, header, fp):
820 821 super(unbundlepart, self).__init__(fp)
821 822 self.ui = ui
822 823 # unbundle state attr
823 824 self._headerdata = header
824 825 self._headeroffset = 0
825 826 self._initialized = False
826 827 self.consumed = False
827 828 # part data
828 829 self.id = None
829 830 self.type = None
830 831 self.mandatoryparams = None
831 832 self.advisoryparams = None
832 833 self.params = None
833 834 self.mandatorykeys = ()
834 835 self._payloadstream = None
835 836 self._readheader()
836 837 self._mandatory = None
837 838 self._chunkindex = [] #(payload, file) position tuples for chunk starts
838 839 self._pos = 0
839 840
840 841 def _fromheader(self, size):
841 842 """return the next <size> bytes from the header"""
842 843 offset = self._headeroffset
843 844 data = self._headerdata[offset:(offset + size)]
844 845 self._headeroffset = offset + size
845 846 return data
846 847
847 848 def _unpackheader(self, format):
848 849 """read given format from header
849 850
850 851 This automatically computes the size of the format to read."""
851 852 data = self._fromheader(struct.calcsize(format))
852 853 return _unpack(format, data)
853 854
854 855 def _initparams(self, mandatoryparams, advisoryparams):
855 856 """internal function to setup all logic related parameters"""
856 857 # make it read only to prevent people touching it by mistake.
857 858 self.mandatoryparams = tuple(mandatoryparams)
858 859 self.advisoryparams = tuple(advisoryparams)
859 860 # user friendly UI
860 861 self.params = dict(self.mandatoryparams)
861 862 self.params.update(dict(self.advisoryparams))
862 863 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
863 864
864 865 def _payloadchunks(self, chunknum=0):
865 866 '''seek to specified chunk and start yielding data'''
866 867 if len(self._chunkindex) == 0:
867 868 assert chunknum == 0, 'Must start with chunk 0'
868 869 self._chunkindex.append((0, super(unbundlepart, self).tell()))
869 870 else:
870 871 assert chunknum < len(self._chunkindex), \
871 872 'Unknown chunk %d' % chunknum
872 873 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
873 874
874 875 pos = self._chunkindex[chunknum][0]
875 876 payloadsize = self._unpack(_fpayloadsize)[0]
876 877 self.ui.debug('payload chunk size: %i\n' % payloadsize)
877 878 while payloadsize:
878 879 if payloadsize == flaginterrupt:
879 880 # interruption detection, the handler will now read a
880 881 # single part and process it.
881 882 interrupthandler(self.ui, self._fp)()
882 883 elif payloadsize < 0:
883 884 msg = 'negative payload chunk size: %i' % payloadsize
884 885 raise error.BundleValueError(msg)
885 886 else:
886 887 result = self._readexact(payloadsize)
887 888 chunknum += 1
888 889 pos += payloadsize
889 890 if chunknum == len(self._chunkindex):
890 891 self._chunkindex.append((pos,
891 892 super(unbundlepart, self).tell()))
892 893 yield result
893 894 payloadsize = self._unpack(_fpayloadsize)[0]
894 895 self.ui.debug('payload chunk size: %i\n' % payloadsize)
895 896
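The loop in `_payloadchunks` reads a size field, then that many payload bytes, and stops at a zero size. A simplified Python 3 sketch of that framing is shown here. It assumes the size field is a signed big-endian 32-bit integer (`'>i'`), which is consistent with the negative-size check above but is not shown in this part of the file. The `iterchunks` name is hypothetical, and the interrupt flag handled by the real code is not modelled: any negative size is rejected.

```python
import io
import struct


def iterchunks(fp):
    """Yield payload chunks until the zero-size end marker is read."""
    while True:
        raw = fp.read(4)
        if len(raw) != 4:
            raise ValueError('truncated chunk size')
        size = struct.unpack('>i', raw)[0]  # assumed size format
        if size == 0:
            return
        if size < 0:
            # Simplification: the real code treats one negative value
            # as an interruption flag; this sketch rejects all of them.
            raise ValueError('negative payload chunk size: %i' % size)
        data = fp.read(size)
        if len(data) != size:
            raise ValueError('truncated chunk')
        yield data


stream = io.BytesIO(struct.pack('>i', 3) + b'abc' +
                    struct.pack('>i', 2) + b'de' +
                    struct.pack('>i', 0))
chunks = list(iterchunks(stream))
```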
896 897 def _findchunk(self, pos):
897 898 '''for a given payload position, return a chunk number and offset'''
898 899 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
899 900 if ppos == pos:
900 901 return chunk, 0
901 902 elif ppos > pos:
902 903 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
903 904 raise ValueError('Unknown chunk')
904 905
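The lookup in `_findchunk` can be tried outside the class. The Python 3 sketch below copies its logic into a free function; the function name and the sample index values are made up for illustration. Each index entry is a `(payload position, file position)` pair marking where a chunk starts.

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset inside that chunk) for a payload position."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')


# Made-up index: chunks start at payload offsets 0, 10 and 25.
index = [(0, 100), (10, 114), (25, 133)]
start = findchunk(index, 0)
inside = findchunk(index, 12)
boundary = findchunk(index, 25)
```

A position past the last recorded chunk start raises `ValueError`, which is why `seek` reads ahead before calling the lookup when the target lies beyond the indexed range.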
905 906 def _readheader(self):
906 907 """read the header and setup the object"""
907 908 typesize = self._unpackheader(_fparttypesize)[0]
908 909 self.type = self._fromheader(typesize)
909 910 self.ui.debug('part type: "%s"\n' % self.type)
910 911 self.id = self._unpackheader(_fpartid)[0]
911 912 self.ui.debug('part id: "%s"\n' % self.id)
912 913 # extract mandatory bit from type
913 914 self.mandatory = (self.type != self.type.lower())
914 915 self.type = self.type.lower()
915 916 ## reading parameters
916 917 # param count
917 918 mancount, advcount = self._unpackheader(_fpartparamcount)
918 919 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
919 920 # param size
920 921 fparamsizes = _makefpartparamsizes(mancount + advcount)
921 922 paramsizes = self._unpackheader(fparamsizes)
922 923 # make it a list of pairs again
923 924 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
924 925 # split mandatory from advisory
925 926 mansizes = paramsizes[:mancount]
926 927 advsizes = paramsizes[mancount:]
927 928 # retrieve param value
928 929 manparams = []
929 930 for key, value in mansizes:
930 931 manparams.append((self._fromheader(key), self._fromheader(value)))
931 932 advparams = []
932 933 for key, value in advsizes:
933 934 advparams.append((self._fromheader(key), self._fromheader(value)))
934 935 self._initparams(manparams, advparams)
935 936 ## part payload
936 937 self._payloadstream = util.chunkbuffer(self._payloadchunks())
937 938 # we read the data, tell it
938 939 self._initialized = True
939 940
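The parameter handling in `_readheader` pairs up a flat tuple of sizes and then slices keys and values out of the header in order. A hedged Python 3 sketch of that step follows; `splitparams` and the sample data are hypothetical. Note that the listing is Python 2, where `zip` returns a list that can be sliced directly, while Python 3 needs an explicit `list()`.

```python
def splitparams(sizes, mancount, data):
    """Split `data` into (mandatory, advisory) lists of (key, value) pairs.

    `sizes` is a flat sequence: key size, value size, key size, ...
    """
    pairs = list(zip(sizes[::2], sizes[1::2]))
    offset = 0
    params = []
    for keysize, valuesize in pairs:
        key = data[offset:offset + keysize]
        offset += keysize
        value = data[offset:offset + valuesize]
        offset += valuesize
        params.append((key, value))
    # mandatory parameters come first, advisory ones after
    return params[:mancount], params[mancount:]


# Made-up header fragment: one mandatory and one advisory parameter.
mandatory, advisory = splitparams((7, 2, 2, 2), 1, b'version02abcd')
```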
940 941 def read(self, size=None):
941 942 """read payload data"""
942 943 if not self._initialized:
943 944 self._readheader()
944 945 if size is None:
945 946 data = self._payloadstream.read()
946 947 else:
947 948 data = self._payloadstream.read(size)
948 949 if size is None or len(data) < size:
949 950 self.consumed = True
950 951 self._pos += len(data)
951 952 return data
952 953
953 954 def tell(self):
954 955 return self._pos
955 956
956 957 def seek(self, offset, whence=0):
957 958 if whence == 0:
958 959 newpos = offset
959 960 elif whence == 1:
960 961 newpos = self._pos + offset
961 962 elif whence == 2:
962 963 if not self.consumed:
963 964 self.read()
964 965 newpos = self._chunkindex[-1][0] - offset
965 966 else:
966 967 raise ValueError('Unknown whence value: %r' % (whence,))
967 968
968 969 if newpos > self._chunkindex[-1][0] and not self.consumed:
969 970 self.read()
970 971 if not 0 <= newpos <= self._chunkindex[-1][0]:
971 972 raise ValueError('Offset out of range')
972 973
973 974 if self._pos != newpos:
974 975 chunk, internaloffset = self._findchunk(newpos)
975 976 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
976 977 adjust = self.read(internaloffset)
977 978 if len(adjust) != internaloffset:
978 979 raise util.Abort(_('Seek failed\n'))
979 980 self._pos = newpos
980 981
981 982 capabilities = {'HG2Y': (),
982 983 'b2x:listkeys': (),
983 984 'b2x:pushkey': (),
984 985 'digests': tuple(sorted(util.DIGESTS.keys())),
985 986 'b2x:remote-changegroup': ('http', 'https'),
986 987 }
987 988
988 989 def getrepocaps(repo, allowpushback=False):
989 990 """return the bundle2 capabilities for a given repo
990 991
991 992 Exists to allow extensions (like evolution) to mutate the capabilities.
992 993 """
993 994 caps = capabilities.copy()
994 995 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
995 996 if obsolete.isenabled(repo, obsolete.exchangeopt):
996 997 supportedformat = tuple('V%i' % v for v in obsolete.formats)
997 998 caps['b2x:obsmarkers'] = supportedformat
998 999 if allowpushback:
999 1000 caps['b2x:pushback'] = ()
1000 1001 return caps
1001 1002
1002 1003 def bundle2caps(remote):
1003 1004 """return the bundle capabilities of a peer as dict"""
1004 1005 raw = remote.capable('bundle2-exp')
1005 1006 if not raw and raw != '':
1006 1007 return {}
1007 1008 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1008 1009 return decodecaps(capsblob)
1009 1010
1010 1011 def obsmarkersversion(caps):
1011 1012 """extract the list of supported obsmarkers versions from a bundle2caps dict
1012 1013 """
1013 1014 obscaps = caps.get('b2x:obsmarkers', ())
1014 1015 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1015 1016
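As a quick illustration of `obsmarkersversion`, the Python 3 snippet below repeats the function body from the listing and applies it to a hand-written capabilities dict. The sample dict is an assumption for demonstration only; real capabilities come from `bundle2caps`.

```python
def obsmarkersversion(caps):
    """Extract supported obsmarkers versions from a bundle2 caps dict."""
    obscaps = caps.get('b2x:obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]


# Hand-written sample capabilities.
caps = {'b2x:obsmarkers': ('V0', 'V1'), 'b2x:listkeys': ()}
versions = obsmarkersversion(caps)
nocaps = obsmarkersversion({})
```

A peer that does not advertise `b2x:obsmarkers` yields an empty list, so callers can treat "no capability" and "no supported version" the same way.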
1016 1017 @parthandler('b2x:changegroup', ('version',))
1017 1018 def handlechangegroup(op, inpart):
1018 1019 """apply a changegroup part on the repo
1019 1020
1020 1021 This is a very early implementation that will need massive rework before
1021 1022 being inflicted on any end user.
1022 1023 """
1023 1024 # Make sure we trigger a transaction creation
1024 1025 #
1025 1026 # The addchangegroup function will get a transaction object by itself, but
1026 1027 # we need to make sure we trigger the creation of a transaction object used
1027 1028 # for the whole processing scope.
1028 1029 op.gettransaction()
1029 1030 unpackerversion = inpart.params.get('version', '01')
1030 1031 # We should raise an appropriate exception here
1031 1032 unpacker = changegroup.packermap[unpackerversion][1]
1032 1033 cg = unpacker(inpart, 'UN')
1033 1034 # the source and url passed here are overwritten by the one contained in
1034 1035 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1035 1036 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1036 1037 op.records.add('changegroup', {'return': ret})
1037 1038 if op.reply is not None:
1038 1039 # This is definitely not the final form of this
1039 1040 # return. But one needs to start somewhere.
1040 1041 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1041 1042 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1042 1043 part.addparam('return', '%i' % ret, mandatory=False)
1043 1044 assert not inpart.read()
1044 1045
1045 1046 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1046 1047 ['digest:%s' % k for k in util.DIGESTS.keys()])
1047 1048 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1048 1049 def handleremotechangegroup(op, inpart):
1049 1050 """apply a bundle10 on the repo, given a URL and validation information
1050 1051
1051 1052 All the information about the remote bundle to import is given as
1052 1053 parameters. The parameters include:
1053 1054 - url: the url to the bundle10.
1054 1055 - size: the bundle10 file size. It is used to validate that what was
1055 1056 retrieved by the client matches the server's knowledge of the bundle.
1056 1057 - digests: a space separated list of the digest types provided as
1057 1058 parameters.
1058 1059 - digest:<digest-type>: the hexadecimal representation of the digest with
1059 1060 that name. Like the size, it is used to validate that what was retrieved by
1060 1061 the client matches what the server knows about the bundle.
1061 1062
1062 1063 When multiple digest types are given, all of them are checked.
1063 1064 """
1064 1065 try:
1065 1066 raw_url = inpart.params['url']
1066 1067 except KeyError:
1067 1068 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1068 1069 parsed_url = util.url(raw_url)
1069 1070 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1070 1071 raise util.Abort(_('remote-changegroup does not support %s urls') %
1071 1072 parsed_url.scheme)
1072 1073
1073 1074 try:
1074 1075 size = int(inpart.params['size'])
1075 1076 except ValueError:
1076 1077 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1077 1078 % 'size')
1078 1079 except KeyError:
1079 1080 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1080 1081
1081 1082 digests = {}
1082 1083 for typ in inpart.params.get('digests', '').split():
1083 1084 param = 'digest:%s' % typ
1084 1085 try:
1085 1086 value = inpart.params[param]
1086 1087 except KeyError:
1087 1088 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1088 1089 param)
1089 1090 digests[typ] = value
1090 1091
1091 1092 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1092 1093
1093 1094 # Make sure we trigger a transaction creation
1094 1095 #
1095 1096 # The addchangegroup function will get a transaction object by itself, but
1096 1097 # we need to make sure we trigger the creation of a transaction object used
1097 1098 # for the whole processing scope.
1098 1099 op.gettransaction()
1099 1100 import exchange
1100 1101 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1101 1102 if not isinstance(cg, changegroup.cg1unpacker):
1102 1103 raise util.Abort(_('%s: not a bundle version 1.0') %
1103 1104 util.hidepassword(raw_url))
1104 1105 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1105 1106 op.records.add('changegroup', {'return': ret})
1106 1107 if op.reply is not None:
1107 1108 # This is definitely not the final form of this
1108 1109 # return. But one needs to start somewhere.
1109 1110 part = op.reply.newpart('b2x:reply:changegroup')
1110 1111 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1111 1112 part.addparam('return', '%i' % ret, mandatory=False)
1112 1113 try:
1113 1114 real_part.validate()
1114 1115 except util.Abort, e:
1115 1116 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1116 1117 (util.hidepassword(raw_url), str(e)))
1117 1118 assert not inpart.read()
1118 1119
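The digest collection in `handleremotechangegroup` relies on a simple convention: the `digests` parameter names the digest types, and each type must have a matching `digest:<type>` parameter. The Python 3 sketch below isolates that step. `collectdigests` is a hypothetical name, the sample values are made up, and `ValueError` stands in for the `util.Abort` raised by the real handler.

```python
def collectdigests(params):
    """Build a {digest type: hex digest} dict from part parameters."""
    digests = {}
    for typ in params.get('digests', '').split():
        param = 'digest:%s' % typ
        if param not in params:
            # the real handler raises util.Abort here
            raise ValueError('remote-changegroup: missing "%s" param' % param)
        digests[typ] = params[param]
    return digests


# Made-up parameters; the digest values are placeholders, not real hashes.
params = {
    'url': 'https://example.com/bundle.hg',
    'size': '1234',
    'digests': 'md5 sha1',
    'digest:md5': 'aa' * 16,
    'digest:sha1': 'bb' * 20,
}
digests = collectdigests(params)
```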
1119 1120 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1120 1121 def handlereplychangegroup(op, inpart):
1121 1122 ret = int(inpart.params['return'])
1122 1123 replyto = int(inpart.params['in-reply-to'])
1123 1124 op.records.add('changegroup', {'return': ret}, replyto)
1124 1125
1125 1126 @parthandler('b2x:check:heads')
1126 1127 def handlecheckheads(op, inpart):
1127 1128 """check that the heads of the repo did not change
1128 1129
1129 1130 This is used to detect a push race when using unbundle.
1130 1131 This replaces the "heads" argument of unbundle."""
1131 1132 h = inpart.read(20)
1132 1133 heads = []
1133 1134 while len(h) == 20:
1134 1135 heads.append(h)
1135 1136 h = inpart.read(20)
1136 1137 assert not h
1137 1138 if heads != op.repo.heads():
1138 1139 raise error.PushRaced('repository changed while pushing - '
1139 1140 'please try again')
1140 1141
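The payload of a `b2x:check:heads` part is read as a run of fixed-size 20-byte nodes. A small Python 3 sketch of that read loop is given here, using `io.BytesIO` in place of the part object. `readheads` is a hypothetical helper, and it raises `ValueError` on trailing bytes where the handler above uses an `assert`.

```python
import io


def readheads(fp, nodelen=20):
    """Read consecutive fixed-size nodes from `fp` until it is exhausted."""
    heads = []
    h = fp.read(nodelen)
    while len(h) == nodelen:
        heads.append(h)
        h = fp.read(nodelen)
    if h:
        # leftover bytes that do not form a full node
        raise ValueError('trailing partial node')
    return heads


payload = io.BytesIO(b'\x11' * 20 + b'\x22' * 20)
heads = readheads(payload)
```

The resulting list is then compared against the current repository heads; any difference means the repository changed during the push.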
1141 1142 @parthandler('b2x:output')
1142 1143 def handleoutput(op, inpart):
1143 1144 """forward output captured on the server to the client"""
1144 1145 for line in inpart.read().splitlines():
1145 1146 op.ui.write(('remote: %s\n' % line))
1146 1147
1147 1148 @parthandler('b2x:replycaps')
1148 1149 def handlereplycaps(op, inpart):
1149 1150 """Notify that a reply bundle should be created
1150 1151
1151 1152 The payload contains the capabilities information for the reply"""
1152 1153 caps = decodecaps(inpart.read())
1153 1154 if op.reply is None:
1154 1155 op.reply = bundle20(op.ui, caps)
1155 1156
1156 1157 @parthandler('b2x:error:abort', ('message', 'hint'))
1157 1158 def handlereplycaps(op, inpart):
1158 1159 """Used to transmit abort error over the wire"""
1159 1160 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1160 1161
1161 1162 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1162 1163 def handlereplycaps(op, inpart):
1163 1164 """Used to transmit unknown content error over the wire"""
1164 1165 kwargs = {}
1165 1166 parttype = inpart.params.get('parttype')
1166 1167 if parttype is not None:
1167 1168 kwargs['parttype'] = parttype
1168 1169 params = inpart.params.get('params')
1169 1170 if params is not None:
1170 1171 kwargs['params'] = params.split('\0')
1171 1172
1172 1173 raise error.UnsupportedPartError(**kwargs)
1173 1174
1174 1175 @parthandler('b2x:error:pushraced', ('message',))
1175 1176 def handlereplycaps(op, inpart):
1176 1177 """Used to transmit push race error over the wire"""
1177 1178 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1178 1179
1179 1180 @parthandler('b2x:listkeys', ('namespace',))
1180 1181 def handlelistkeys(op, inpart):
1181 1182 """retrieve pushkey namespace content stored in a bundle2"""
1182 1183 namespace = inpart.params['namespace']
1183 1184 r = pushkey.decodekeys(inpart.read())
1184 1185 op.records.add('listkeys', (namespace, r))
1185 1186
1186 1187 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1187 1188 def handlepushkey(op, inpart):
1188 1189 """process a pushkey request"""
1189 1190 dec = pushkey.decode
1190 1191 namespace = dec(inpart.params['namespace'])
1191 1192 key = dec(inpart.params['key'])
1192 1193 old = dec(inpart.params['old'])
1193 1194 new = dec(inpart.params['new'])
1194 1195 ret = op.repo.pushkey(namespace, key, old, new)
1195 1196 record = {'namespace': namespace,
1196 1197 'key': key,
1197 1198 'old': old,
1198 1199 'new': new}
1199 1200 op.records.add('pushkey', record)
1200 1201 if op.reply is not None:
1201 1202 rpart = op.reply.newpart('b2x:reply:pushkey')
1202 1203 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1203 1204 rpart.addparam('return', '%i' % ret, mandatory=False)
1204 1205
1205 1206 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1206 1207 def handlepushkeyreply(op, inpart):
1207 1208 """retrieve the result of a pushkey request"""
1208 1209 ret = int(inpart.params['return'])
1209 1210 partid = int(inpart.params['in-reply-to'])
1210 1211 op.records.add('pushkey', {'return': ret}, partid)
1211 1212
1212 1213 @parthandler('b2x:obsmarkers')
1213 1214 def handleobsmarker(op, inpart):
1214 1215 """add a stream of obsmarkers to the repo"""
1215 1216 tr = op.gettransaction()
1216 1217 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1217 1218 if new:
1218 1219 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1219 1220 op.records.add('obsmarkers', {'new': new})
1220 1221 if op.reply is not None:
1221 1222 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1222 1223 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1223 1224 rpart.addparam('new', '%i' % new, mandatory=False)
1224 1225
1225 1226
1226 1227 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1227 1228 def handlepushkeyreply(op, inpart):
1228 1229 """retrieve the result of an obsmarkers request"""
1229 1230 ret = int(inpart.params['new'])
1230 1231 partid = int(inpart.params['in-reply-to'])
1231 1232 op.records.add('obsmarkers', {'new': ret}, partid)