##// END OF EJS Templates
bundle2: seek in part iterator...
Eric Sumner -
r24048:13d88b7e default
parent child Browse files
Show More
@@ -1,1224 +1,1225
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _magicstring = 'HG2Y'
166 166
167 167 _fstreamparamsize = '>i'
168 168 _fpartheadersize = '>i'
169 169 _fparttypesize = '>B'
170 170 _fpartid = '>I'
171 171 _fpayloadsize = '>i'
172 172 _fpartparamcount = '>BB'
173 173
174 174 preferedchunksize = 4096
175 175
176 176 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
177 177
178 178 def validateparttype(parttype):
179 179 """raise ValueError if a parttype contains invalid character"""
180 180 if _parttypeforbidden.search(parttype):
181 181 raise ValueError(parttype)
182 182
183 183 def _makefpartparamsizes(nbparams):
184 184 """return a struct format to read part parameter sizes
185 185
186 186 The number parameters is variable so we need to build that format
187 187 dynamically.
188 188 """
189 189 return '>'+('BB'*nbparams)
190 190
191 191 parthandlermapping = {}
192 192
193 193 def parthandler(parttype, params=()):
194 194 """decorator that register a function as a bundle2 part handler
195 195
196 196 eg::
197 197
198 198 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
199 199 def myparttypehandler(...):
200 200 '''process a part of type "my part".'''
201 201 ...
202 202 """
203 203 validateparttype(parttype)
204 204 def _decorator(func):
205 205 lparttype = parttype.lower() # enforce lower case matching.
206 206 assert lparttype not in parthandlermapping
207 207 parthandlermapping[lparttype] = func
208 208 func.params = frozenset(params)
209 209 return func
210 210 return _decorator
211 211
212 212 class unbundlerecords(object):
213 213 """keep record of what happens during and unbundle
214 214
215 215 New records are added using `records.add('cat', obj)`. Where 'cat' is a
216 216 category of record and obj is an arbitrary object.
217 217
218 218 `records['cat']` will return all entries of this category 'cat'.
219 219
220 220 Iterating on the object itself will yield `('category', obj)` tuples
221 221 for all entries.
222 222
223 223 All iterations happens in chronological order.
224 224 """
225 225
226 226 def __init__(self):
227 227 self._categories = {}
228 228 self._sequences = []
229 229 self._replies = {}
230 230
231 231 def add(self, category, entry, inreplyto=None):
232 232 """add a new record of a given category.
233 233
234 234 The entry can then be retrieved in the list returned by
235 235 self['category']."""
236 236 self._categories.setdefault(category, []).append(entry)
237 237 self._sequences.append((category, entry))
238 238 if inreplyto is not None:
239 239 self.getreplies(inreplyto).add(category, entry)
240 240
241 241 def getreplies(self, partid):
242 242 """get the records that are replies to a specific part"""
243 243 return self._replies.setdefault(partid, unbundlerecords())
244 244
245 245 def __getitem__(self, cat):
246 246 return tuple(self._categories.get(cat, ()))
247 247
248 248 def __iter__(self):
249 249 return iter(self._sequences)
250 250
251 251 def __len__(self):
252 252 return len(self._sequences)
253 253
254 254 def __nonzero__(self):
255 255 return bool(self._sequences)
256 256
257 257 class bundleoperation(object):
258 258 """an object that represents a single bundling process
259 259
260 260 Its purpose is to carry unbundle-related objects and states.
261 261
262 262 A new object should be created at the beginning of each bundle processing.
263 263 The object is to be returned by the processing function.
264 264
265 265 The object has very little content now it will ultimately contain:
266 266 * an access to the repo the bundle is applied to,
267 267 * a ui object,
268 268 * a way to retrieve a transaction to add changes to the repo,
269 269 * a way to record the result of processing each part,
270 270 * a way to construct a bundle response when applicable.
271 271 """
272 272
273 273 def __init__(self, repo, transactiongetter):
274 274 self.repo = repo
275 275 self.ui = repo.ui
276 276 self.records = unbundlerecords()
277 277 self.gettransaction = transactiongetter
278 278 self.reply = None
279 279
280 280 class TransactionUnavailable(RuntimeError):
281 281 pass
282 282
283 283 def _notransaction():
284 284 """default method to get a transaction while processing a bundle
285 285
286 286 Raise an exception to highlight the fact that no transaction was expected
287 287 to be created"""
288 288 raise TransactionUnavailable()
289 289
290 290 def processbundle(repo, unbundler, transactiongetter=None):
291 291 """This function process a bundle, apply effect to/from a repo
292 292
293 293 It iterates over each part then searches for and uses the proper handling
294 294 code to process the part. Parts are processed in order.
295 295
296 296 This is very early version of this function that will be strongly reworked
297 297 before final usage.
298 298
299 299 Unknown Mandatory part will abort the process.
300 300 """
301 301 if transactiongetter is None:
302 302 transactiongetter = _notransaction
303 303 op = bundleoperation(repo, transactiongetter)
304 304 # todo:
305 305 # - replace this is a init function soon.
306 306 # - exception catching
307 307 unbundler.params
308 308 iterparts = unbundler.iterparts()
309 309 part = None
310 310 try:
311 311 for part in iterparts:
312 312 _processpart(op, part)
313 313 except Exception, exc:
314 314 for part in iterparts:
315 315 # consume the bundle content
316 316 part.seek(0, 2)
317 317 # Small hack to let caller code distinguish exceptions from bundle2
318 318 # processing from processing the old format. This is mostly
319 319 # needed to handle different return codes to unbundle according to the
320 320 # type of bundle. We should probably clean up or drop this return code
321 321 # craziness in a future version.
322 322 exc.duringunbundle2 = True
323 323 raise
324 324 return op
325 325
326 326 def _processpart(op, part):
327 327 """process a single part from a bundle
328 328
329 329 The part is guaranteed to have been fully consumed when the function exits
330 330 (even if an exception is raised)."""
331 331 try:
332 332 try:
333 333 handler = parthandlermapping.get(part.type)
334 334 if handler is None:
335 335 raise error.UnsupportedPartError(parttype=part.type)
336 336 op.ui.debug('found a handler for part %r\n' % part.type)
337 337 unknownparams = part.mandatorykeys - handler.params
338 338 if unknownparams:
339 339 unknownparams = list(unknownparams)
340 340 unknownparams.sort()
341 341 raise error.UnsupportedPartError(parttype=part.type,
342 342 params=unknownparams)
343 343 except error.UnsupportedPartError, exc:
344 344 if part.mandatory: # mandatory parts
345 345 raise
346 346 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
347 347 return # skip to part processing
348 348
349 349 # handler is called outside the above try block so that we don't
350 350 # risk catching KeyErrors from anything other than the
351 351 # parthandlermapping lookup (any KeyError raised by handler()
352 352 # itself represents a defect of a different variety).
353 353 output = None
354 354 if op.reply is not None:
355 355 op.ui.pushbuffer(error=True)
356 356 output = ''
357 357 try:
358 358 handler(op, part)
359 359 finally:
360 360 if output is not None:
361 361 output = op.ui.popbuffer()
362 362 if output:
363 363 outpart = op.reply.newpart('b2x:output', data=output,
364 364 mandatory=False)
365 365 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
366 366 finally:
367 367 # consume the part content to not corrupt the stream.
368 368 part.seek(0, 2)
369 369
370 370
371 371 def decodecaps(blob):
372 372 """decode a bundle2 caps bytes blob into a dictionary
373 373
374 374 The blob is a list of capabilities (one per line)
375 375 Capabilities may have values using a line of the form::
376 376
377 377 capability=value1,value2,value3
378 378
379 379 The values are always a list."""
380 380 caps = {}
381 381 for line in blob.splitlines():
382 382 if not line:
383 383 continue
384 384 if '=' not in line:
385 385 key, vals = line, ()
386 386 else:
387 387 key, vals = line.split('=', 1)
388 388 vals = vals.split(',')
389 389 key = urllib.unquote(key)
390 390 vals = [urllib.unquote(v) for v in vals]
391 391 caps[key] = vals
392 392 return caps
393 393
394 394 def encodecaps(caps):
395 395 """encode a bundle2 caps dictionary into a bytes blob"""
396 396 chunks = []
397 397 for ca in sorted(caps):
398 398 vals = caps[ca]
399 399 ca = urllib.quote(ca)
400 400 vals = [urllib.quote(v) for v in vals]
401 401 if vals:
402 402 ca = "%s=%s" % (ca, ','.join(vals))
403 403 chunks.append(ca)
404 404 return '\n'.join(chunks)
405 405
406 406 class bundle20(object):
407 407 """represent an outgoing bundle2 container
408 408
409 409 Use the `addparam` method to add stream level parameter. and `newpart` to
410 410 populate it. Then call `getchunks` to retrieve all the binary chunks of
411 411 data that compose the bundle2 container."""
412 412
413 413 def __init__(self, ui, capabilities=()):
414 414 self.ui = ui
415 415 self._params = []
416 416 self._parts = []
417 417 self.capabilities = dict(capabilities)
418 418
419 419 @property
420 420 def nbparts(self):
421 421 """total number of parts added to the bundler"""
422 422 return len(self._parts)
423 423
424 424 # methods used to defines the bundle2 content
425 425 def addparam(self, name, value=None):
426 426 """add a stream level parameter"""
427 427 if not name:
428 428 raise ValueError('empty parameter name')
429 429 if name[0] not in string.letters:
430 430 raise ValueError('non letter first character: %r' % name)
431 431 self._params.append((name, value))
432 432
433 433 def addpart(self, part):
434 434 """add a new part to the bundle2 container
435 435
436 436 Parts contains the actual applicative payload."""
437 437 assert part.id is None
438 438 part.id = len(self._parts) # very cheap counter
439 439 self._parts.append(part)
440 440
441 441 def newpart(self, typeid, *args, **kwargs):
442 442 """create a new part and add it to the containers
443 443
444 444 As the part is directly added to the containers. For now, this means
445 445 that any failure to properly initialize the part after calling
446 446 ``newpart`` should result in a failure of the whole bundling process.
447 447
448 448 You can still fall back to manually create and add if you need better
449 449 control."""
450 450 part = bundlepart(typeid, *args, **kwargs)
451 451 self.addpart(part)
452 452 return part
453 453
454 454 # methods used to generate the bundle2 stream
455 455 def getchunks(self):
456 456 self.ui.debug('start emission of %s stream\n' % _magicstring)
457 457 yield _magicstring
458 458 param = self._paramchunk()
459 459 self.ui.debug('bundle parameter: %s\n' % param)
460 460 yield _pack(_fstreamparamsize, len(param))
461 461 if param:
462 462 yield param
463 463
464 464 self.ui.debug('start of parts\n')
465 465 for part in self._parts:
466 466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 467 for chunk in part.getchunks():
468 468 yield chunk
469 469 self.ui.debug('end of bundle\n')
470 470 yield _pack(_fpartheadersize, 0)
471 471
472 472 def _paramchunk(self):
473 473 """return a encoded version of all stream parameters"""
474 474 blocks = []
475 475 for par, value in self._params:
476 476 par = urllib.quote(par)
477 477 if value is not None:
478 478 value = urllib.quote(value)
479 479 par = '%s=%s' % (par, value)
480 480 blocks.append(par)
481 481 return ' '.join(blocks)
482 482
483 483 class unpackermixin(object):
484 484 """A mixin to extract bytes and struct data from a stream"""
485 485
486 486 def __init__(self, fp):
487 487 self._fp = fp
488 488 self._seekable = (util.safehasattr(fp, 'seek') and
489 489 util.safehasattr(fp, 'tell'))
490 490
491 491 def _unpack(self, format):
492 492 """unpack this struct format from the stream"""
493 493 data = self._readexact(struct.calcsize(format))
494 494 return _unpack(format, data)
495 495
496 496 def _readexact(self, size):
497 497 """read exactly <size> bytes from the stream"""
498 498 return changegroup.readexactly(self._fp, size)
499 499
500 500 def seek(self, offset, whence):
501 501 """move the underlying file pointer"""
502 502 if self._seekable:
503 503 return self._fp.seek(offset, whence)
504 504 else:
505 505 raise NotImplementedError(_('File pointer is not seekable'))
506 506
507 507 def tell(self):
508 508 """return the file offset, or None if file is not seekable"""
509 509 if self._seekable:
510 510 try:
511 511 return self._fp.tell()
512 512 except IOError, e:
513 513 if e.errno == errno.ESPIPE:
514 514 self._seekable = False
515 515 else:
516 516 raise
517 517 return None
518 518
519 519 def close(self):
520 520 """close underlying file"""
521 521 if util.safehasattr(self._fp, 'close'):
522 522 return self._fp.close()
523 523
524 524 class unbundle20(unpackermixin):
525 525 """interpret a bundle2 stream
526 526
527 527 This class is fed with a binary stream and yields parts through its
528 528 `iterparts` methods."""
529 529
530 530 def __init__(self, ui, fp, header=None):
531 531 """If header is specified, we do not read it out of the stream."""
532 532 self.ui = ui
533 533 super(unbundle20, self).__init__(fp)
534 534 if header is None:
535 535 header = self._readexact(4)
536 536 magic, version = header[0:2], header[2:4]
537 537 if magic != 'HG':
538 538 raise util.Abort(_('not a Mercurial bundle'))
539 539 if version != '2Y':
540 540 raise util.Abort(_('unknown bundle version %s') % version)
541 541 self.ui.debug('start processing of %s stream\n' % header)
542 542
543 543 @util.propertycache
544 544 def params(self):
545 545 """dictionary of stream level parameters"""
546 546 self.ui.debug('reading bundle2 stream parameters\n')
547 547 params = {}
548 548 paramssize = self._unpack(_fstreamparamsize)[0]
549 549 if paramssize < 0:
550 550 raise error.BundleValueError('negative bundle param size: %i'
551 551 % paramssize)
552 552 if paramssize:
553 553 for p in self._readexact(paramssize).split(' '):
554 554 p = p.split('=', 1)
555 555 p = [urllib.unquote(i) for i in p]
556 556 if len(p) < 2:
557 557 p.append(None)
558 558 self._processparam(*p)
559 559 params[p[0]] = p[1]
560 560 return params
561 561
562 562 def _processparam(self, name, value):
563 563 """process a parameter, applying its effect if needed
564 564
565 565 Parameter starting with a lower case letter are advisory and will be
566 566 ignored when unknown. Those starting with an upper case letter are
567 567 mandatory and will this function will raise a KeyError when unknown.
568 568
569 569 Note: no option are currently supported. Any input will be either
570 570 ignored or failing.
571 571 """
572 572 if not name:
573 573 raise ValueError('empty parameter name')
574 574 if name[0] not in string.letters:
575 575 raise ValueError('non letter first character: %r' % name)
576 576 # Some logic will be later added here to try to process the option for
577 577 # a dict of known parameter.
578 578 if name[0].islower():
579 579 self.ui.debug("ignoring unknown parameter %r\n" % name)
580 580 else:
581 581 raise error.UnsupportedPartError(params=(name,))
582 582
583 583
584 584 def iterparts(self):
585 585 """yield all parts contained in the stream"""
586 586 # make sure param have been loaded
587 587 self.params
588 588 self.ui.debug('start extraction of bundle2 parts\n')
589 589 headerblock = self._readpartheader()
590 590 while headerblock is not None:
591 591 part = unbundlepart(self.ui, headerblock, self._fp)
592 592 yield part
593 part.seek(0, 2)
593 594 headerblock = self._readpartheader()
594 595 self.ui.debug('end of bundle2 stream\n')
595 596
596 597 def _readpartheader(self):
597 598 """reads a part header size and return the bytes blob
598 599
599 600 returns None if empty"""
600 601 headersize = self._unpack(_fpartheadersize)[0]
601 602 if headersize < 0:
602 603 raise error.BundleValueError('negative part header size: %i'
603 604 % headersize)
604 605 self.ui.debug('part header size: %i\n' % headersize)
605 606 if headersize:
606 607 return self._readexact(headersize)
607 608 return None
608 609
609 610
610 611 class bundlepart(object):
611 612 """A bundle2 part contains application level payload
612 613
613 614 The part `type` is used to route the part to the application level
614 615 handler.
615 616
616 617 The part payload is contained in ``part.data``. It could be raw bytes or a
617 618 generator of byte chunks.
618 619
619 620 You can add parameters to the part using the ``addparam`` method.
620 621 Parameters can be either mandatory (default) or advisory. Remote side
621 622 should be able to safely ignore the advisory ones.
622 623
623 624 Both data and parameters cannot be modified after the generation has begun.
624 625 """
625 626
626 627 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
627 628 data='', mandatory=True):
628 629 validateparttype(parttype)
629 630 self.id = None
630 631 self.type = parttype
631 632 self._data = data
632 633 self._mandatoryparams = list(mandatoryparams)
633 634 self._advisoryparams = list(advisoryparams)
634 635 # checking for duplicated entries
635 636 self._seenparams = set()
636 637 for pname, __ in self._mandatoryparams + self._advisoryparams:
637 638 if pname in self._seenparams:
638 639 raise RuntimeError('duplicated params: %s' % pname)
639 640 self._seenparams.add(pname)
640 641 # status of the part's generation:
641 642 # - None: not started,
642 643 # - False: currently generated,
643 644 # - True: generation done.
644 645 self._generated = None
645 646 self.mandatory = mandatory
646 647
647 648 # methods used to defines the part content
648 649 def __setdata(self, data):
649 650 if self._generated is not None:
650 651 raise error.ReadOnlyPartError('part is being generated')
651 652 self._data = data
652 653 def __getdata(self):
653 654 return self._data
654 655 data = property(__getdata, __setdata)
655 656
656 657 @property
657 658 def mandatoryparams(self):
658 659 # make it an immutable tuple to force people through ``addparam``
659 660 return tuple(self._mandatoryparams)
660 661
661 662 @property
662 663 def advisoryparams(self):
663 664 # make it an immutable tuple to force people through ``addparam``
664 665 return tuple(self._advisoryparams)
665 666
666 667 def addparam(self, name, value='', mandatory=True):
667 668 if self._generated is not None:
668 669 raise error.ReadOnlyPartError('part is being generated')
669 670 if name in self._seenparams:
670 671 raise ValueError('duplicated params: %s' % name)
671 672 self._seenparams.add(name)
672 673 params = self._advisoryparams
673 674 if mandatory:
674 675 params = self._mandatoryparams
675 676 params.append((name, value))
676 677
677 678 # methods used to generates the bundle2 stream
678 679 def getchunks(self):
679 680 if self._generated is not None:
680 681 raise RuntimeError('part can only be consumed once')
681 682 self._generated = False
682 683 #### header
683 684 if self.mandatory:
684 685 parttype = self.type.upper()
685 686 else:
686 687 parttype = self.type.lower()
687 688 ## parttype
688 689 header = [_pack(_fparttypesize, len(parttype)),
689 690 parttype, _pack(_fpartid, self.id),
690 691 ]
691 692 ## parameters
692 693 # count
693 694 manpar = self.mandatoryparams
694 695 advpar = self.advisoryparams
695 696 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
696 697 # size
697 698 parsizes = []
698 699 for key, value in manpar:
699 700 parsizes.append(len(key))
700 701 parsizes.append(len(value))
701 702 for key, value in advpar:
702 703 parsizes.append(len(key))
703 704 parsizes.append(len(value))
704 705 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
705 706 header.append(paramsizes)
706 707 # key, value
707 708 for key, value in manpar:
708 709 header.append(key)
709 710 header.append(value)
710 711 for key, value in advpar:
711 712 header.append(key)
712 713 header.append(value)
713 714 ## finalize header
714 715 headerchunk = ''.join(header)
715 716 yield _pack(_fpartheadersize, len(headerchunk))
716 717 yield headerchunk
717 718 ## payload
718 719 try:
719 720 for chunk in self._payloadchunks():
720 721 yield _pack(_fpayloadsize, len(chunk))
721 722 yield chunk
722 723 except Exception, exc:
723 724 # backup exception data for later
724 725 exc_info = sys.exc_info()
725 726 msg = 'unexpected error: %s' % exc
726 727 interpart = bundlepart('b2x:error:abort', [('message', msg)],
727 728 mandatory=False)
728 729 interpart.id = 0
729 730 yield _pack(_fpayloadsize, -1)
730 731 for chunk in interpart.getchunks():
731 732 yield chunk
732 733 # abort current part payload
733 734 yield _pack(_fpayloadsize, 0)
734 735 raise exc_info[0], exc_info[1], exc_info[2]
735 736 # end of payload
736 737 yield _pack(_fpayloadsize, 0)
737 738 self._generated = True
738 739
739 740 def _payloadchunks(self):
740 741 """yield chunks of a the part payload
741 742
742 743 Exists to handle the different methods to provide data to a part."""
743 744 # we only support fixed size data now.
744 745 # This will be improved in the future.
745 746 if util.safehasattr(self.data, 'next'):
746 747 buff = util.chunkbuffer(self.data)
747 748 chunk = buff.read(preferedchunksize)
748 749 while chunk:
749 750 yield chunk
750 751 chunk = buff.read(preferedchunksize)
751 752 elif len(self.data):
752 753 yield self.data
753 754
754 755
755 756 flaginterrupt = -1
756 757
757 758 class interrupthandler(unpackermixin):
758 759 """read one part and process it with restricted capability
759 760
760 761 This allows to transmit exception raised on the producer size during part
761 762 iteration while the consumer is reading a part.
762 763
763 764 Part processed in this manner only have access to a ui object,"""
764 765
765 766 def __init__(self, ui, fp):
766 767 super(interrupthandler, self).__init__(fp)
767 768 self.ui = ui
768 769
769 770 def _readpartheader(self):
770 771 """reads a part header size and return the bytes blob
771 772
772 773 returns None if empty"""
773 774 headersize = self._unpack(_fpartheadersize)[0]
774 775 if headersize < 0:
775 776 raise error.BundleValueError('negative part header size: %i'
776 777 % headersize)
777 778 self.ui.debug('part header size: %i\n' % headersize)
778 779 if headersize:
779 780 return self._readexact(headersize)
780 781 return None
781 782
782 783 def __call__(self):
783 784 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
784 785 headerblock = self._readpartheader()
785 786 if headerblock is None:
786 787 self.ui.debug('no part found during interruption.\n')
787 788 return
788 789 part = unbundlepart(self.ui, headerblock, self._fp)
789 790 op = interruptoperation(self.ui)
790 791 _processpart(op, part)
791 792
792 793 class interruptoperation(object):
793 794 """A limited operation to be use by part handler during interruption
794 795
795 796 It only have access to an ui object.
796 797 """
797 798
798 799 def __init__(self, ui):
799 800 self.ui = ui
800 801 self.reply = None
801 802
802 803 @property
803 804 def repo(self):
804 805 raise RuntimeError('no repo access from stream interruption')
805 806
806 807 def gettransaction(self):
807 808 raise TransactionUnavailable('no repo access from stream interruption')
808 809
809 810 class unbundlepart(unpackermixin):
810 811 """a bundle part read from a bundle"""
811 812
812 813 def __init__(self, ui, header, fp):
813 814 super(unbundlepart, self).__init__(fp)
814 815 self.ui = ui
815 816 # unbundle state attr
816 817 self._headerdata = header
817 818 self._headeroffset = 0
818 819 self._initialized = False
819 820 self.consumed = False
820 821 # part data
821 822 self.id = None
822 823 self.type = None
823 824 self.mandatoryparams = None
824 825 self.advisoryparams = None
825 826 self.params = None
826 827 self.mandatorykeys = ()
827 828 self._payloadstream = None
828 829 self._readheader()
829 830 self._mandatory = None
830 831 self._chunkindex = [] #(payload, file) position tuples for chunk starts
831 832 self._pos = 0
832 833
833 834 def _fromheader(self, size):
834 835 """return the next <size> byte from the header"""
835 836 offset = self._headeroffset
836 837 data = self._headerdata[offset:(offset + size)]
837 838 self._headeroffset = offset + size
838 839 return data
839 840
840 841 def _unpackheader(self, format):
841 842 """read given format from header
842 843
843 844 This automatically compute the size of the format to read."""
844 845 data = self._fromheader(struct.calcsize(format))
845 846 return _unpack(format, data)
846 847
847 848 def _initparams(self, mandatoryparams, advisoryparams):
848 849 """internal function to setup all logic related parameters"""
849 850 # make it read only to prevent people touching it by mistake.
850 851 self.mandatoryparams = tuple(mandatoryparams)
851 852 self.advisoryparams = tuple(advisoryparams)
852 853 # user friendly UI
853 854 self.params = dict(self.mandatoryparams)
854 855 self.params.update(dict(self.advisoryparams))
855 856 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
856 857
857 858 def _payloadchunks(self, chunknum=0):
858 859 '''seek to specified chunk and start yielding data'''
859 860 if len(self._chunkindex) == 0:
860 861 assert chunknum == 0, 'Must start with chunk 0'
861 862 self._chunkindex.append((0, super(unbundlepart, self).tell()))
862 863 else:
863 864 assert chunknum < len(self._chunkindex), \
864 865 'Unknown chunk %d' % chunknum
865 866 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
866 867
867 868 pos = self._chunkindex[chunknum][0]
868 869 payloadsize = self._unpack(_fpayloadsize)[0]
869 870 self.ui.debug('payload chunk size: %i\n' % payloadsize)
870 871 while payloadsize:
871 872 if payloadsize == flaginterrupt:
872 873 # interruption detection, the handler will now read a
873 874 # single part and process it.
874 875 interrupthandler(self.ui, self._fp)()
875 876 elif payloadsize < 0:
876 877 msg = 'negative payload chunk size: %i' % payloadsize
877 878 raise error.BundleValueError(msg)
878 879 else:
879 880 result = self._readexact(payloadsize)
880 881 chunknum += 1
881 882 pos += payloadsize
882 883 if chunknum == len(self._chunkindex):
883 884 self._chunkindex.append((pos,
884 885 super(unbundlepart, self).tell()))
885 886 yield result
886 887 payloadsize = self._unpack(_fpayloadsize)[0]
887 888 self.ui.debug('payload chunk size: %i\n' % payloadsize)
888 889
889 890 def _findchunk(self, pos):
890 891 '''for a given payload position, return a chunk number and offset'''
891 892 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
892 893 if ppos == pos:
893 894 return chunk, 0
894 895 elif ppos > pos:
895 896 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
896 897 raise ValueError('Unknown chunk')
897 898
898 899 def _readheader(self):
899 900 """read the header and setup the object"""
900 901 typesize = self._unpackheader(_fparttypesize)[0]
901 902 self.type = self._fromheader(typesize)
902 903 self.ui.debug('part type: "%s"\n' % self.type)
903 904 self.id = self._unpackheader(_fpartid)[0]
904 905 self.ui.debug('part id: "%s"\n' % self.id)
905 906 # extract mandatory bit from type
906 907 self.mandatory = (self.type != self.type.lower())
907 908 self.type = self.type.lower()
908 909 ## reading parameters
909 910 # param count
910 911 mancount, advcount = self._unpackheader(_fpartparamcount)
911 912 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
912 913 # param size
913 914 fparamsizes = _makefpartparamsizes(mancount + advcount)
914 915 paramsizes = self._unpackheader(fparamsizes)
915 916 # make it a list of couple again
916 917 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
917 918 # split mandatory from advisory
918 919 mansizes = paramsizes[:mancount]
919 920 advsizes = paramsizes[mancount:]
920 921 # retrieve param value
921 922 manparams = []
922 923 for key, value in mansizes:
923 924 manparams.append((self._fromheader(key), self._fromheader(value)))
924 925 advparams = []
925 926 for key, value in advsizes:
926 927 advparams.append((self._fromheader(key), self._fromheader(value)))
927 928 self._initparams(manparams, advparams)
928 929 ## part payload
929 930 self._payloadstream = util.chunkbuffer(self._payloadchunks())
930 931 # we read the data, tell it
931 932 self._initialized = True
932 933
933 934 def read(self, size=None):
934 935 """read payload data"""
935 936 if not self._initialized:
936 937 self._readheader()
937 938 if size is None:
938 939 data = self._payloadstream.read()
939 940 else:
940 941 data = self._payloadstream.read(size)
941 942 if size is None or len(data) < size:
942 943 self.consumed = True
943 944 self._pos += len(data)
944 945 return data
945 946
946 947 def tell(self):
947 948 return self._pos
948 949
949 950 def seek(self, offset, whence=0):
950 951 if whence == 0:
951 952 newpos = offset
952 953 elif whence == 1:
953 954 newpos = self._pos + offset
954 955 elif whence == 2:
955 956 if not self.consumed:
956 957 self.read()
957 958 newpos = self._chunkindex[-1][0] - offset
958 959 else:
959 960 raise ValueError('Unknown whence value: %r' % (whence,))
960 961
961 962 if newpos > self._chunkindex[-1][0] and not self.consumed:
962 963 self.read()
963 964 if not 0 <= newpos <= self._chunkindex[-1][0]:
964 965 raise ValueError('Offset out of range')
965 966
966 967 if self._pos != newpos:
967 968 chunk, internaloffset = self._findchunk(newpos)
968 969 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
969 970 adjust = self.read(internaloffset)
970 971 if len(adjust) != internaloffset:
971 972 raise util.Abort(_('Seek failed\n'))
972 973 self._pos = newpos
973 974
974 975 capabilities = {'HG2Y': (),
975 976 'b2x:listkeys': (),
976 977 'b2x:pushkey': (),
977 978 'digests': tuple(sorted(util.DIGESTS.keys())),
978 979 'b2x:remote-changegroup': ('http', 'https'),
979 980 }
980 981
981 982 def getrepocaps(repo, allowpushback=False):
982 983 """return the bundle2 capabilities for a given repo
983 984
984 985 Exists to allow extensions (like evolution) to mutate the capabilities.
985 986 """
986 987 caps = capabilities.copy()
987 988 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
988 989 if obsolete.isenabled(repo, obsolete.exchangeopt):
989 990 supportedformat = tuple('V%i' % v for v in obsolete.formats)
990 991 caps['b2x:obsmarkers'] = supportedformat
991 992 if allowpushback:
992 993 caps['b2x:pushback'] = ()
993 994 return caps
994 995
995 996 def bundle2caps(remote):
996 997 """return the bundle capabilities of a peer as dict"""
997 998 raw = remote.capable('bundle2-exp')
998 999 if not raw and raw != '':
999 1000 return {}
1000 1001 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1001 1002 return decodecaps(capsblob)
1002 1003
1003 1004 def obsmarkersversion(caps):
1004 1005 """extract the list of supported obsmarkers versions from a bundle2caps dict
1005 1006 """
1006 1007 obscaps = caps.get('b2x:obsmarkers', ())
1007 1008 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1008 1009
1009 1010 @parthandler('b2x:changegroup', ('version',))
1010 1011 def handlechangegroup(op, inpart):
1011 1012 """apply a changegroup part on the repo
1012 1013
1013 1014 This is a very early implementation that will massive rework before being
1014 1015 inflicted to any end-user.
1015 1016 """
1016 1017 # Make sure we trigger a transaction creation
1017 1018 #
1018 1019 # The addchangegroup function will get a transaction object by itself, but
1019 1020 # we need to make sure we trigger the creation of a transaction object used
1020 1021 # for the whole processing scope.
1021 1022 op.gettransaction()
1022 1023 unpackerversion = inpart.params.get('version', '01')
1023 1024 # We should raise an appropriate exception here
1024 1025 unpacker = changegroup.packermap[unpackerversion][1]
1025 1026 cg = unpacker(inpart, 'UN')
1026 1027 # the source and url passed here are overwritten by the one contained in
1027 1028 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1028 1029 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1029 1030 op.records.add('changegroup', {'return': ret})
1030 1031 if op.reply is not None:
1031 1032 # This is definitely not the final form of this
1032 1033 # return. But one need to start somewhere.
1033 1034 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1034 1035 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1035 1036 part.addparam('return', '%i' % ret, mandatory=False)
1036 1037 assert not inpart.read()
1037 1038
1038 1039 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1039 1040 ['digest:%s' % k for k in util.DIGESTS.keys()])
1040 1041 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1041 1042 def handleremotechangegroup(op, inpart):
1042 1043 """apply a bundle10 on the repo, given an url and validation information
1043 1044
1044 1045 All the information about the remote bundle to import are given as
1045 1046 parameters. The parameters include:
1046 1047 - url: the url to the bundle10.
1047 1048 - size: the bundle10 file size. It is used to validate what was
1048 1049 retrieved by the client matches the server knowledge about the bundle.
1049 1050 - digests: a space separated list of the digest types provided as
1050 1051 parameters.
1051 1052 - digest:<digest-type>: the hexadecimal representation of the digest with
1052 1053 that name. Like the size, it is used to validate what was retrieved by
1053 1054 the client matches what the server knows about the bundle.
1054 1055
1055 1056 When multiple digest types are given, all of them are checked.
1056 1057 """
1057 1058 try:
1058 1059 raw_url = inpart.params['url']
1059 1060 except KeyError:
1060 1061 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1061 1062 parsed_url = util.url(raw_url)
1062 1063 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1063 1064 raise util.Abort(_('remote-changegroup does not support %s urls') %
1064 1065 parsed_url.scheme)
1065 1066
1066 1067 try:
1067 1068 size = int(inpart.params['size'])
1068 1069 except ValueError:
1069 1070 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1070 1071 % 'size')
1071 1072 except KeyError:
1072 1073 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1073 1074
1074 1075 digests = {}
1075 1076 for typ in inpart.params.get('digests', '').split():
1076 1077 param = 'digest:%s' % typ
1077 1078 try:
1078 1079 value = inpart.params[param]
1079 1080 except KeyError:
1080 1081 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1081 1082 param)
1082 1083 digests[typ] = value
1083 1084
1084 1085 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1085 1086
1086 1087 # Make sure we trigger a transaction creation
1087 1088 #
1088 1089 # The addchangegroup function will get a transaction object by itself, but
1089 1090 # we need to make sure we trigger the creation of a transaction object used
1090 1091 # for the whole processing scope.
1091 1092 op.gettransaction()
1092 1093 import exchange
1093 1094 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1094 1095 if not isinstance(cg, changegroup.cg1unpacker):
1095 1096 raise util.Abort(_('%s: not a bundle version 1.0') %
1096 1097 util.hidepassword(raw_url))
1097 1098 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1098 1099 op.records.add('changegroup', {'return': ret})
1099 1100 if op.reply is not None:
1100 1101 # This is definitely not the final form of this
1101 1102 # return. But one need to start somewhere.
1102 1103 part = op.reply.newpart('b2x:reply:changegroup')
1103 1104 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1104 1105 part.addparam('return', '%i' % ret, mandatory=False)
1105 1106 try:
1106 1107 real_part.validate()
1107 1108 except util.Abort, e:
1108 1109 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1109 1110 (util.hidepassword(raw_url), str(e)))
1110 1111 assert not inpart.read()
1111 1112
1112 1113 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1113 1114 def handlereplychangegroup(op, inpart):
1114 1115 ret = int(inpart.params['return'])
1115 1116 replyto = int(inpart.params['in-reply-to'])
1116 1117 op.records.add('changegroup', {'return': ret}, replyto)
1117 1118
1118 1119 @parthandler('b2x:check:heads')
1119 1120 def handlecheckheads(op, inpart):
1120 1121 """check that head of the repo did not change
1121 1122
1122 1123 This is used to detect a push race when using unbundle.
1123 1124 This replaces the "heads" argument of unbundle."""
1124 1125 h = inpart.read(20)
1125 1126 heads = []
1126 1127 while len(h) == 20:
1127 1128 heads.append(h)
1128 1129 h = inpart.read(20)
1129 1130 assert not h
1130 1131 if heads != op.repo.heads():
1131 1132 raise error.PushRaced('repository changed while pushing - '
1132 1133 'please try again')
1133 1134
1134 1135 @parthandler('b2x:output')
1135 1136 def handleoutput(op, inpart):
1136 1137 """forward output captured on the server to the client"""
1137 1138 for line in inpart.read().splitlines():
1138 1139 op.ui.write(('remote: %s\n' % line))
1139 1140
1140 1141 @parthandler('b2x:replycaps')
1141 1142 def handlereplycaps(op, inpart):
1142 1143 """Notify that a reply bundle should be created
1143 1144
1144 1145 The payload contains the capabilities information for the reply"""
1145 1146 caps = decodecaps(inpart.read())
1146 1147 if op.reply is None:
1147 1148 op.reply = bundle20(op.ui, caps)
1148 1149
1149 1150 @parthandler('b2x:error:abort', ('message', 'hint'))
1150 1151 def handlereplycaps(op, inpart):
1151 1152 """Used to transmit abort error over the wire"""
1152 1153 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1153 1154
1154 1155 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1155 1156 def handlereplycaps(op, inpart):
1156 1157 """Used to transmit unknown content error over the wire"""
1157 1158 kwargs = {}
1158 1159 parttype = inpart.params.get('parttype')
1159 1160 if parttype is not None:
1160 1161 kwargs['parttype'] = parttype
1161 1162 params = inpart.params.get('params')
1162 1163 if params is not None:
1163 1164 kwargs['params'] = params.split('\0')
1164 1165
1165 1166 raise error.UnsupportedPartError(**kwargs)
1166 1167
1167 1168 @parthandler('b2x:error:pushraced', ('message',))
1168 1169 def handlereplycaps(op, inpart):
1169 1170 """Used to transmit push race error over the wire"""
1170 1171 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1171 1172
1172 1173 @parthandler('b2x:listkeys', ('namespace',))
1173 1174 def handlelistkeys(op, inpart):
1174 1175 """retrieve pushkey namespace content stored in a bundle2"""
1175 1176 namespace = inpart.params['namespace']
1176 1177 r = pushkey.decodekeys(inpart.read())
1177 1178 op.records.add('listkeys', (namespace, r))
1178 1179
1179 1180 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1180 1181 def handlepushkey(op, inpart):
1181 1182 """process a pushkey request"""
1182 1183 dec = pushkey.decode
1183 1184 namespace = dec(inpart.params['namespace'])
1184 1185 key = dec(inpart.params['key'])
1185 1186 old = dec(inpart.params['old'])
1186 1187 new = dec(inpart.params['new'])
1187 1188 ret = op.repo.pushkey(namespace, key, old, new)
1188 1189 record = {'namespace': namespace,
1189 1190 'key': key,
1190 1191 'old': old,
1191 1192 'new': new}
1192 1193 op.records.add('pushkey', record)
1193 1194 if op.reply is not None:
1194 1195 rpart = op.reply.newpart('b2x:reply:pushkey')
1195 1196 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1196 1197 rpart.addparam('return', '%i' % ret, mandatory=False)
1197 1198
1198 1199 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1199 1200 def handlepushkeyreply(op, inpart):
1200 1201 """retrieve the result of a pushkey request"""
1201 1202 ret = int(inpart.params['return'])
1202 1203 partid = int(inpart.params['in-reply-to'])
1203 1204 op.records.add('pushkey', {'return': ret}, partid)
1204 1205
1205 1206 @parthandler('b2x:obsmarkers')
1206 1207 def handleobsmarker(op, inpart):
1207 1208 """add a stream of obsmarkers to the repo"""
1208 1209 tr = op.gettransaction()
1209 1210 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1210 1211 if new:
1211 1212 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1212 1213 op.records.add('obsmarkers', {'new': new})
1213 1214 if op.reply is not None:
1214 1215 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1215 1216 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1216 1217 rpart.addparam('new', '%i' % new, mandatory=False)
1217 1218
1218 1219
1219 1220 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1220 1221 def handlepushkeyreply(op, inpart):
1221 1222 """retrieve the result of a pushkey request"""
1222 1223 ret = int(inpart.params['new'])
1223 1224 partid = int(inpart.params['in-reply-to'])
1224 1225 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now