##// END OF EJS Templates
bundle2.unpackermixin: control for underlying file descriptor...
Eric Sumner -
r24026:3daef83a default
parent child Browse files
Show More
@@ -1,1141 +1,1167 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 import errno
148 149 import sys
149 150 import util
150 151 import struct
151 152 import urllib
152 153 import string
153 154 import obsolete
154 155 import pushkey
155 156 import url
156 157 import re
157 158
158 159 import changegroup, error
159 160 from i18n import _
160 161
161 162 _pack = struct.pack
162 163 _unpack = struct.unpack
163 164
164 165 _magicstring = 'HG2Y'
165 166
166 167 _fstreamparamsize = '>i'
167 168 _fpartheadersize = '>i'
168 169 _fparttypesize = '>B'
169 170 _fpartid = '>I'
170 171 _fpayloadsize = '>i'
171 172 _fpartparamcount = '>BB'
172 173
173 174 preferedchunksize = 4096
174 175
175 176 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
176 177
177 178 def validateparttype(parttype):
178 179 """raise ValueError if a parttype contains invalid character"""
179 180 if _parttypeforbidden.search(parttype):
180 181 raise ValueError(parttype)
181 182
182 183 def _makefpartparamsizes(nbparams):
183 184 """return a struct format to read part parameter sizes
184 185
185 186 The number parameters is variable so we need to build that format
186 187 dynamically.
187 188 """
188 189 return '>'+('BB'*nbparams)
189 190
190 191 parthandlermapping = {}
191 192
192 193 def parthandler(parttype, params=()):
193 194 """decorator that register a function as a bundle2 part handler
194 195
195 196 eg::
196 197
197 198 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
198 199 def myparttypehandler(...):
199 200 '''process a part of type "my part".'''
200 201 ...
201 202 """
202 203 validateparttype(parttype)
203 204 def _decorator(func):
204 205 lparttype = parttype.lower() # enforce lower case matching.
205 206 assert lparttype not in parthandlermapping
206 207 parthandlermapping[lparttype] = func
207 208 func.params = frozenset(params)
208 209 return func
209 210 return _decorator
210 211
211 212 class unbundlerecords(object):
212 213 """keep record of what happens during and unbundle
213 214
214 215 New records are added using `records.add('cat', obj)`. Where 'cat' is a
215 216 category of record and obj is an arbitrary object.
216 217
217 218 `records['cat']` will return all entries of this category 'cat'.
218 219
219 220 Iterating on the object itself will yield `('category', obj)` tuples
220 221 for all entries.
221 222
222 223 All iterations happens in chronological order.
223 224 """
224 225
225 226 def __init__(self):
226 227 self._categories = {}
227 228 self._sequences = []
228 229 self._replies = {}
229 230
230 231 def add(self, category, entry, inreplyto=None):
231 232 """add a new record of a given category.
232 233
233 234 The entry can then be retrieved in the list returned by
234 235 self['category']."""
235 236 self._categories.setdefault(category, []).append(entry)
236 237 self._sequences.append((category, entry))
237 238 if inreplyto is not None:
238 239 self.getreplies(inreplyto).add(category, entry)
239 240
240 241 def getreplies(self, partid):
241 242 """get the records that are replies to a specific part"""
242 243 return self._replies.setdefault(partid, unbundlerecords())
243 244
244 245 def __getitem__(self, cat):
245 246 return tuple(self._categories.get(cat, ()))
246 247
247 248 def __iter__(self):
248 249 return iter(self._sequences)
249 250
250 251 def __len__(self):
251 252 return len(self._sequences)
252 253
253 254 def __nonzero__(self):
254 255 return bool(self._sequences)
255 256
256 257 class bundleoperation(object):
257 258 """an object that represents a single bundling process
258 259
259 260 Its purpose is to carry unbundle-related objects and states.
260 261
261 262 A new object should be created at the beginning of each bundle processing.
262 263 The object is to be returned by the processing function.
263 264
264 265 The object has very little content now it will ultimately contain:
265 266 * an access to the repo the bundle is applied to,
266 267 * a ui object,
267 268 * a way to retrieve a transaction to add changes to the repo,
268 269 * a way to record the result of processing each part,
269 270 * a way to construct a bundle response when applicable.
270 271 """
271 272
272 273 def __init__(self, repo, transactiongetter):
273 274 self.repo = repo
274 275 self.ui = repo.ui
275 276 self.records = unbundlerecords()
276 277 self.gettransaction = transactiongetter
277 278 self.reply = None
278 279
279 280 class TransactionUnavailable(RuntimeError):
280 281 pass
281 282
282 283 def _notransaction():
283 284 """default method to get a transaction while processing a bundle
284 285
285 286 Raise an exception to highlight the fact that no transaction was expected
286 287 to be created"""
287 288 raise TransactionUnavailable()
288 289
289 290 def processbundle(repo, unbundler, transactiongetter=None):
290 291 """This function process a bundle, apply effect to/from a repo
291 292
292 293 It iterates over each part then searches for and uses the proper handling
293 294 code to process the part. Parts are processed in order.
294 295
295 296 This is very early version of this function that will be strongly reworked
296 297 before final usage.
297 298
298 299 Unknown Mandatory part will abort the process.
299 300 """
300 301 if transactiongetter is None:
301 302 transactiongetter = _notransaction
302 303 op = bundleoperation(repo, transactiongetter)
303 304 # todo:
304 305 # - replace this is a init function soon.
305 306 # - exception catching
306 307 unbundler.params
307 308 iterparts = unbundler.iterparts()
308 309 part = None
309 310 try:
310 311 for part in iterparts:
311 312 _processpart(op, part)
312 313 except Exception, exc:
313 314 for part in iterparts:
314 315 # consume the bundle content
315 316 part.read()
316 317 # Small hack to let caller code distinguish exceptions from bundle2
317 318 # processing from processing the old format. This is mostly
318 319 # needed to handle different return codes to unbundle according to the
319 320 # type of bundle. We should probably clean up or drop this return code
320 321 # craziness in a future version.
321 322 exc.duringunbundle2 = True
322 323 raise
323 324 return op
324 325
325 326 def _processpart(op, part):
326 327 """process a single part from a bundle
327 328
328 329 The part is guaranteed to have been fully consumed when the function exits
329 330 (even if an exception is raised)."""
330 331 try:
331 332 try:
332 333 handler = parthandlermapping.get(part.type)
333 334 if handler is None:
334 335 raise error.UnsupportedPartError(parttype=part.type)
335 336 op.ui.debug('found a handler for part %r\n' % part.type)
336 337 unknownparams = part.mandatorykeys - handler.params
337 338 if unknownparams:
338 339 unknownparams = list(unknownparams)
339 340 unknownparams.sort()
340 341 raise error.UnsupportedPartError(parttype=part.type,
341 342 params=unknownparams)
342 343 except error.UnsupportedPartError, exc:
343 344 if part.mandatory: # mandatory parts
344 345 raise
345 346 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
346 347 return # skip to part processing
347 348
348 349 # handler is called outside the above try block so that we don't
349 350 # risk catching KeyErrors from anything other than the
350 351 # parthandlermapping lookup (any KeyError raised by handler()
351 352 # itself represents a defect of a different variety).
352 353 output = None
353 354 if op.reply is not None:
354 355 op.ui.pushbuffer(error=True)
355 356 output = ''
356 357 try:
357 358 handler(op, part)
358 359 finally:
359 360 if output is not None:
360 361 output = op.ui.popbuffer()
361 362 if output:
362 363 outpart = op.reply.newpart('b2x:output', data=output,
363 364 mandatory=False)
364 365 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
365 366 finally:
366 367 # consume the part content to not corrupt the stream.
367 368 part.read()
368 369
369 370
370 371 def decodecaps(blob):
371 372 """decode a bundle2 caps bytes blob into a dictionary
372 373
373 374 The blob is a list of capabilities (one per line)
374 375 Capabilities may have values using a line of the form::
375 376
376 377 capability=value1,value2,value3
377 378
378 379 The values are always a list."""
379 380 caps = {}
380 381 for line in blob.splitlines():
381 382 if not line:
382 383 continue
383 384 if '=' not in line:
384 385 key, vals = line, ()
385 386 else:
386 387 key, vals = line.split('=', 1)
387 388 vals = vals.split(',')
388 389 key = urllib.unquote(key)
389 390 vals = [urllib.unquote(v) for v in vals]
390 391 caps[key] = vals
391 392 return caps
392 393
393 394 def encodecaps(caps):
394 395 """encode a bundle2 caps dictionary into a bytes blob"""
395 396 chunks = []
396 397 for ca in sorted(caps):
397 398 vals = caps[ca]
398 399 ca = urllib.quote(ca)
399 400 vals = [urllib.quote(v) for v in vals]
400 401 if vals:
401 402 ca = "%s=%s" % (ca, ','.join(vals))
402 403 chunks.append(ca)
403 404 return '\n'.join(chunks)
404 405
405 406 class bundle20(object):
406 407 """represent an outgoing bundle2 container
407 408
408 409 Use the `addparam` method to add stream level parameter. and `newpart` to
409 410 populate it. Then call `getchunks` to retrieve all the binary chunks of
410 411 data that compose the bundle2 container."""
411 412
412 413 def __init__(self, ui, capabilities=()):
413 414 self.ui = ui
414 415 self._params = []
415 416 self._parts = []
416 417 self.capabilities = dict(capabilities)
417 418
418 419 @property
419 420 def nbparts(self):
420 421 """total number of parts added to the bundler"""
421 422 return len(self._parts)
422 423
423 424 # methods used to defines the bundle2 content
424 425 def addparam(self, name, value=None):
425 426 """add a stream level parameter"""
426 427 if not name:
427 428 raise ValueError('empty parameter name')
428 429 if name[0] not in string.letters:
429 430 raise ValueError('non letter first character: %r' % name)
430 431 self._params.append((name, value))
431 432
432 433 def addpart(self, part):
433 434 """add a new part to the bundle2 container
434 435
435 436 Parts contains the actual applicative payload."""
436 437 assert part.id is None
437 438 part.id = len(self._parts) # very cheap counter
438 439 self._parts.append(part)
439 440
440 441 def newpart(self, typeid, *args, **kwargs):
441 442 """create a new part and add it to the containers
442 443
443 444 As the part is directly added to the containers. For now, this means
444 445 that any failure to properly initialize the part after calling
445 446 ``newpart`` should result in a failure of the whole bundling process.
446 447
447 448 You can still fall back to manually create and add if you need better
448 449 control."""
449 450 part = bundlepart(typeid, *args, **kwargs)
450 451 self.addpart(part)
451 452 return part
452 453
453 454 # methods used to generate the bundle2 stream
454 455 def getchunks(self):
455 456 self.ui.debug('start emission of %s stream\n' % _magicstring)
456 457 yield _magicstring
457 458 param = self._paramchunk()
458 459 self.ui.debug('bundle parameter: %s\n' % param)
459 460 yield _pack(_fstreamparamsize, len(param))
460 461 if param:
461 462 yield param
462 463
463 464 self.ui.debug('start of parts\n')
464 465 for part in self._parts:
465 466 self.ui.debug('bundle part: "%s"\n' % part.type)
466 467 for chunk in part.getchunks():
467 468 yield chunk
468 469 self.ui.debug('end of bundle\n')
469 470 yield _pack(_fpartheadersize, 0)
470 471
471 472 def _paramchunk(self):
472 473 """return a encoded version of all stream parameters"""
473 474 blocks = []
474 475 for par, value in self._params:
475 476 par = urllib.quote(par)
476 477 if value is not None:
477 478 value = urllib.quote(value)
478 479 par = '%s=%s' % (par, value)
479 480 blocks.append(par)
480 481 return ' '.join(blocks)
481 482
482 483 class unpackermixin(object):
483 484 """A mixin to extract bytes and struct data from a stream"""
484 485
485 486 def __init__(self, fp):
486 487 self._fp = fp
488 self._seekable = (util.safehasattr(fp, 'seek') and
489 util.safehasattr(fp, 'tell'))
487 490
488 491 def _unpack(self, format):
489 492 """unpack this struct format from the stream"""
490 493 data = self._readexact(struct.calcsize(format))
491 494 return _unpack(format, data)
492 495
493 496 def _readexact(self, size):
494 497 """read exactly <size> bytes from the stream"""
495 498 return changegroup.readexactly(self._fp, size)
496 499
500 def seek(self, offset, whence):
501 """move the underlying file pointer"""
502 if self._seekable:
503 return self._fp.seek(offset, whence)
504 else:
505 raise NotImplementedError(_('File pointer is not seekable'))
506
507 def tell(self):
508 """return the file offset, or None if file is not seekable"""
509 if self._seekable:
510 try:
511 return self._fp.tell()
512 except IOError, e:
513 if e.errno == errno.ESPIPE:
514 self._seekable = False
515 else:
516 raise
517 return None
518
519 def close(self):
520 """close underlying file"""
521 if util.safehasattr(self._fp, 'close'):
522 return self._fp.close()
497 523
498 524 class unbundle20(unpackermixin):
499 525 """interpret a bundle2 stream
500 526
501 527 This class is fed with a binary stream and yields parts through its
502 528 `iterparts` methods."""
503 529
504 530 def __init__(self, ui, fp, header=None):
505 531 """If header is specified, we do not read it out of the stream."""
506 532 self.ui = ui
507 533 super(unbundle20, self).__init__(fp)
508 534 if header is None:
509 535 header = self._readexact(4)
510 536 magic, version = header[0:2], header[2:4]
511 537 if magic != 'HG':
512 538 raise util.Abort(_('not a Mercurial bundle'))
513 539 if version != '2Y':
514 540 raise util.Abort(_('unknown bundle version %s') % version)
515 541 self.ui.debug('start processing of %s stream\n' % header)
516 542
517 543 @util.propertycache
518 544 def params(self):
519 545 """dictionary of stream level parameters"""
520 546 self.ui.debug('reading bundle2 stream parameters\n')
521 547 params = {}
522 548 paramssize = self._unpack(_fstreamparamsize)[0]
523 549 if paramssize < 0:
524 550 raise error.BundleValueError('negative bundle param size: %i'
525 551 % paramssize)
526 552 if paramssize:
527 553 for p in self._readexact(paramssize).split(' '):
528 554 p = p.split('=', 1)
529 555 p = [urllib.unquote(i) for i in p]
530 556 if len(p) < 2:
531 557 p.append(None)
532 558 self._processparam(*p)
533 559 params[p[0]] = p[1]
534 560 return params
535 561
536 562 def _processparam(self, name, value):
537 563 """process a parameter, applying its effect if needed
538 564
539 565 Parameter starting with a lower case letter are advisory and will be
540 566 ignored when unknown. Those starting with an upper case letter are
541 567 mandatory and will this function will raise a KeyError when unknown.
542 568
543 569 Note: no option are currently supported. Any input will be either
544 570 ignored or failing.
545 571 """
546 572 if not name:
547 573 raise ValueError('empty parameter name')
548 574 if name[0] not in string.letters:
549 575 raise ValueError('non letter first character: %r' % name)
550 576 # Some logic will be later added here to try to process the option for
551 577 # a dict of known parameter.
552 578 if name[0].islower():
553 579 self.ui.debug("ignoring unknown parameter %r\n" % name)
554 580 else:
555 581 raise error.UnsupportedPartError(params=(name,))
556 582
557 583
558 584 def iterparts(self):
559 585 """yield all parts contained in the stream"""
560 586 # make sure param have been loaded
561 587 self.params
562 588 self.ui.debug('start extraction of bundle2 parts\n')
563 589 headerblock = self._readpartheader()
564 590 while headerblock is not None:
565 591 part = unbundlepart(self.ui, headerblock, self._fp)
566 592 yield part
567 593 headerblock = self._readpartheader()
568 594 self.ui.debug('end of bundle2 stream\n')
569 595
570 596 def _readpartheader(self):
571 597 """reads a part header size and return the bytes blob
572 598
573 599 returns None if empty"""
574 600 headersize = self._unpack(_fpartheadersize)[0]
575 601 if headersize < 0:
576 602 raise error.BundleValueError('negative part header size: %i'
577 603 % headersize)
578 604 self.ui.debug('part header size: %i\n' % headersize)
579 605 if headersize:
580 606 return self._readexact(headersize)
581 607 return None
582 608
583 609
584 610 class bundlepart(object):
585 611 """A bundle2 part contains application level payload
586 612
587 613 The part `type` is used to route the part to the application level
588 614 handler.
589 615
590 616 The part payload is contained in ``part.data``. It could be raw bytes or a
591 617 generator of byte chunks.
592 618
593 619 You can add parameters to the part using the ``addparam`` method.
594 620 Parameters can be either mandatory (default) or advisory. Remote side
595 621 should be able to safely ignore the advisory ones.
596 622
597 623 Both data and parameters cannot be modified after the generation has begun.
598 624 """
599 625
600 626 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
601 627 data='', mandatory=True):
602 628 validateparttype(parttype)
603 629 self.id = None
604 630 self.type = parttype
605 631 self._data = data
606 632 self._mandatoryparams = list(mandatoryparams)
607 633 self._advisoryparams = list(advisoryparams)
608 634 # checking for duplicated entries
609 635 self._seenparams = set()
610 636 for pname, __ in self._mandatoryparams + self._advisoryparams:
611 637 if pname in self._seenparams:
612 638 raise RuntimeError('duplicated params: %s' % pname)
613 639 self._seenparams.add(pname)
614 640 # status of the part's generation:
615 641 # - None: not started,
616 642 # - False: currently generated,
617 643 # - True: generation done.
618 644 self._generated = None
619 645 self.mandatory = mandatory
620 646
621 647 # methods used to defines the part content
622 648 def __setdata(self, data):
623 649 if self._generated is not None:
624 650 raise error.ReadOnlyPartError('part is being generated')
625 651 self._data = data
626 652 def __getdata(self):
627 653 return self._data
628 654 data = property(__getdata, __setdata)
629 655
630 656 @property
631 657 def mandatoryparams(self):
632 658 # make it an immutable tuple to force people through ``addparam``
633 659 return tuple(self._mandatoryparams)
634 660
635 661 @property
636 662 def advisoryparams(self):
637 663 # make it an immutable tuple to force people through ``addparam``
638 664 return tuple(self._advisoryparams)
639 665
640 666 def addparam(self, name, value='', mandatory=True):
641 667 if self._generated is not None:
642 668 raise error.ReadOnlyPartError('part is being generated')
643 669 if name in self._seenparams:
644 670 raise ValueError('duplicated params: %s' % name)
645 671 self._seenparams.add(name)
646 672 params = self._advisoryparams
647 673 if mandatory:
648 674 params = self._mandatoryparams
649 675 params.append((name, value))
650 676
651 677 # methods used to generates the bundle2 stream
652 678 def getchunks(self):
653 679 if self._generated is not None:
654 680 raise RuntimeError('part can only be consumed once')
655 681 self._generated = False
656 682 #### header
657 683 if self.mandatory:
658 684 parttype = self.type.upper()
659 685 else:
660 686 parttype = self.type.lower()
661 687 ## parttype
662 688 header = [_pack(_fparttypesize, len(parttype)),
663 689 parttype, _pack(_fpartid, self.id),
664 690 ]
665 691 ## parameters
666 692 # count
667 693 manpar = self.mandatoryparams
668 694 advpar = self.advisoryparams
669 695 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
670 696 # size
671 697 parsizes = []
672 698 for key, value in manpar:
673 699 parsizes.append(len(key))
674 700 parsizes.append(len(value))
675 701 for key, value in advpar:
676 702 parsizes.append(len(key))
677 703 parsizes.append(len(value))
678 704 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
679 705 header.append(paramsizes)
680 706 # key, value
681 707 for key, value in manpar:
682 708 header.append(key)
683 709 header.append(value)
684 710 for key, value in advpar:
685 711 header.append(key)
686 712 header.append(value)
687 713 ## finalize header
688 714 headerchunk = ''.join(header)
689 715 yield _pack(_fpartheadersize, len(headerchunk))
690 716 yield headerchunk
691 717 ## payload
692 718 try:
693 719 for chunk in self._payloadchunks():
694 720 yield _pack(_fpayloadsize, len(chunk))
695 721 yield chunk
696 722 except Exception, exc:
697 723 # backup exception data for later
698 724 exc_info = sys.exc_info()
699 725 msg = 'unexpected error: %s' % exc
700 726 interpart = bundlepart('b2x:error:abort', [('message', msg)],
701 727 mandatory=False)
702 728 interpart.id = 0
703 729 yield _pack(_fpayloadsize, -1)
704 730 for chunk in interpart.getchunks():
705 731 yield chunk
706 732 # abort current part payload
707 733 yield _pack(_fpayloadsize, 0)
708 734 raise exc_info[0], exc_info[1], exc_info[2]
709 735 # end of payload
710 736 yield _pack(_fpayloadsize, 0)
711 737 self._generated = True
712 738
713 739 def _payloadchunks(self):
714 740 """yield chunks of a the part payload
715 741
716 742 Exists to handle the different methods to provide data to a part."""
717 743 # we only support fixed size data now.
718 744 # This will be improved in the future.
719 745 if util.safehasattr(self.data, 'next'):
720 746 buff = util.chunkbuffer(self.data)
721 747 chunk = buff.read(preferedchunksize)
722 748 while chunk:
723 749 yield chunk
724 750 chunk = buff.read(preferedchunksize)
725 751 elif len(self.data):
726 752 yield self.data
727 753
728 754
729 755 flaginterrupt = -1
730 756
731 757 class interrupthandler(unpackermixin):
732 758 """read one part and process it with restricted capability
733 759
734 760 This allows to transmit exception raised on the producer size during part
735 761 iteration while the consumer is reading a part.
736 762
737 763 Part processed in this manner only have access to a ui object,"""
738 764
739 765 def __init__(self, ui, fp):
740 766 super(interrupthandler, self).__init__(fp)
741 767 self.ui = ui
742 768
743 769 def _readpartheader(self):
744 770 """reads a part header size and return the bytes blob
745 771
746 772 returns None if empty"""
747 773 headersize = self._unpack(_fpartheadersize)[0]
748 774 if headersize < 0:
749 775 raise error.BundleValueError('negative part header size: %i'
750 776 % headersize)
751 777 self.ui.debug('part header size: %i\n' % headersize)
752 778 if headersize:
753 779 return self._readexact(headersize)
754 780 return None
755 781
756 782 def __call__(self):
757 783 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
758 784 headerblock = self._readpartheader()
759 785 if headerblock is None:
760 786 self.ui.debug('no part found during interruption.\n')
761 787 return
762 788 part = unbundlepart(self.ui, headerblock, self._fp)
763 789 op = interruptoperation(self.ui)
764 790 _processpart(op, part)
765 791
766 792 class interruptoperation(object):
767 793 """A limited operation to be use by part handler during interruption
768 794
769 795 It only have access to an ui object.
770 796 """
771 797
772 798 def __init__(self, ui):
773 799 self.ui = ui
774 800 self.reply = None
775 801
776 802 @property
777 803 def repo(self):
778 804 raise RuntimeError('no repo access from stream interruption')
779 805
780 806 def gettransaction(self):
781 807 raise TransactionUnavailable('no repo access from stream interruption')
782 808
783 809 class unbundlepart(unpackermixin):
784 810 """a bundle part read from a bundle"""
785 811
786 812 def __init__(self, ui, header, fp):
787 813 super(unbundlepart, self).__init__(fp)
788 814 self.ui = ui
789 815 # unbundle state attr
790 816 self._headerdata = header
791 817 self._headeroffset = 0
792 818 self._initialized = False
793 819 self.consumed = False
794 820 # part data
795 821 self.id = None
796 822 self.type = None
797 823 self.mandatoryparams = None
798 824 self.advisoryparams = None
799 825 self.params = None
800 826 self.mandatorykeys = ()
801 827 self._payloadstream = None
802 828 self._readheader()
803 829 self._mandatory = None
804 830
805 831 def _fromheader(self, size):
806 832 """return the next <size> byte from the header"""
807 833 offset = self._headeroffset
808 834 data = self._headerdata[offset:(offset + size)]
809 835 self._headeroffset = offset + size
810 836 return data
811 837
812 838 def _unpackheader(self, format):
813 839 """read given format from header
814 840
815 841 This automatically compute the size of the format to read."""
816 842 data = self._fromheader(struct.calcsize(format))
817 843 return _unpack(format, data)
818 844
819 845 def _initparams(self, mandatoryparams, advisoryparams):
820 846 """internal function to setup all logic related parameters"""
821 847 # make it read only to prevent people touching it by mistake.
822 848 self.mandatoryparams = tuple(mandatoryparams)
823 849 self.advisoryparams = tuple(advisoryparams)
824 850 # user friendly UI
825 851 self.params = dict(self.mandatoryparams)
826 852 self.params.update(dict(self.advisoryparams))
827 853 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
828 854
829 855 def _readheader(self):
830 856 """read the header and setup the object"""
831 857 typesize = self._unpackheader(_fparttypesize)[0]
832 858 self.type = self._fromheader(typesize)
833 859 self.ui.debug('part type: "%s"\n' % self.type)
834 860 self.id = self._unpackheader(_fpartid)[0]
835 861 self.ui.debug('part id: "%s"\n' % self.id)
836 862 # extract mandatory bit from type
837 863 self.mandatory = (self.type != self.type.lower())
838 864 self.type = self.type.lower()
839 865 ## reading parameters
840 866 # param count
841 867 mancount, advcount = self._unpackheader(_fpartparamcount)
842 868 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
843 869 # param size
844 870 fparamsizes = _makefpartparamsizes(mancount + advcount)
845 871 paramsizes = self._unpackheader(fparamsizes)
846 872 # make it a list of couple again
847 873 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
848 874 # split mandatory from advisory
849 875 mansizes = paramsizes[:mancount]
850 876 advsizes = paramsizes[mancount:]
851 877 # retrieve param value
852 878 manparams = []
853 879 for key, value in mansizes:
854 880 manparams.append((self._fromheader(key), self._fromheader(value)))
855 881 advparams = []
856 882 for key, value in advsizes:
857 883 advparams.append((self._fromheader(key), self._fromheader(value)))
858 884 self._initparams(manparams, advparams)
859 885 ## part payload
860 886 def payloadchunks():
861 887 payloadsize = self._unpack(_fpayloadsize)[0]
862 888 self.ui.debug('payload chunk size: %i\n' % payloadsize)
863 889 while payloadsize:
864 890 if payloadsize == flaginterrupt:
865 891 # interruption detection, the handler will now read a
866 892 # single part and process it.
867 893 interrupthandler(self.ui, self._fp)()
868 894 elif payloadsize < 0:
869 895 msg = 'negative payload chunk size: %i' % payloadsize
870 896 raise error.BundleValueError(msg)
871 897 else:
872 898 yield self._readexact(payloadsize)
873 899 payloadsize = self._unpack(_fpayloadsize)[0]
874 900 self.ui.debug('payload chunk size: %i\n' % payloadsize)
875 901 self._payloadstream = util.chunkbuffer(payloadchunks())
876 902 # we read the data, tell it
877 903 self._initialized = True
878 904
879 905 def read(self, size=None):
880 906 """read payload data"""
881 907 if not self._initialized:
882 908 self._readheader()
883 909 if size is None:
884 910 data = self._payloadstream.read()
885 911 else:
886 912 data = self._payloadstream.read(size)
887 913 if size is None or len(data) < size:
888 914 self.consumed = True
889 915 return data
890 916
891 917 capabilities = {'HG2Y': (),
892 918 'b2x:listkeys': (),
893 919 'b2x:pushkey': (),
894 920 'digests': tuple(sorted(util.DIGESTS.keys())),
895 921 'b2x:remote-changegroup': ('http', 'https'),
896 922 }
897 923
898 924 def getrepocaps(repo, allowpushback=False):
899 925 """return the bundle2 capabilities for a given repo
900 926
901 927 Exists to allow extensions (like evolution) to mutate the capabilities.
902 928 """
903 929 caps = capabilities.copy()
904 930 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
905 931 if obsolete.isenabled(repo, obsolete.exchangeopt):
906 932 supportedformat = tuple('V%i' % v for v in obsolete.formats)
907 933 caps['b2x:obsmarkers'] = supportedformat
908 934 if allowpushback:
909 935 caps['b2x:pushback'] = ()
910 936 return caps
911 937
912 938 def bundle2caps(remote):
913 939 """return the bundle capabilities of a peer as dict"""
914 940 raw = remote.capable('bundle2-exp')
915 941 if not raw and raw != '':
916 942 return {}
917 943 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
918 944 return decodecaps(capsblob)
919 945
920 946 def obsmarkersversion(caps):
921 947 """extract the list of supported obsmarkers versions from a bundle2caps dict
922 948 """
923 949 obscaps = caps.get('b2x:obsmarkers', ())
924 950 return [int(c[1:]) for c in obscaps if c.startswith('V')]
925 951
926 952 @parthandler('b2x:changegroup', ('version',))
927 953 def handlechangegroup(op, inpart):
928 954 """apply a changegroup part on the repo
929 955
930 956 This is a very early implementation that will massive rework before being
931 957 inflicted to any end-user.
932 958 """
933 959 # Make sure we trigger a transaction creation
934 960 #
935 961 # The addchangegroup function will get a transaction object by itself, but
936 962 # we need to make sure we trigger the creation of a transaction object used
937 963 # for the whole processing scope.
938 964 op.gettransaction()
939 965 unpackerversion = inpart.params.get('version', '01')
940 966 # We should raise an appropriate exception here
941 967 unpacker = changegroup.packermap[unpackerversion][1]
942 968 cg = unpacker(inpart, 'UN')
943 969 # the source and url passed here are overwritten by the one contained in
944 970 # the transaction.hookargs argument. So 'bundle2' is a placeholder
945 971 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
946 972 op.records.add('changegroup', {'return': ret})
947 973 if op.reply is not None:
948 974 # This is definitely not the final form of this
949 975 # return. But one need to start somewhere.
950 976 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
951 977 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
952 978 part.addparam('return', '%i' % ret, mandatory=False)
953 979 assert not inpart.read()
954 980
955 981 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
956 982 ['digest:%s' % k for k in util.DIGESTS.keys()])
957 983 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
958 984 def handleremotechangegroup(op, inpart):
959 985 """apply a bundle10 on the repo, given an url and validation information
960 986
961 987 All the information about the remote bundle to import are given as
962 988 parameters. The parameters include:
963 989 - url: the url to the bundle10.
964 990 - size: the bundle10 file size. It is used to validate what was
965 991 retrieved by the client matches the server knowledge about the bundle.
966 992 - digests: a space separated list of the digest types provided as
967 993 parameters.
968 994 - digest:<digest-type>: the hexadecimal representation of the digest with
969 995 that name. Like the size, it is used to validate what was retrieved by
970 996 the client matches what the server knows about the bundle.
971 997
972 998 When multiple digest types are given, all of them are checked.
973 999 """
974 1000 try:
975 1001 raw_url = inpart.params['url']
976 1002 except KeyError:
977 1003 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
978 1004 parsed_url = util.url(raw_url)
979 1005 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
980 1006 raise util.Abort(_('remote-changegroup does not support %s urls') %
981 1007 parsed_url.scheme)
982 1008
983 1009 try:
984 1010 size = int(inpart.params['size'])
985 1011 except ValueError:
986 1012 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
987 1013 % 'size')
988 1014 except KeyError:
989 1015 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
990 1016
991 1017 digests = {}
992 1018 for typ in inpart.params.get('digests', '').split():
993 1019 param = 'digest:%s' % typ
994 1020 try:
995 1021 value = inpart.params[param]
996 1022 except KeyError:
997 1023 raise util.Abort(_('remote-changegroup: missing "%s" param') %
998 1024 param)
999 1025 digests[typ] = value
1000 1026
1001 1027 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1002 1028
1003 1029 # Make sure we trigger a transaction creation
1004 1030 #
1005 1031 # The addchangegroup function will get a transaction object by itself, but
1006 1032 # we need to make sure we trigger the creation of a transaction object used
1007 1033 # for the whole processing scope.
1008 1034 op.gettransaction()
1009 1035 import exchange
1010 1036 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1011 1037 if not isinstance(cg, changegroup.cg1unpacker):
1012 1038 raise util.Abort(_('%s: not a bundle version 1.0') %
1013 1039 util.hidepassword(raw_url))
1014 1040 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1015 1041 op.records.add('changegroup', {'return': ret})
1016 1042 if op.reply is not None:
1017 1043 # This is definitely not the final form of this
1018 1044 # return. But one need to start somewhere.
1019 1045 part = op.reply.newpart('b2x:reply:changegroup')
1020 1046 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1021 1047 part.addparam('return', '%i' % ret, mandatory=False)
1022 1048 try:
1023 1049 real_part.validate()
1024 1050 except util.Abort, e:
1025 1051 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1026 1052 (util.hidepassword(raw_url), str(e)))
1027 1053 assert not inpart.read()
1028 1054
1029 1055 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1030 1056 def handlereplychangegroup(op, inpart):
1031 1057 ret = int(inpart.params['return'])
1032 1058 replyto = int(inpart.params['in-reply-to'])
1033 1059 op.records.add('changegroup', {'return': ret}, replyto)
1034 1060
1035 1061 @parthandler('b2x:check:heads')
1036 1062 def handlecheckheads(op, inpart):
1037 1063 """check that head of the repo did not change
1038 1064
1039 1065 This is used to detect a push race when using unbundle.
1040 1066 This replaces the "heads" argument of unbundle."""
1041 1067 h = inpart.read(20)
1042 1068 heads = []
1043 1069 while len(h) == 20:
1044 1070 heads.append(h)
1045 1071 h = inpart.read(20)
1046 1072 assert not h
1047 1073 if heads != op.repo.heads():
1048 1074 raise error.PushRaced('repository changed while pushing - '
1049 1075 'please try again')
1050 1076
1051 1077 @parthandler('b2x:output')
1052 1078 def handleoutput(op, inpart):
1053 1079 """forward output captured on the server to the client"""
1054 1080 for line in inpart.read().splitlines():
1055 1081 op.ui.write(('remote: %s\n' % line))
1056 1082
1057 1083 @parthandler('b2x:replycaps')
1058 1084 def handlereplycaps(op, inpart):
1059 1085 """Notify that a reply bundle should be created
1060 1086
1061 1087 The payload contains the capabilities information for the reply"""
1062 1088 caps = decodecaps(inpart.read())
1063 1089 if op.reply is None:
1064 1090 op.reply = bundle20(op.ui, caps)
1065 1091
1066 1092 @parthandler('b2x:error:abort', ('message', 'hint'))
1067 1093 def handlereplycaps(op, inpart):
1068 1094 """Used to transmit abort error over the wire"""
1069 1095 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1070 1096
1071 1097 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1072 1098 def handlereplycaps(op, inpart):
1073 1099 """Used to transmit unknown content error over the wire"""
1074 1100 kwargs = {}
1075 1101 parttype = inpart.params.get('parttype')
1076 1102 if parttype is not None:
1077 1103 kwargs['parttype'] = parttype
1078 1104 params = inpart.params.get('params')
1079 1105 if params is not None:
1080 1106 kwargs['params'] = params.split('\0')
1081 1107
1082 1108 raise error.UnsupportedPartError(**kwargs)
1083 1109
1084 1110 @parthandler('b2x:error:pushraced', ('message',))
1085 1111 def handlereplycaps(op, inpart):
1086 1112 """Used to transmit push race error over the wire"""
1087 1113 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1088 1114
1089 1115 @parthandler('b2x:listkeys', ('namespace',))
1090 1116 def handlelistkeys(op, inpart):
1091 1117 """retrieve pushkey namespace content stored in a bundle2"""
1092 1118 namespace = inpart.params['namespace']
1093 1119 r = pushkey.decodekeys(inpart.read())
1094 1120 op.records.add('listkeys', (namespace, r))
1095 1121
1096 1122 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1097 1123 def handlepushkey(op, inpart):
1098 1124 """process a pushkey request"""
1099 1125 dec = pushkey.decode
1100 1126 namespace = dec(inpart.params['namespace'])
1101 1127 key = dec(inpart.params['key'])
1102 1128 old = dec(inpart.params['old'])
1103 1129 new = dec(inpart.params['new'])
1104 1130 ret = op.repo.pushkey(namespace, key, old, new)
1105 1131 record = {'namespace': namespace,
1106 1132 'key': key,
1107 1133 'old': old,
1108 1134 'new': new}
1109 1135 op.records.add('pushkey', record)
1110 1136 if op.reply is not None:
1111 1137 rpart = op.reply.newpart('b2x:reply:pushkey')
1112 1138 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1113 1139 rpart.addparam('return', '%i' % ret, mandatory=False)
1114 1140
1115 1141 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1116 1142 def handlepushkeyreply(op, inpart):
1117 1143 """retrieve the result of a pushkey request"""
1118 1144 ret = int(inpart.params['return'])
1119 1145 partid = int(inpart.params['in-reply-to'])
1120 1146 op.records.add('pushkey', {'return': ret}, partid)
1121 1147
1122 1148 @parthandler('b2x:obsmarkers')
1123 1149 def handleobsmarker(op, inpart):
1124 1150 """add a stream of obsmarkers to the repo"""
1125 1151 tr = op.gettransaction()
1126 1152 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1127 1153 if new:
1128 1154 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1129 1155 op.records.add('obsmarkers', {'new': new})
1130 1156 if op.reply is not None:
1131 1157 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1132 1158 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1133 1159 rpart.addparam('new', '%i' % new, mandatory=False)
1134 1160
1135 1161
1136 1162 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1137 1163 def handlepushkeyreply(op, inpart):
1138 1164 """retrieve the result of a pushkey request"""
1139 1165 ret = int(inpart.params['new'])
1140 1166 partid = int(inpart.params['in-reply-to'])
1141 1167 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now