bundle2.unpackermixin: default value for seek() whence parameter...
Eric Sumner
r24070:de32e988 default
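The change below gives `unpackermixin.seek()` a default `whence` of 0, so callers can seek to an absolute offset without spelling out the mode, mirroring the convention of Python file objects. A minimal sketch of the resulting calling convention (illustrative only; the bundle file name is hypothetical and the underlying file is assumed to be seekable):

    unpacker = unpackermixin(open('bundle.hg2', 'rb'))  # hypothetical bundle file
    unpacker.seek(16)      # whence defaults to 0: absolute offset, like file.seek()
    unpacker.seek(0, 2)    # explicit whence=2 still seeks relative to the end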
@@ -1,1225 +1,1225 @@
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet for transmitting a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all
41 41 stream-level parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is upper case, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream-level parameters should remain simple, and we want to discourage any
56 56 overly complex usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
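
For illustration only (this is not part of the format definition), a minimal
reader for this block, given a file-like ``stream`` positioned just past the
magic string, might look like::

    import struct, urllib
    size = struct.unpack('>i', stream.read(4))[0]
    params = {}
    if size:
        for entry in stream.read(size).split(' '):
            parts = entry.split('=', 1)
            key = urllib.unquote(parts[0])
            value = urllib.unquote(parts[1]) if len(parts) > 1 else None
            params[key] = value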
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part headers. When the header is empty
70 70 (size = 0), this is interpreted as the end-of-stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76
77 77 The part type is used to route the part to an application-level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application-level handler. They are
81 81 meant to convey information that will help the application-level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, and `chunkdata` is `chunksize` bytes of plain data.
123 123 The payload part is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
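
For illustration, a consumer that only handles the simple case
(``readexactly(n)`` is assumed to read exactly ``n`` bytes from the stream)
could collect the payload as::

    import struct
    chunks = []
    chunksize = struct.unpack('>i', readexactly(4))[0]
    while chunksize:
        if chunksize < 0:
            raise ValueError('special chunk sizes are not handled here')
        chunks.append(readexactly(chunksize))
        chunksize = struct.unpack('>i', readexactly(4))[0]
    payload = ''.join(chunks)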
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know whether a part is mandatory or advisory: if the part
139 139 type contains any uppercase character, it is considered mandatory. When no
140 140 handler is known for a mandatory part, the process is aborted and an exception
141 141 is raised. If the part is advisory and no handler is known, the part is ignored.
142 142 When the process is aborted, the full bundle is still read from the stream to
143 143 keep the channel usable, but none of the parts read after the abort are
144 144 processed. In the future, dropping the stream may become an option for channels
145 145 we do not care to preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _magicstring = 'HG2Y'
166 166
167 167 _fstreamparamsize = '>i'
168 168 _fpartheadersize = '>i'
169 169 _fparttypesize = '>B'
170 170 _fpartid = '>I'
171 171 _fpayloadsize = '>i'
172 172 _fpartparamcount = '>BB'
173 173
174 174 preferedchunksize = 4096
175 175
176 176 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
177 177
178 178 def validateparttype(parttype):
179 179 """raise ValueError if a parttype contains invalid character"""
180 180 if _parttypeforbidden.search(parttype):
181 181 raise ValueError(parttype)
182 182
183 183 def _makefpartparamsizes(nbparams):
184 184 """return a struct format to read part parameter sizes
185 185
186 186 The number of parameters is variable, so we need to build that format
187 187 dynamically.
188 188 """
189 189 return '>'+('BB'*nbparams)
190 190
191 191 parthandlermapping = {}
192 192
193 193 def parthandler(parttype, params=()):
194 194 """decorator that register a function as a bundle2 part handler
195 195
196 196 eg::
197 197
198 198 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
199 199 def myparttypehandler(...):
200 200 '''process a part of type "my part".'''
201 201 ...
202 202 """
203 203 validateparttype(parttype)
204 204 def _decorator(func):
205 205 lparttype = parttype.lower() # enforce lower case matching.
206 206 assert lparttype not in parthandlermapping
207 207 parthandlermapping[lparttype] = func
208 208 func.params = frozenset(params)
209 209 return func
210 210 return _decorator
211 211
212 212 class unbundlerecords(object):
213 213 """keep record of what happens during and unbundle
214 214
215 215 New records are added using `records.add('cat', obj)`, where 'cat' is a
216 216 category of record and obj is an arbitrary object.
217 217
218 218 `records['cat']` will return all entries of this category 'cat'.
219 219
220 220 Iterating on the object itself will yield `('category', obj)` tuples
221 221 for all entries.
222 222
223 223 All iterations happen in chronological order.
224 224 """
225 225
226 226 def __init__(self):
227 227 self._categories = {}
228 228 self._sequences = []
229 229 self._replies = {}
230 230
231 231 def add(self, category, entry, inreplyto=None):
232 232 """add a new record of a given category.
233 233
234 234 The entry can then be retrieved in the list returned by
235 235 self['category']."""
236 236 self._categories.setdefault(category, []).append(entry)
237 237 self._sequences.append((category, entry))
238 238 if inreplyto is not None:
239 239 self.getreplies(inreplyto).add(category, entry)
240 240
241 241 def getreplies(self, partid):
242 242 """get the records that are replies to a specific part"""
243 243 return self._replies.setdefault(partid, unbundlerecords())
244 244
245 245 def __getitem__(self, cat):
246 246 return tuple(self._categories.get(cat, ()))
247 247
248 248 def __iter__(self):
249 249 return iter(self._sequences)
250 250
251 251 def __len__(self):
252 252 return len(self._sequences)
253 253
254 254 def __nonzero__(self):
255 255 return bool(self._sequences)
256 256
257 257 class bundleoperation(object):
258 258 """an object that represents a single bundling process
259 259
260 260 Its purpose is to carry unbundle-related objects and states.
261 261
262 262 A new object should be created at the beginning of each bundle processing.
263 263 The object is to be returned by the processing function.
264 264
265 265 The object has very little content now; it will ultimately contain:
266 266 * an access to the repo the bundle is applied to,
267 267 * a ui object,
268 268 * a way to retrieve a transaction to add changes to the repo,
269 269 * a way to record the result of processing each part,
270 270 * a way to construct a bundle response when applicable.
271 271 """
272 272
273 273 def __init__(self, repo, transactiongetter):
274 274 self.repo = repo
275 275 self.ui = repo.ui
276 276 self.records = unbundlerecords()
277 277 self.gettransaction = transactiongetter
278 278 self.reply = None
279 279
280 280 class TransactionUnavailable(RuntimeError):
281 281 pass
282 282
283 283 def _notransaction():
284 284 """default method to get a transaction while processing a bundle
285 285
286 286 Raise an exception to highlight the fact that no transaction was expected
287 287 to be created"""
288 288 raise TransactionUnavailable()
289 289
290 290 def processbundle(repo, unbundler, transactiongetter=None):
291 291 """This function process a bundle, apply effect to/from a repo
292 292
293 293 It iterates over each part then searches for and uses the proper handling
294 294 code to process the part. Parts are processed in order.
295 295
296 296 This is a very early version of this function; it will be strongly reworked
297 297 before final usage.
298 298
299 299 An unknown mandatory part will abort the process.
300 300 """
301 301 if transactiongetter is None:
302 302 transactiongetter = _notransaction
303 303 op = bundleoperation(repo, transactiongetter)
304 304 # todo:
305 305 # - replace this with an init function soon.
306 306 # - exception catching
307 307 unbundler.params
308 308 iterparts = unbundler.iterparts()
309 309 part = None
310 310 try:
311 311 for part in iterparts:
312 312 _processpart(op, part)
313 313 except Exception, exc:
314 314 for part in iterparts:
315 315 # consume the bundle content
316 316 part.seek(0, 2)
317 317 # Small hack to let caller code distinguish exceptions raised during bundle2
318 318 # processing from those raised while processing the old format. This is mostly
319 319 # needed to handle different return codes to unbundle according to the
320 320 # type of bundle. We should probably clean up or drop this return code
321 321 # craziness in a future version.
322 322 exc.duringunbundle2 = True
323 323 raise
324 324 return op
325 325
326 326 def _processpart(op, part):
327 327 """process a single part from a bundle
328 328
329 329 The part is guaranteed to have been fully consumed when the function exits
330 330 (even if an exception is raised)."""
331 331 try:
332 332 try:
333 333 handler = parthandlermapping.get(part.type)
334 334 if handler is None:
335 335 raise error.UnsupportedPartError(parttype=part.type)
336 336 op.ui.debug('found a handler for part %r\n' % part.type)
337 337 unknownparams = part.mandatorykeys - handler.params
338 338 if unknownparams:
339 339 unknownparams = list(unknownparams)
340 340 unknownparams.sort()
341 341 raise error.UnsupportedPartError(parttype=part.type,
342 342 params=unknownparams)
343 343 except error.UnsupportedPartError, exc:
344 344 if part.mandatory: # mandatory parts
345 345 raise
346 346 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
347 347 return # skip to part processing
348 348
349 349 # handler is called outside the above try block so that we don't
350 350 # risk catching KeyErrors from anything other than the
351 351 # parthandlermapping lookup (any KeyError raised by handler()
352 352 # itself represents a defect of a different variety).
353 353 output = None
354 354 if op.reply is not None:
355 355 op.ui.pushbuffer(error=True)
356 356 output = ''
357 357 try:
358 358 handler(op, part)
359 359 finally:
360 360 if output is not None:
361 361 output = op.ui.popbuffer()
362 362 if output:
363 363 outpart = op.reply.newpart('b2x:output', data=output,
364 364 mandatory=False)
365 365 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
366 366 finally:
367 367 # consume the part content to not corrupt the stream.
368 368 part.seek(0, 2)
369 369
370 370
371 371 def decodecaps(blob):
372 372 """decode a bundle2 caps bytes blob into a dictionary
373 373
374 374 The blob is a list of capabilities (one per line)
375 375 Capabilities may have values using a line of the form::
376 376
377 377 capability=value1,value2,value3
378 378
379 379 The values are always a list."""
380 380 caps = {}
381 381 for line in blob.splitlines():
382 382 if not line:
383 383 continue
384 384 if '=' not in line:
385 385 key, vals = line, ()
386 386 else:
387 387 key, vals = line.split('=', 1)
388 388 vals = vals.split(',')
389 389 key = urllib.unquote(key)
390 390 vals = [urllib.unquote(v) for v in vals]
391 391 caps[key] = vals
392 392 return caps
393 393
394 394 def encodecaps(caps):
395 395 """encode a bundle2 caps dictionary into a bytes blob"""
396 396 chunks = []
397 397 for ca in sorted(caps):
398 398 vals = caps[ca]
399 399 ca = urllib.quote(ca)
400 400 vals = [urllib.quote(v) for v in vals]
401 401 if vals:
402 402 ca = "%s=%s" % (ca, ','.join(vals))
403 403 chunks.append(ca)
404 404 return '\n'.join(chunks)
405 405
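# Illustrative round trip (editorial example, not part of the original module;
# the capability names used here are hypothetical). The blob produced by
# encodecaps is newline separated, with comma-separated, urlquoted values, and
# decodecaps reads it back into a dict of lists:
#
#   blob = encodecaps({'digests': ['md5', 'sha1'], 'pushback': []})
#   # blob == 'digests=md5,sha1\npushback'
#   decodecaps(blob) == {'digests': ['md5', 'sha1'], 'pushback': []}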
406 406 class bundle20(object):
407 407 """represent an outgoing bundle2 container
408 408
409 409 Use the `addparam` method to add stream-level parameters and `newpart` to
410 410 populate it. Then call `getchunks` to retrieve all the binary chunks of
411 411 data that compose the bundle2 container."""
412 412
413 413 def __init__(self, ui, capabilities=()):
414 414 self.ui = ui
415 415 self._params = []
416 416 self._parts = []
417 417 self.capabilities = dict(capabilities)
418 418
419 419 @property
420 420 def nbparts(self):
421 421 """total number of parts added to the bundler"""
422 422 return len(self._parts)
423 423
424 424 # methods used to define the bundle2 content
425 425 def addparam(self, name, value=None):
426 426 """add a stream level parameter"""
427 427 if not name:
428 428 raise ValueError('empty parameter name')
429 429 if name[0] not in string.letters:
430 430 raise ValueError('non letter first character: %r' % name)
431 431 self._params.append((name, value))
432 432
433 433 def addpart(self, part):
434 434 """add a new part to the bundle2 container
435 435
436 436 Parts contain the actual application payload."""
437 437 assert part.id is None
438 438 part.id = len(self._parts) # very cheap counter
439 439 self._parts.append(part)
440 440
441 441 def newpart(self, typeid, *args, **kwargs):
442 442 """create a new part and add it to the containers
443 443
444 444 As the part is directly added to the containers. For now, this means
445 445 that any failure to properly initialize the part after calling
446 446 ``newpart`` should result in a failure of the whole bundling process.
447 447
448 448 You can still fall back to manually create and add if you need better
449 449 control."""
450 450 part = bundlepart(typeid, *args, **kwargs)
451 451 self.addpart(part)
452 452 return part
453 453
454 454 # methods used to generate the bundle2 stream
455 455 def getchunks(self):
456 456 self.ui.debug('start emission of %s stream\n' % _magicstring)
457 457 yield _magicstring
458 458 param = self._paramchunk()
459 459 self.ui.debug('bundle parameter: %s\n' % param)
460 460 yield _pack(_fstreamparamsize, len(param))
461 461 if param:
462 462 yield param
463 463
464 464 self.ui.debug('start of parts\n')
465 465 for part in self._parts:
466 466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 467 for chunk in part.getchunks():
468 468 yield chunk
469 469 self.ui.debug('end of bundle\n')
470 470 yield _pack(_fpartheadersize, 0)
471 471
472 472 def _paramchunk(self):
473 473 """return a encoded version of all stream parameters"""
474 474 blocks = []
475 475 for par, value in self._params:
476 476 par = urllib.quote(par)
477 477 if value is not None:
478 478 value = urllib.quote(value)
479 479 par = '%s=%s' % (par, value)
480 480 blocks.append(par)
481 481 return ' '.join(blocks)
482 482
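# A minimal usage sketch (editorial example, not part of the original module;
# ``ui`` is assumed to be a Mercurial ui instance and ``out`` a writable
# binary file):
#
#   bundler = bundle20(ui)
#   bundler.addparam('evolution')              # advisory stream-level parameter
#   bundler.newpart('b2x:output', data='hello world')
#   for chunk in bundler.getchunks():
#       out.write(chunk)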
483 483 class unpackermixin(object):
484 484 """A mixin to extract bytes and struct data from a stream"""
485 485
486 486 def __init__(self, fp):
487 487 self._fp = fp
488 488 self._seekable = (util.safehasattr(fp, 'seek') and
489 489 util.safehasattr(fp, 'tell'))
490 490
491 491 def _unpack(self, format):
492 492 """unpack this struct format from the stream"""
493 493 data = self._readexact(struct.calcsize(format))
494 494 return _unpack(format, data)
495 495
496 496 def _readexact(self, size):
497 497 """read exactly <size> bytes from the stream"""
498 498 return changegroup.readexactly(self._fp, size)
499 499
500 def seek(self, offset, whence):
500 def seek(self, offset, whence=0):
501 501 """move the underlying file pointer"""
502 502 if self._seekable:
503 503 return self._fp.seek(offset, whence)
504 504 else:
505 505 raise NotImplementedError(_('File pointer is not seekable'))
506 506
507 507 def tell(self):
508 508 """return the file offset, or None if file is not seekable"""
509 509 if self._seekable:
510 510 try:
511 511 return self._fp.tell()
512 512 except IOError, e:
513 513 if e.errno == errno.ESPIPE:
514 514 self._seekable = False
515 515 else:
516 516 raise
517 517 return None
518 518
519 519 def close(self):
520 520 """close underlying file"""
521 521 if util.safehasattr(self._fp, 'close'):
522 522 return self._fp.close()
523 523
524 524 class unbundle20(unpackermixin):
525 525 """interpret a bundle2 stream
526 526
527 527 This class is fed with a binary stream and yields parts through its
528 528 `iterparts` method."""
529 529
530 530 def __init__(self, ui, fp, header=None):
531 531 """If header is specified, we do not read it out of the stream."""
532 532 self.ui = ui
533 533 super(unbundle20, self).__init__(fp)
534 534 if header is None:
535 535 header = self._readexact(4)
536 536 magic, version = header[0:2], header[2:4]
537 537 if magic != 'HG':
538 538 raise util.Abort(_('not a Mercurial bundle'))
539 539 if version != '2Y':
540 540 raise util.Abort(_('unknown bundle version %s') % version)
541 541 self.ui.debug('start processing of %s stream\n' % header)
542 542
543 543 @util.propertycache
544 544 def params(self):
545 545 """dictionary of stream level parameters"""
546 546 self.ui.debug('reading bundle2 stream parameters\n')
547 547 params = {}
548 548 paramssize = self._unpack(_fstreamparamsize)[0]
549 549 if paramssize < 0:
550 550 raise error.BundleValueError('negative bundle param size: %i'
551 551 % paramssize)
552 552 if paramssize:
553 553 for p in self._readexact(paramssize).split(' '):
554 554 p = p.split('=', 1)
555 555 p = [urllib.unquote(i) for i in p]
556 556 if len(p) < 2:
557 557 p.append(None)
558 558 self._processparam(*p)
559 559 params[p[0]] = p[1]
560 560 return params
561 561
562 562 def _processparam(self, name, value):
563 563 """process a parameter, applying its effect if needed
564 564
565 565 Parameters starting with a lower case letter are advisory and will be
566 566 ignored when unknown. Those starting with an upper case letter are
567 567 mandatory, and this function will raise a KeyError when they are unknown.
568 568
569 569 Note: no options are currently supported. Any input will either be
570 570 ignored or fail.
571 571 """
572 572 if not name:
573 573 raise ValueError('empty parameter name')
574 574 if name[0] not in string.letters:
575 575 raise ValueError('non letter first character: %r' % name)
576 576 # Some logic will be added here later to try to process the option against
577 577 # a dict of known parameters.
578 578 if name[0].islower():
579 579 self.ui.debug("ignoring unknown parameter %r\n" % name)
580 580 else:
581 581 raise error.UnsupportedPartError(params=(name,))
582 582
583 583
584 584 def iterparts(self):
585 585 """yield all parts contained in the stream"""
586 586 # make sure params have been loaded
587 587 self.params
588 588 self.ui.debug('start extraction of bundle2 parts\n')
589 589 headerblock = self._readpartheader()
590 590 while headerblock is not None:
591 591 part = unbundlepart(self.ui, headerblock, self._fp)
592 592 yield part
593 593 part.seek(0, 2)
594 594 headerblock = self._readpartheader()
595 595 self.ui.debug('end of bundle2 stream\n')
596 596
597 597 def _readpartheader(self):
598 598 """reads a part header size and return the bytes blob
599 599
600 600 returns None if empty"""
601 601 headersize = self._unpack(_fpartheadersize)[0]
602 602 if headersize < 0:
603 603 raise error.BundleValueError('negative part header size: %i'
604 604 % headersize)
605 605 self.ui.debug('part header size: %i\n' % headersize)
606 606 if headersize:
607 607 return self._readexact(headersize)
608 608 return None
609 609
610 610
611 611 class bundlepart(object):
612 612 """A bundle2 part contains application level payload
613 613
614 614 The part `type` is used to route the part to the application level
615 615 handler.
616 616
617 617 The part payload is contained in ``part.data``. It could be raw bytes or a
618 618 generator of byte chunks.
619 619
620 620 You can add parameters to the part using the ``addparam`` method.
621 621 Parameters can be either mandatory (default) or advisory. The remote side
622 622 should be able to safely ignore the advisory ones.
623 623
624 624 Neither data nor parameters can be modified after generation has begun.
625 625 """
626 626
627 627 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
628 628 data='', mandatory=True):
629 629 validateparttype(parttype)
630 630 self.id = None
631 631 self.type = parttype
632 632 self._data = data
633 633 self._mandatoryparams = list(mandatoryparams)
634 634 self._advisoryparams = list(advisoryparams)
635 635 # checking for duplicated entries
636 636 self._seenparams = set()
637 637 for pname, __ in self._mandatoryparams + self._advisoryparams:
638 638 if pname in self._seenparams:
639 639 raise RuntimeError('duplicated params: %s' % pname)
640 640 self._seenparams.add(pname)
641 641 # status of the part's generation:
642 642 # - None: not started,
643 643 # - False: currently being generated,
644 644 # - True: generation done.
645 645 self._generated = None
646 646 self.mandatory = mandatory
647 647
648 648 # methods used to define the part content
649 649 def __setdata(self, data):
650 650 if self._generated is not None:
651 651 raise error.ReadOnlyPartError('part is being generated')
652 652 self._data = data
653 653 def __getdata(self):
654 654 return self._data
655 655 data = property(__getdata, __setdata)
656 656
657 657 @property
658 658 def mandatoryparams(self):
659 659 # make it an immutable tuple to force people through ``addparam``
660 660 return tuple(self._mandatoryparams)
661 661
662 662 @property
663 663 def advisoryparams(self):
664 664 # make it an immutable tuple to force people through ``addparam``
665 665 return tuple(self._advisoryparams)
666 666
667 667 def addparam(self, name, value='', mandatory=True):
668 668 if self._generated is not None:
669 669 raise error.ReadOnlyPartError('part is being generated')
670 670 if name in self._seenparams:
671 671 raise ValueError('duplicated params: %s' % name)
672 672 self._seenparams.add(name)
673 673 params = self._advisoryparams
674 674 if mandatory:
675 675 params = self._mandatoryparams
676 676 params.append((name, value))
677 677
678 678 # methods used to generate the bundle2 stream
679 679 def getchunks(self):
680 680 if self._generated is not None:
681 681 raise RuntimeError('part can only be consumed once')
682 682 self._generated = False
683 683 #### header
684 684 if self.mandatory:
685 685 parttype = self.type.upper()
686 686 else:
687 687 parttype = self.type.lower()
688 688 ## parttype
689 689 header = [_pack(_fparttypesize, len(parttype)),
690 690 parttype, _pack(_fpartid, self.id),
691 691 ]
692 692 ## parameters
693 693 # count
694 694 manpar = self.mandatoryparams
695 695 advpar = self.advisoryparams
696 696 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
697 697 # size
698 698 parsizes = []
699 699 for key, value in manpar:
700 700 parsizes.append(len(key))
701 701 parsizes.append(len(value))
702 702 for key, value in advpar:
703 703 parsizes.append(len(key))
704 704 parsizes.append(len(value))
705 705 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
706 706 header.append(paramsizes)
707 707 # key, value
708 708 for key, value in manpar:
709 709 header.append(key)
710 710 header.append(value)
711 711 for key, value in advpar:
712 712 header.append(key)
713 713 header.append(value)
714 714 ## finalize header
715 715 headerchunk = ''.join(header)
716 716 yield _pack(_fpartheadersize, len(headerchunk))
717 717 yield headerchunk
718 718 ## payload
719 719 try:
720 720 for chunk in self._payloadchunks():
721 721 yield _pack(_fpayloadsize, len(chunk))
722 722 yield chunk
723 723 except Exception, exc:
724 724 # backup exception data for later
725 725 exc_info = sys.exc_info()
726 726 msg = 'unexpected error: %s' % exc
727 727 interpart = bundlepart('b2x:error:abort', [('message', msg)],
728 728 mandatory=False)
729 729 interpart.id = 0
730 730 yield _pack(_fpayloadsize, -1)
731 731 for chunk in interpart.getchunks():
732 732 yield chunk
733 733 # abort current part payload
734 734 yield _pack(_fpayloadsize, 0)
735 735 raise exc_info[0], exc_info[1], exc_info[2]
736 736 # end of payload
737 737 yield _pack(_fpayloadsize, 0)
738 738 self._generated = True
739 739
740 740 def _payloadchunks(self):
741 741 """yield chunks of a the part payload
742 742
743 743 Exists to handle the different methods to provide data to a part."""
744 744 # we only support fixed size data now.
745 745 # This will be improved in the future.
746 746 if util.safehasattr(self.data, 'next'):
747 747 buff = util.chunkbuffer(self.data)
748 748 chunk = buff.read(preferedchunksize)
749 749 while chunk:
750 750 yield chunk
751 751 chunk = buff.read(preferedchunksize)
752 752 elif len(self.data):
753 753 yield self.data
754 754
755 755
756 756 flaginterrupt = -1
757 757
758 758 class interrupthandler(unpackermixin):
759 759 """read one part and process it with restricted capability
760 760
761 761 This allows to transmit exception raised on the producer size during part
762 762 iteration while the consumer is reading a part.
763 763
764 764 Part processed in this manner only have access to a ui object,"""
765 765
766 766 def __init__(self, ui, fp):
767 767 super(interrupthandler, self).__init__(fp)
768 768 self.ui = ui
769 769
770 770 def _readpartheader(self):
771 771 """reads a part header size and return the bytes blob
772 772
773 773 returns None if empty"""
774 774 headersize = self._unpack(_fpartheadersize)[0]
775 775 if headersize < 0:
776 776 raise error.BundleValueError('negative part header size: %i'
777 777 % headersize)
778 778 self.ui.debug('part header size: %i\n' % headersize)
779 779 if headersize:
780 780 return self._readexact(headersize)
781 781 return None
782 782
783 783 def __call__(self):
784 784 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
785 785 headerblock = self._readpartheader()
786 786 if headerblock is None:
787 787 self.ui.debug('no part found during interruption.\n')
788 788 return
789 789 part = unbundlepart(self.ui, headerblock, self._fp)
790 790 op = interruptoperation(self.ui)
791 791 _processpart(op, part)
792 792
793 793 class interruptoperation(object):
794 794 """A limited operation to be use by part handler during interruption
795 795
796 796 It only have access to an ui object.
797 797 """
798 798
799 799 def __init__(self, ui):
800 800 self.ui = ui
801 801 self.reply = None
802 802
803 803 @property
804 804 def repo(self):
805 805 raise RuntimeError('no repo access from stream interruption')
806 806
807 807 def gettransaction(self):
808 808 raise TransactionUnavailable('no repo access from stream interruption')
809 809
810 810 class unbundlepart(unpackermixin):
811 811 """a bundle part read from a bundle"""
812 812
813 813 def __init__(self, ui, header, fp):
814 814 super(unbundlepart, self).__init__(fp)
815 815 self.ui = ui
816 816 # unbundle state attr
817 817 self._headerdata = header
818 818 self._headeroffset = 0
819 819 self._initialized = False
820 820 self.consumed = False
821 821 # part data
822 822 self.id = None
823 823 self.type = None
824 824 self.mandatoryparams = None
825 825 self.advisoryparams = None
826 826 self.params = None
827 827 self.mandatorykeys = ()
828 828 self._payloadstream = None
829 829 self._readheader()
830 830 self._mandatory = None
831 831 self._chunkindex = [] #(payload, file) position tuples for chunk starts
832 832 self._pos = 0
833 833
834 834 def _fromheader(self, size):
835 835 """return the next <size> byte from the header"""
836 836 offset = self._headeroffset
837 837 data = self._headerdata[offset:(offset + size)]
838 838 self._headeroffset = offset + size
839 839 return data
840 840
841 841 def _unpackheader(self, format):
842 842 """read given format from header
843 843
844 844 This automatically compute the size of the format to read."""
845 845 data = self._fromheader(struct.calcsize(format))
846 846 return _unpack(format, data)
847 847
848 848 def _initparams(self, mandatoryparams, advisoryparams):
849 849 """internal function to setup all logic related parameters"""
850 850 # make it read only to prevent people touching it by mistake.
851 851 self.mandatoryparams = tuple(mandatoryparams)
852 852 self.advisoryparams = tuple(advisoryparams)
853 853 # user friendly UI
854 854 self.params = dict(self.mandatoryparams)
855 855 self.params.update(dict(self.advisoryparams))
856 856 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
857 857
858 858 def _payloadchunks(self, chunknum=0):
859 859 '''seek to specified chunk and start yielding data'''
860 860 if len(self._chunkindex) == 0:
861 861 assert chunknum == 0, 'Must start with chunk 0'
862 862 self._chunkindex.append((0, super(unbundlepart, self).tell()))
863 863 else:
864 864 assert chunknum < len(self._chunkindex), \
865 865 'Unknown chunk %d' % chunknum
866 866 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
867 867
868 868 pos = self._chunkindex[chunknum][0]
869 869 payloadsize = self._unpack(_fpayloadsize)[0]
870 870 self.ui.debug('payload chunk size: %i\n' % payloadsize)
871 871 while payloadsize:
872 872 if payloadsize == flaginterrupt:
873 873 # interruption detection, the handler will now read a
874 874 # single part and process it.
875 875 interrupthandler(self.ui, self._fp)()
876 876 elif payloadsize < 0:
877 877 msg = 'negative payload chunk size: %i' % payloadsize
878 878 raise error.BundleValueError(msg)
879 879 else:
880 880 result = self._readexact(payloadsize)
881 881 chunknum += 1
882 882 pos += payloadsize
883 883 if chunknum == len(self._chunkindex):
884 884 self._chunkindex.append((pos,
885 885 super(unbundlepart, self).tell()))
886 886 yield result
887 887 payloadsize = self._unpack(_fpayloadsize)[0]
888 888 self.ui.debug('payload chunk size: %i\n' % payloadsize)
889 889
890 890 def _findchunk(self, pos):
891 891 '''for a given payload position, return a chunk number and offset'''
892 892 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
893 893 if ppos == pos:
894 894 return chunk, 0
895 895 elif ppos > pos:
896 896 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
897 897 raise ValueError('Unknown chunk')
898 898
899 899 def _readheader(self):
900 900 """read the header and setup the object"""
901 901 typesize = self._unpackheader(_fparttypesize)[0]
902 902 self.type = self._fromheader(typesize)
903 903 self.ui.debug('part type: "%s"\n' % self.type)
904 904 self.id = self._unpackheader(_fpartid)[0]
905 905 self.ui.debug('part id: "%s"\n' % self.id)
906 906 # extract mandatory bit from type
907 907 self.mandatory = (self.type != self.type.lower())
908 908 self.type = self.type.lower()
909 909 ## reading parameters
910 910 # param count
911 911 mancount, advcount = self._unpackheader(_fpartparamcount)
912 912 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
913 913 # param size
914 914 fparamsizes = _makefpartparamsizes(mancount + advcount)
915 915 paramsizes = self._unpackheader(fparamsizes)
916 916 # make it a list of couple again
917 917 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
918 918 # split mandatory from advisory
919 919 mansizes = paramsizes[:mancount]
920 920 advsizes = paramsizes[mancount:]
921 921 # retrieve param value
922 922 manparams = []
923 923 for key, value in mansizes:
924 924 manparams.append((self._fromheader(key), self._fromheader(value)))
925 925 advparams = []
926 926 for key, value in advsizes:
927 927 advparams.append((self._fromheader(key), self._fromheader(value)))
928 928 self._initparams(manparams, advparams)
929 929 ## part payload
930 930 self._payloadstream = util.chunkbuffer(self._payloadchunks())
931 931 # the header data has been read; mark the part as initialized
932 932 self._initialized = True
933 933
934 934 def read(self, size=None):
935 935 """read payload data"""
936 936 if not self._initialized:
937 937 self._readheader()
938 938 if size is None:
939 939 data = self._payloadstream.read()
940 940 else:
941 941 data = self._payloadstream.read(size)
942 942 if size is None or len(data) < size:
943 943 self.consumed = True
944 944 self._pos += len(data)
945 945 return data
946 946
947 947 def tell(self):
948 948 return self._pos
949 949
950 950 def seek(self, offset, whence=0):
951 951 if whence == 0:
952 952 newpos = offset
953 953 elif whence == 1:
954 954 newpos = self._pos + offset
955 955 elif whence == 2:
956 956 if not self.consumed:
957 957 self.read()
958 958 newpos = self._chunkindex[-1][0] - offset
959 959 else:
960 960 raise ValueError('Unknown whence value: %r' % (whence,))
961 961
962 962 if newpos > self._chunkindex[-1][0] and not self.consumed:
963 963 self.read()
964 964 if not 0 <= newpos <= self._chunkindex[-1][0]:
965 965 raise ValueError('Offset out of range')
966 966
967 967 if self._pos != newpos:
968 968 chunk, internaloffset = self._findchunk(newpos)
969 969 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
970 970 adjust = self.read(internaloffset)
971 971 if len(adjust) != internaloffset:
972 972 raise util.Abort(_('Seek failed\n'))
973 973 self._pos = newpos
974 974
975 975 capabilities = {'HG2Y': (),
976 976 'b2x:listkeys': (),
977 977 'b2x:pushkey': (),
978 978 'digests': tuple(sorted(util.DIGESTS.keys())),
979 979 'b2x:remote-changegroup': ('http', 'https'),
980 980 }
981 981
982 982 def getrepocaps(repo, allowpushback=False):
983 983 """return the bundle2 capabilities for a given repo
984 984
985 985 Exists to allow extensions (like evolution) to mutate the capabilities.
986 986 """
987 987 caps = capabilities.copy()
988 988 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
989 989 if obsolete.isenabled(repo, obsolete.exchangeopt):
990 990 supportedformat = tuple('V%i' % v for v in obsolete.formats)
991 991 caps['b2x:obsmarkers'] = supportedformat
992 992 if allowpushback:
993 993 caps['b2x:pushback'] = ()
994 994 return caps
995 995
996 996 def bundle2caps(remote):
997 997 """return the bundle capabilities of a peer as dict"""
998 998 raw = remote.capable('bundle2-exp')
999 999 if not raw and raw != '':
1000 1000 return {}
1001 1001 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1002 1002 return decodecaps(capsblob)
1003 1003
1004 1004 def obsmarkersversion(caps):
1005 1005 """extract the list of supported obsmarkers versions from a bundle2caps dict
1006 1006 """
1007 1007 obscaps = caps.get('b2x:obsmarkers', ())
1008 1008 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1009 1009
1010 1010 @parthandler('b2x:changegroup', ('version',))
1011 1011 def handlechangegroup(op, inpart):
1012 1012 """apply a changegroup part on the repo
1013 1013
1014 1014 This is a very early implementation that will be massively reworked before
1015 1015 being inflicted on any end user.
1016 1016 """
1017 1017 # Make sure we trigger a transaction creation
1018 1018 #
1019 1019 # The addchangegroup function will get a transaction object by itself, but
1020 1020 # we need to make sure we trigger the creation of a transaction object used
1021 1021 # for the whole processing scope.
1022 1022 op.gettransaction()
1023 1023 unpackerversion = inpart.params.get('version', '01')
1024 1024 # We should raise an appropriate exception here
1025 1025 unpacker = changegroup.packermap[unpackerversion][1]
1026 1026 cg = unpacker(inpart, 'UN')
1027 1027 # the source and url passed here are overwritten by the one contained in
1028 1028 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1029 1029 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1030 1030 op.records.add('changegroup', {'return': ret})
1031 1031 if op.reply is not None:
1032 1032 # This is definitely not the final form of this
1033 1033 # return. But one needs to start somewhere.
1034 1034 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1035 1035 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1036 1036 part.addparam('return', '%i' % ret, mandatory=False)
1037 1037 assert not inpart.read()
1038 1038
1039 1039 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1040 1040 ['digest:%s' % k for k in util.DIGESTS.keys()])
1041 1041 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1042 1042 def handleremotechangegroup(op, inpart):
1043 1043 """apply a bundle10 on the repo, given an url and validation information
1044 1044
1045 1045 All the information about the remote bundle to import is given as
1046 1046 parameters. The parameters include:
1047 1047 - url: the url to the bundle10.
1048 1048 - size: the bundle10 file size. It is used to validate that what was
1049 1049 retrieved by the client matches what the server knows about the bundle.
1050 1050 - digests: a space separated list of the digest types provided as
1051 1051 parameters.
1052 1052 - digest:<digest-type>: the hexadecimal representation of the digest with
1053 1053 that name. Like the size, it is used to validate what was retrieved by
1054 1054 the client matches what the server knows about the bundle.
1055 1055
1056 1056 When multiple digest types are given, all of them are checked.
1057 1057 """
1058 1058 try:
1059 1059 raw_url = inpart.params['url']
1060 1060 except KeyError:
1061 1061 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1062 1062 parsed_url = util.url(raw_url)
1063 1063 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1064 1064 raise util.Abort(_('remote-changegroup does not support %s urls') %
1065 1065 parsed_url.scheme)
1066 1066
1067 1067 try:
1068 1068 size = int(inpart.params['size'])
1069 1069 except ValueError:
1070 1070 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1071 1071 % 'size')
1072 1072 except KeyError:
1073 1073 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1074 1074
1075 1075 digests = {}
1076 1076 for typ in inpart.params.get('digests', '').split():
1077 1077 param = 'digest:%s' % typ
1078 1078 try:
1079 1079 value = inpart.params[param]
1080 1080 except KeyError:
1081 1081 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1082 1082 param)
1083 1083 digests[typ] = value
1084 1084
1085 1085 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1086 1086
1087 1087 # Make sure we trigger a transaction creation
1088 1088 #
1089 1089 # The addchangegroup function will get a transaction object by itself, but
1090 1090 # we need to make sure we trigger the creation of a transaction object used
1091 1091 # for the whole processing scope.
1092 1092 op.gettransaction()
1093 1093 import exchange
1094 1094 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1095 1095 if not isinstance(cg, changegroup.cg1unpacker):
1096 1096 raise util.Abort(_('%s: not a bundle version 1.0') %
1097 1097 util.hidepassword(raw_url))
1098 1098 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1099 1099 op.records.add('changegroup', {'return': ret})
1100 1100 if op.reply is not None:
1101 1101 # This is definitely not the final form of this
1102 1102 # return. But one needs to start somewhere.
1103 1103 part = op.reply.newpart('b2x:reply:changegroup')
1104 1104 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1105 1105 part.addparam('return', '%i' % ret, mandatory=False)
1106 1106 try:
1107 1107 real_part.validate()
1108 1108 except util.Abort, e:
1109 1109 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1110 1110 (util.hidepassword(raw_url), str(e)))
1111 1111 assert not inpart.read()
1112 1112
1113 1113 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1114 1114 def handlereplychangegroup(op, inpart):
1115 1115 ret = int(inpart.params['return'])
1116 1116 replyto = int(inpart.params['in-reply-to'])
1117 1117 op.records.add('changegroup', {'return': ret}, replyto)
1118 1118
1119 1119 @parthandler('b2x:check:heads')
1120 1120 def handlecheckheads(op, inpart):
1121 1121 """check that head of the repo did not change
1122 1122
1123 1123 This is used to detect a push race when using unbundle.
1124 1124 This replaces the "heads" argument of unbundle."""
1125 1125 h = inpart.read(20)
1126 1126 heads = []
1127 1127 while len(h) == 20:
1128 1128 heads.append(h)
1129 1129 h = inpart.read(20)
1130 1130 assert not h
1131 1131 if heads != op.repo.heads():
1132 1132 raise error.PushRaced('repository changed while pushing - '
1133 1133 'please try again')
1134 1134
1135 1135 @parthandler('b2x:output')
1136 1136 def handleoutput(op, inpart):
1137 1137 """forward output captured on the server to the client"""
1138 1138 for line in inpart.read().splitlines():
1139 1139 op.ui.write(('remote: %s\n' % line))
1140 1140
1141 1141 @parthandler('b2x:replycaps')
1142 1142 def handlereplycaps(op, inpart):
1143 1143 """Notify that a reply bundle should be created
1144 1144
1145 1145 The payload contains the capabilities information for the reply"""
1146 1146 caps = decodecaps(inpart.read())
1147 1147 if op.reply is None:
1148 1148 op.reply = bundle20(op.ui, caps)
1149 1149
1150 1150 @parthandler('b2x:error:abort', ('message', 'hint'))
1151 1151 def handlereplycaps(op, inpart):
1152 1152 """Used to transmit abort error over the wire"""
1153 1153 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1154 1154
1155 1155 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1156 1156 def handlereplycaps(op, inpart):
1157 1157 """Used to transmit unknown content error over the wire"""
1158 1158 kwargs = {}
1159 1159 parttype = inpart.params.get('parttype')
1160 1160 if parttype is not None:
1161 1161 kwargs['parttype'] = parttype
1162 1162 params = inpart.params.get('params')
1163 1163 if params is not None:
1164 1164 kwargs['params'] = params.split('\0')
1165 1165
1166 1166 raise error.UnsupportedPartError(**kwargs)
1167 1167
1168 1168 @parthandler('b2x:error:pushraced', ('message',))
1169 1169 def handlereplycaps(op, inpart):
1170 1170 """Used to transmit push race error over the wire"""
1171 1171 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1172 1172
1173 1173 @parthandler('b2x:listkeys', ('namespace',))
1174 1174 def handlelistkeys(op, inpart):
1175 1175 """retrieve pushkey namespace content stored in a bundle2"""
1176 1176 namespace = inpart.params['namespace']
1177 1177 r = pushkey.decodekeys(inpart.read())
1178 1178 op.records.add('listkeys', (namespace, r))
1179 1179
1180 1180 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1181 1181 def handlepushkey(op, inpart):
1182 1182 """process a pushkey request"""
1183 1183 dec = pushkey.decode
1184 1184 namespace = dec(inpart.params['namespace'])
1185 1185 key = dec(inpart.params['key'])
1186 1186 old = dec(inpart.params['old'])
1187 1187 new = dec(inpart.params['new'])
1188 1188 ret = op.repo.pushkey(namespace, key, old, new)
1189 1189 record = {'namespace': namespace,
1190 1190 'key': key,
1191 1191 'old': old,
1192 1192 'new': new}
1193 1193 op.records.add('pushkey', record)
1194 1194 if op.reply is not None:
1195 1195 rpart = op.reply.newpart('b2x:reply:pushkey')
1196 1196 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1197 1197 rpart.addparam('return', '%i' % ret, mandatory=False)
1198 1198
1199 1199 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1200 1200 def handlepushkeyreply(op, inpart):
1201 1201 """retrieve the result of a pushkey request"""
1202 1202 ret = int(inpart.params['return'])
1203 1203 partid = int(inpart.params['in-reply-to'])
1204 1204 op.records.add('pushkey', {'return': ret}, partid)
1205 1205
1206 1206 @parthandler('b2x:obsmarkers')
1207 1207 def handleobsmarker(op, inpart):
1208 1208 """add a stream of obsmarkers to the repo"""
1209 1209 tr = op.gettransaction()
1210 1210 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1211 1211 if new:
1212 1212 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1213 1213 op.records.add('obsmarkers', {'new': new})
1214 1214 if op.reply is not None:
1215 1215 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1216 1216 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1217 1217 rpart.addparam('new', '%i' % new, mandatory=False)
1218 1218
1219 1219
1220 1220 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1221 1221 def handlepushkeyreply(op, inpart):
1222 1222 """retrieve the result of a pushkey request"""
1223 1223 ret = int(inpart.params['new'])
1224 1224 partid = int(inpart.params['in-reply-to'])
1225 1225 op.records.add('obsmarkers', {'new': ret}, partid)