bundle2: rename error exception class for unsupported feature...
Pierre-Yves David -
r26393:cff70549 default
@@ -1,1422 +1,1422
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any applicative level option MUST go into a bundle2 part instead.
61 61
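As an illustration, a minimal sketch of how such a parameter blob could be
built and parsed with the same stdlib ``urllib`` calls the implementation
below relies on; the helper and parameter names are invented for the
example::

    import urllib

    def encodestreamparams(params):
        chunks = []
        for name, value in params:
            chunk = urllib.quote(name)
            if value is not None:
                chunk = '%s=%s' % (chunk, urllib.quote(value))
            chunks.append(chunk)
        return ' '.join(chunks)

    def decodestreamparams(blob):
        params = {}
        for entry in blob.split(' '):
            parts = [urllib.unquote(i) for i in entry.split('=', 1)]
            if len(parts) < 2:
                parts.append(None)
            params[parts[0]] = parts[1]
        return params

    # lower case first letter: advisory; upper case: mandatory
    blob = encodestreamparams([('fancyfeature', '1 2'), ('Checkheads', None)])
    # blob == 'fancyfeature=1%202 Checkheads'
    assert decodestreamparams(blob) == {'fancyfeature': '1 2',
                                        'Checkheads': None}
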
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
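As an illustration, a rough sketch of how one part could be framed with the
same struct formats the implementation below uses (``>i`` for sizes, ``>B``
for the type length, ``>I`` for the part id, ``>BB`` for the parameter
counts). The helper name, part type and parameter are invented for the
example, and unlike the real code it emits the payload as a single chunk::

    import struct

    def encodepart(partid, parttype, mandatoryparams, advisoryparams, data):
        params = list(mandatoryparams) + list(advisoryparams)
        header = [struct.pack('>B', len(parttype)), parttype,
                  struct.pack('>I', partid),
                  struct.pack('>BB', len(mandatoryparams),
                              len(advisoryparams))]
        sizes = []
        for key, value in params:
            sizes.extend([len(key), len(value)])
        header.append(struct.pack('>' + 'BB' * len(params), *sizes))
        for key, value in params:
            header.extend([key, value])
        headerchunk = ''.join(header)
        chunks = [struct.pack('>i', len(headerchunk)), headerchunk]
        if data:
            chunks.extend([struct.pack('>i', len(data)), data])
        chunks.append(struct.pack('>i', 0))  # zero size chunk ends the payload
        return ''.join(chunks)

    # an advisory part (lower case type) with one advisory parameter
    blob = encodepart(0, 'output', [], [('in-reply-to', '1')], 'hello\n')
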
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155 import urllib
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 obsolete,
162 162 pushkey,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 _pack = struct.pack
169 169 _unpack = struct.unpack
170 170
171 171 _fstreamparamsize = '>i'
172 172 _fpartheadersize = '>i'
173 173 _fparttypesize = '>B'
174 174 _fpartid = '>I'
175 175 _fpayloadsize = '>i'
176 176 _fpartparamcount = '>BB'
177 177
178 178 preferedchunksize = 4096
179 179
180 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181 181
182 182 def outdebug(ui, message):
183 183 """debug regarding output stream (bundling)"""
184 184 if ui.configbool('devel', 'bundle2.debug', False):
185 185 ui.debug('bundle2-output: %s\n' % message)
186 186
187 187 def indebug(ui, message):
188 188 """debug on input stream (unbundling)"""
189 189 if ui.configbool('devel', 'bundle2.debug', False):
190 190 ui.debug('bundle2-input: %s\n' % message)
191 191
192 192 def validateparttype(parttype):
193 193 """raise ValueError if a parttype contains invalid character"""
194 194 if _parttypeforbidden.search(parttype):
195 195 raise ValueError(parttype)
196 196
197 197 def _makefpartparamsizes(nbparams):
198 198 """return a struct format to read part parameter sizes
199 199
200 200 The number of parameters is variable so we need to build that format
201 201 dynamically.
202 202 """
203 203 return '>'+('BB'*nbparams)
204 204
205 205 parthandlermapping = {}
206 206
207 207 def parthandler(parttype, params=()):
208 208 """decorator that register a function as a bundle2 part handler
209 209
210 210 eg::
211 211
212 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 213 def myparttypehandler(...):
214 214 '''process a part of type "my part".'''
215 215 ...
216 216 """
217 217 validateparttype(parttype)
218 218 def _decorator(func):
219 219 lparttype = parttype.lower() # enforce lower case matching.
220 220 assert lparttype not in parthandlermapping
221 221 parthandlermapping[lparttype] = func
222 222 func.params = frozenset(params)
223 223 return func
224 224 return _decorator
225 225
226 226 class unbundlerecords(object):
227 227 """keep record of what happens during and unbundle
228 228
229 229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
230 230 category of record and obj is an arbitrary object.
231 231
232 232 `records['cat']` will return all entries of this category 'cat'.
233 233
234 234 Iterating on the object itself will yield `('category', obj)` tuples
235 235 for all entries.
236 236
237 237 All iterations happen in chronological order.
238 238 """
239 239
240 240 def __init__(self):
241 241 self._categories = {}
242 242 self._sequences = []
243 243 self._replies = {}
244 244
245 245 def add(self, category, entry, inreplyto=None):
246 246 """add a new record of a given category.
247 247
248 248 The entry can then be retrieved in the list returned by
249 249 self['category']."""
250 250 self._categories.setdefault(category, []).append(entry)
251 251 self._sequences.append((category, entry))
252 252 if inreplyto is not None:
253 253 self.getreplies(inreplyto).add(category, entry)
254 254
255 255 def getreplies(self, partid):
256 256 """get the records that are replies to a specific part"""
257 257 return self._replies.setdefault(partid, unbundlerecords())
258 258
259 259 def __getitem__(self, cat):
260 260 return tuple(self._categories.get(cat, ()))
261 261
262 262 def __iter__(self):
263 263 return iter(self._sequences)
264 264
265 265 def __len__(self):
266 266 return len(self._sequences)
267 267
268 268 def __nonzero__(self):
269 269 return bool(self._sequences)
270 270
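# Illustrative usage sketch for the record container above; categories and
# values are hypothetical and only the documented add/__getitem__/getreplies
# behaviour is exercised.
def _unbundlerecords_example():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('pushkey', {'return': 0}, inreplyto=3)
    assert records['changegroup'] == ({'return': 1},)
    assert len(records) == 2
    # replies are grouped by the id of the part they answer
    assert records.getreplies(3)['pushkey'] == ({'return': 0},)
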
271 271 class bundleoperation(object):
272 272 """an object that represents a single bundling process
273 273
274 274 Its purpose is to carry unbundle-related objects and states.
275 275
276 276 A new object should be created at the beginning of each bundle processing.
277 277 The object is to be returned by the processing function.
278 278
279 279 The object has very little content now; it will ultimately contain:
280 280 * an access to the repo the bundle is applied to,
281 281 * a ui object,
282 282 * a way to retrieve a transaction to add changes to the repo,
283 283 * a way to record the result of processing each part,
284 284 * a way to construct a bundle response when applicable.
285 285 """
286 286
287 287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 288 self.repo = repo
289 289 self.ui = repo.ui
290 290 self.records = unbundlerecords()
291 291 self.gettransaction = transactiongetter
292 292 self.reply = None
293 293 self.captureoutput = captureoutput
294 294
295 295 class TransactionUnavailable(RuntimeError):
296 296 pass
297 297
298 298 def _notransaction():
299 299 """default method to get a transaction while processing a bundle
300 300
301 301 Raise an exception to highlight the fact that no transaction was expected
302 302 to be created"""
303 303 raise TransactionUnavailable()
304 304
305 305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
306 306 """This function process a bundle, apply effect to/from a repo
307 307
308 308 It iterates over each part then searches for and uses the proper handling
309 309 code to process the part. Parts are processed in order.
310 310
311 311 This is a very early version of this function that will be strongly reworked
312 312 before final usage.
313 313
314 314 An unknown mandatory part will abort the process.
315 315
316 316 It is temporarily possible to provide a prebuilt bundleoperation to the
317 317 function. This is used to ensure output is properly propagated in case of
318 318 an error during the unbundling. This output capturing part will likely be
319 319 reworked and this ability will probably go away in the process.
320 320 """
321 321 if op is None:
322 322 if transactiongetter is None:
323 323 transactiongetter = _notransaction
324 324 op = bundleoperation(repo, transactiongetter)
325 325 # todo:
326 326 # - replace this with an init function soon.
327 327 # - exception catching
328 328 unbundler.params
329 329 if repo.ui.debugflag:
330 330 msg = ['bundle2-input-bundle:']
331 331 if unbundler.params:
332 332 msg.append(' %i params' % len(unbundler.params))
333 333 if op.gettransaction is None:
334 334 msg.append(' no-transaction')
335 335 else:
336 336 msg.append(' with-transaction')
337 337 msg.append('\n')
338 338 repo.ui.debug(''.join(msg))
339 339 iterparts = enumerate(unbundler.iterparts())
340 340 part = None
341 341 nbpart = 0
342 342 try:
343 343 for nbpart, part in iterparts:
344 344 _processpart(op, part)
345 345 except BaseException as exc:
346 346 for nbpart, part in iterparts:
347 347 # consume the bundle content
348 348 part.seek(0, 2)
349 349 # Small hack to let caller code distinguish exceptions raised during
350 350 # bundle2 processing from those of old-format processing. This is mostly
351 351 # needed to handle different return codes to unbundle according to the
352 352 # type of bundle. We should probably clean up or drop this return code
353 353 # craziness in a future version.
354 354 exc.duringunbundle2 = True
355 355 salvaged = []
356 356 replycaps = None
357 357 if op.reply is not None:
358 358 salvaged = op.reply.salvageoutput()
359 359 replycaps = op.reply.capabilities
360 360 exc._replycaps = replycaps
361 361 exc._bundle2salvagedoutput = salvaged
362 362 raise
363 363 finally:
364 364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
365 365
366 366 return op
367 367
368 368 def _processpart(op, part):
369 369 """process a single part from a bundle
370 370
371 371 The part is guaranteed to have been fully consumed when the function exits
372 372 (even if an exception is raised)."""
373 373 status = 'unknown' # used by debug output
374 374 try:
375 375 try:
376 376 handler = parthandlermapping.get(part.type)
377 377 if handler is None:
378 378 status = 'unsupported-type'
379 raise error.UnsupportedPartError(parttype=part.type)
379 raise error.BundleUnknownFeatureError(parttype=part.type)
380 380 indebug(op.ui, 'found a handler for part %r' % part.type)
381 381 unknownparams = part.mandatorykeys - handler.params
382 382 if unknownparams:
383 383 unknownparams = list(unknownparams)
384 384 unknownparams.sort()
385 385 status = 'unsupported-params (%s)' % unknownparams
386 raise error.UnsupportedPartError(parttype=part.type,
387 params=unknownparams)
386 raise error.BundleUnknownFeatureError(parttype=part.type,
387 params=unknownparams)
388 388 status = 'supported'
389 except error.UnsupportedPartError as exc:
389 except error.BundleUnknownFeatureError as exc:
390 390 if part.mandatory: # mandatory parts
391 391 raise
392 392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
393 393 return # skip to part processing
394 394 finally:
395 395 if op.ui.debugflag:
396 396 msg = ['bundle2-input-part: "%s"' % part.type]
397 397 if not part.mandatory:
398 398 msg.append(' (advisory)')
399 399 nbmp = len(part.mandatorykeys)
400 400 nbap = len(part.params) - nbmp
401 401 if nbmp or nbap:
402 402 msg.append(' (params:')
403 403 if nbmp:
404 404 msg.append(' %i mandatory' % nbmp)
405 405 if nbap:
406 406 msg.append(' %i advisory' % nbap)
407 407 msg.append(')')
408 408 msg.append(' %s\n' % status)
409 409 op.ui.debug(''.join(msg))
410 410
411 411 # handler is called outside the above try block so that we don't
412 412 # risk catching KeyErrors from anything other than the
413 413 # parthandlermapping lookup (any KeyError raised by handler()
414 414 # itself represents a defect of a different variety).
415 415 output = None
416 416 if op.captureoutput and op.reply is not None:
417 417 op.ui.pushbuffer(error=True, subproc=True)
418 418 output = ''
419 419 try:
420 420 handler(op, part)
421 421 finally:
422 422 if output is not None:
423 423 output = op.ui.popbuffer()
424 424 if output:
425 425 outpart = op.reply.newpart('output', data=output,
426 426 mandatory=False)
427 427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
428 428 finally:
429 429 # consume the part content to not corrupt the stream.
430 430 part.seek(0, 2)
431 431
432 432
433 433 def decodecaps(blob):
434 434 """decode a bundle2 caps bytes blob into a dictionary
435 435
436 436 The blob is a list of capabilities (one per line)
437 437 Capabilities may have values using a line of the form::
438 438
439 439 capability=value1,value2,value3
440 440
441 441 The values are always a list."""
442 442 caps = {}
443 443 for line in blob.splitlines():
444 444 if not line:
445 445 continue
446 446 if '=' not in line:
447 447 key, vals = line, ()
448 448 else:
449 449 key, vals = line.split('=', 1)
450 450 vals = vals.split(',')
451 451 key = urllib.unquote(key)
452 452 vals = [urllib.unquote(v) for v in vals]
453 453 caps[key] = vals
454 454 return caps
455 455
456 456 def encodecaps(caps):
457 457 """encode a bundle2 caps dictionary into a bytes blob"""
458 458 chunks = []
459 459 for ca in sorted(caps):
460 460 vals = caps[ca]
461 461 ca = urllib.quote(ca)
462 462 vals = [urllib.quote(v) for v in vals]
463 463 if vals:
464 464 ca = "%s=%s" % (ca, ','.join(vals))
465 465 chunks.append(ca)
466 466 return '\n'.join(chunks)
467 467
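# Illustrative sketch: the two helpers above are inverses of each other;
# capability names and values are urlquoted and values always come back as
# lists (the capabilities used here are chosen for the example).
def _caps_example():
    caps = {'HG20': [], 'digests': ['md5', 'sha1']}
    blob = encodecaps(caps)
    # blob == 'HG20\ndigests=md5,sha1'
    assert decodecaps(blob) == caps
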
468 468 class bundle20(object):
469 469 """represent an outgoing bundle2 container
470 470
471 471 Use the `addparam` method to add stream level parameter. and `newpart` to
472 472 populate it. Then call `getchunks` to retrieve all the binary chunks of
473 473 data that compose the bundle2 container."""
474 474
475 475 _magicstring = 'HG20'
476 476
477 477 def __init__(self, ui, capabilities=()):
478 478 self.ui = ui
479 479 self._params = []
480 480 self._parts = []
481 481 self.capabilities = dict(capabilities)
482 482
483 483 @property
484 484 def nbparts(self):
485 485 """total number of parts added to the bundler"""
486 486 return len(self._parts)
487 487
488 488 # methods used to define the bundle2 content
489 489 def addparam(self, name, value=None):
490 490 """add a stream level parameter"""
491 491 if not name:
492 492 raise ValueError('empty parameter name')
493 493 if name[0] not in string.letters:
494 494 raise ValueError('non letter first character: %r' % name)
495 495 self._params.append((name, value))
496 496
497 497 def addpart(self, part):
498 498 """add a new part to the bundle2 container
499 499
500 500 Parts contains the actual applicative payload."""
501 501 assert part.id is None
502 502 part.id = len(self._parts) # very cheap counter
503 503 self._parts.append(part)
504 504
505 505 def newpart(self, typeid, *args, **kwargs):
506 506 """create a new part and add it to the containers
507 507
508 508 As the part is directly added to the containers. For now, this means
509 509 that any failure to properly initialize the part after calling
510 510 ``newpart`` should result in a failure of the whole bundling process.
511 511
512 512 You can still fall back to manually create and add if you need better
513 513 control."""
514 514 part = bundlepart(typeid, *args, **kwargs)
515 515 self.addpart(part)
516 516 return part
517 517
518 518 # methods used to generate the bundle2 stream
519 519 def getchunks(self):
520 520 if self.ui.debugflag:
521 521 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
522 522 if self._params:
523 523 msg.append(' (%i params)' % len(self._params))
524 524 msg.append(' %i parts total\n' % len(self._parts))
525 525 self.ui.debug(''.join(msg))
526 526 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
527 527 yield self._magicstring
528 528 param = self._paramchunk()
529 529 outdebug(self.ui, 'bundle parameter: %s' % param)
530 530 yield _pack(_fstreamparamsize, len(param))
531 531 if param:
532 532 yield param
533 533
534 534 outdebug(self.ui, 'start of parts')
535 535 for part in self._parts:
536 536 outdebug(self.ui, 'bundle part: "%s"' % part.type)
537 537 for chunk in part.getchunks(ui=self.ui):
538 538 yield chunk
539 539 outdebug(self.ui, 'end of bundle')
540 540 yield _pack(_fpartheadersize, 0)
541 541
542 542 def _paramchunk(self):
543 543 """return a encoded version of all stream parameters"""
544 544 blocks = []
545 545 for par, value in self._params:
546 546 par = urllib.quote(par)
547 547 if value is not None:
548 548 value = urllib.quote(value)
549 549 par = '%s=%s' % (par, value)
550 550 blocks.append(par)
551 551 return ' '.join(blocks)
552 552
553 553 def salvageoutput(self):
554 554 """return a list with a copy of all output parts in the bundle
555 555
556 556 This is meant to be used during error handling to make sure we preserve
557 557 server output"""
558 558 salvaged = []
559 559 for part in self._parts:
560 560 if part.type.startswith('output'):
561 561 salvaged.append(part.copy())
562 562 return salvaged
563 563
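# Illustrative sketch of assembling a trivial bundle with the container
# above; a real ui object is assumed, and the stream parameter, part type
# and payload are chosen for the example.
def _bundle20_example(ui):
    bundler = bundle20(ui)
    bundler.addparam('fancyfeature')            # advisory stream parameter
    part = bundler.newpart('output', data='hello\n', mandatory=False)
    part.addparam('in-reply-to', '1', mandatory=False)
    return ''.join(bundler.getchunks())         # raw HG20 stream
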
564 564
565 565 class unpackermixin(object):
566 566 """A mixin to extract bytes and struct data from a stream"""
567 567
568 568 def __init__(self, fp):
569 569 self._fp = fp
570 570 self._seekable = (util.safehasattr(fp, 'seek') and
571 571 util.safehasattr(fp, 'tell'))
572 572
573 573 def _unpack(self, format):
574 574 """unpack this struct format from the stream"""
575 575 data = self._readexact(struct.calcsize(format))
576 576 return _unpack(format, data)
577 577
578 578 def _readexact(self, size):
579 579 """read exactly <size> bytes from the stream"""
580 580 return changegroup.readexactly(self._fp, size)
581 581
582 582 def seek(self, offset, whence=0):
583 583 """move the underlying file pointer"""
584 584 if self._seekable:
585 585 return self._fp.seek(offset, whence)
586 586 else:
587 587 raise NotImplementedError(_('File pointer is not seekable'))
588 588
589 589 def tell(self):
590 590 """return the file offset, or None if file is not seekable"""
591 591 if self._seekable:
592 592 try:
593 593 return self._fp.tell()
594 594 except IOError as e:
595 595 if e.errno == errno.ESPIPE:
596 596 self._seekable = False
597 597 else:
598 598 raise
599 599 return None
600 600
601 601 def close(self):
602 602 """close underlying file"""
603 603 if util.safehasattr(self._fp, 'close'):
604 604 return self._fp.close()
605 605
606 606 def getunbundler(ui, fp, magicstring=None):
607 607 """return a valid unbundler object for a given magicstring"""
608 608 if magicstring is None:
609 609 magicstring = changegroup.readexactly(fp, 4)
610 610 magic, version = magicstring[0:2], magicstring[2:4]
611 611 if magic != 'HG':
612 612 raise util.Abort(_('not a Mercurial bundle'))
613 613 unbundlerclass = formatmap.get(version)
614 614 if unbundlerclass is None:
615 615 raise util.Abort(_('unknown bundle version %s') % version)
616 616 unbundler = unbundlerclass(ui, fp)
617 617 indebug(ui, 'start processing of %s stream' % magicstring)
618 618 return unbundler
619 619
620 620 class unbundle20(unpackermixin):
621 621 """interpret a bundle2 stream
622 622
623 623 This class is fed with a binary stream and yields parts through its
624 624 `iterparts` method."""
625 625
626 626 def __init__(self, ui, fp):
627 627 """If header is specified, we do not read it out of the stream."""
628 628 self.ui = ui
629 629 super(unbundle20, self).__init__(fp)
630 630
631 631 @util.propertycache
632 632 def params(self):
633 633 """dictionary of stream level parameters"""
634 634 indebug(self.ui, 'reading bundle2 stream parameters')
635 635 params = {}
636 636 paramssize = self._unpack(_fstreamparamsize)[0]
637 637 if paramssize < 0:
638 638 raise error.BundleValueError('negative bundle param size: %i'
639 639 % paramssize)
640 640 if paramssize:
641 641 for p in self._readexact(paramssize).split(' '):
642 642 p = p.split('=', 1)
643 643 p = [urllib.unquote(i) for i in p]
644 644 if len(p) < 2:
645 645 p.append(None)
646 646 self._processparam(*p)
647 647 params[p[0]] = p[1]
648 648 return params
649 649
650 650 def _processparam(self, name, value):
651 651 """process a parameter, applying its effect if needed
652 652
653 653 Parameters starting with a lower case letter are advisory and will be
654 654 ignored when unknown. Those starting with an upper case letter are
655 655 mandatory; for those, this function will raise an error when unknown.
656 656
657 657 Note: no options are currently supported. Any input will be either
658 658 ignored or will fail.
659 659 """
660 660 if not name:
661 661 raise ValueError('empty parameter name')
662 662 if name[0] not in string.letters:
663 663 raise ValueError('non letter first character: %r' % name)
664 664 # Some logic will be later added here to try to process the option for
665 665 # a dict of known parameter.
666 666 if name[0].islower():
667 667 indebug(self.ui, "ignoring unknown parameter %r" % name)
668 668 else:
669 raise error.UnsupportedPartError(params=(name,))
669 raise error.BundleUnknownFeatureError(params=(name,))
670 670
671 671
672 672 def iterparts(self):
673 673 """yield all parts contained in the stream"""
674 674 # make sure params have been loaded
675 675 self.params
676 676 indebug(self.ui, 'start extraction of bundle2 parts')
677 677 headerblock = self._readpartheader()
678 678 while headerblock is not None:
679 679 part = unbundlepart(self.ui, headerblock, self._fp)
680 680 yield part
681 681 part.seek(0, 2)
682 682 headerblock = self._readpartheader()
683 683 indebug(self.ui, 'end of bundle2 stream')
684 684
685 685 def _readpartheader(self):
686 686 """reads a part header size and return the bytes blob
687 687
688 688 returns None if empty"""
689 689 headersize = self._unpack(_fpartheadersize)[0]
690 690 if headersize < 0:
691 691 raise error.BundleValueError('negative part header size: %i'
692 692 % headersize)
693 693 indebug(self.ui, 'part header size: %i' % headersize)
694 694 if headersize:
695 695 return self._readexact(headersize)
696 696 return None
697 697
698 698 def compressed(self):
699 699 return False
700 700
701 701 formatmap = {'20': unbundle20}
702 702
703 703 class bundlepart(object):
704 704 """A bundle2 part contains application level payload
705 705
706 706 The part `type` is used to route the part to the application level
707 707 handler.
708 708
709 709 The part payload is contained in ``part.data``. It could be raw bytes or a
710 710 generator of byte chunks.
711 711
712 712 You can add parameters to the part using the ``addparam`` method.
713 713 Parameters can be either mandatory (default) or advisory. Remote side
714 714 should be able to safely ignore the advisory ones.
715 715
716 716 Neither data nor parameters can be modified after the generation has begun.
717 717 """
718 718
719 719 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
720 720 data='', mandatory=True):
721 721 validateparttype(parttype)
722 722 self.id = None
723 723 self.type = parttype
724 724 self._data = data
725 725 self._mandatoryparams = list(mandatoryparams)
726 726 self._advisoryparams = list(advisoryparams)
727 727 # checking for duplicated entries
728 728 self._seenparams = set()
729 729 for pname, __ in self._mandatoryparams + self._advisoryparams:
730 730 if pname in self._seenparams:
731 731 raise RuntimeError('duplicated params: %s' % pname)
732 732 self._seenparams.add(pname)
733 733 # status of the part's generation:
734 734 # - None: not started,
735 735 # - False: currently generated,
736 736 # - True: generation done.
737 737 self._generated = None
738 738 self.mandatory = mandatory
739 739
740 740 def copy(self):
741 741 """return a copy of the part
742 742
743 743 The new part has the very same content but no partid assigned yet.
744 744 Parts with generated data cannot be copied."""
745 745 assert not util.safehasattr(self.data, 'next')
746 746 return self.__class__(self.type, self._mandatoryparams,
747 747 self._advisoryparams, self._data, self.mandatory)
748 748
749 749 # methods used to define the part content
750 750 def __setdata(self, data):
751 751 if self._generated is not None:
752 752 raise error.ReadOnlyPartError('part is being generated')
753 753 self._data = data
754 754 def __getdata(self):
755 755 return self._data
756 756 data = property(__getdata, __setdata)
757 757
758 758 @property
759 759 def mandatoryparams(self):
760 760 # make it an immutable tuple to force people through ``addparam``
761 761 return tuple(self._mandatoryparams)
762 762
763 763 @property
764 764 def advisoryparams(self):
765 765 # make it an immutable tuple to force people through ``addparam``
766 766 return tuple(self._advisoryparams)
767 767
768 768 def addparam(self, name, value='', mandatory=True):
769 769 if self._generated is not None:
770 770 raise error.ReadOnlyPartError('part is being generated')
771 771 if name in self._seenparams:
772 772 raise ValueError('duplicated params: %s' % name)
773 773 self._seenparams.add(name)
774 774 params = self._advisoryparams
775 775 if mandatory:
776 776 params = self._mandatoryparams
777 777 params.append((name, value))
778 778
779 779 # methods used to generate the bundle2 stream
780 780 def getchunks(self, ui):
781 781 if self._generated is not None:
782 782 raise RuntimeError('part can only be consumed once')
783 783 self._generated = False
784 784
785 785 if ui.debugflag:
786 786 msg = ['bundle2-output-part: "%s"' % self.type]
787 787 if not self.mandatory:
788 788 msg.append(' (advisory)')
789 789 nbmp = len(self.mandatoryparams)
790 790 nbap = len(self.advisoryparams)
791 791 if nbmp or nbap:
792 792 msg.append(' (params:')
793 793 if nbmp:
794 794 msg.append(' %i mandatory' % nbmp)
795 795 if nbap:
796 796 msg.append(' %i advisory' % nbap)
797 797 msg.append(')')
798 798 if not self.data:
799 799 msg.append(' empty payload')
800 800 elif util.safehasattr(self.data, 'next'):
801 801 msg.append(' streamed payload')
802 802 else:
803 803 msg.append(' %i bytes payload' % len(self.data))
804 804 msg.append('\n')
805 805 ui.debug(''.join(msg))
806 806
807 807 #### header
808 808 if self.mandatory:
809 809 parttype = self.type.upper()
810 810 else:
811 811 parttype = self.type.lower()
812 812 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
813 813 ## parttype
814 814 header = [_pack(_fparttypesize, len(parttype)),
815 815 parttype, _pack(_fpartid, self.id),
816 816 ]
817 817 ## parameters
818 818 # count
819 819 manpar = self.mandatoryparams
820 820 advpar = self.advisoryparams
821 821 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
822 822 # size
823 823 parsizes = []
824 824 for key, value in manpar:
825 825 parsizes.append(len(key))
826 826 parsizes.append(len(value))
827 827 for key, value in advpar:
828 828 parsizes.append(len(key))
829 829 parsizes.append(len(value))
830 830 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
831 831 header.append(paramsizes)
832 832 # key, value
833 833 for key, value in manpar:
834 834 header.append(key)
835 835 header.append(value)
836 836 for key, value in advpar:
837 837 header.append(key)
838 838 header.append(value)
839 839 ## finalize header
840 840 headerchunk = ''.join(header)
841 841 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
842 842 yield _pack(_fpartheadersize, len(headerchunk))
843 843 yield headerchunk
844 844 ## payload
845 845 try:
846 846 for chunk in self._payloadchunks():
847 847 outdebug(ui, 'payload chunk size: %i' % len(chunk))
848 848 yield _pack(_fpayloadsize, len(chunk))
849 849 yield chunk
850 850 except GeneratorExit:
851 851 # GeneratorExit means that nobody is listening for our
852 852 # results anyway, so just bail quickly rather than trying
853 853 # to produce an error part.
854 854 ui.debug('bundle2-generatorexit\n')
855 855 raise
856 856 except BaseException as exc:
857 857 # backup exception data for later
858 858 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
859 859 % exc)
860 860 exc_info = sys.exc_info()
861 861 msg = 'unexpected error: %s' % exc
862 862 interpart = bundlepart('error:abort', [('message', msg)],
863 863 mandatory=False)
864 864 interpart.id = 0
865 865 yield _pack(_fpayloadsize, -1)
866 866 for chunk in interpart.getchunks(ui=ui):
867 867 yield chunk
868 868 outdebug(ui, 'closing payload chunk')
869 869 # abort current part payload
870 870 yield _pack(_fpayloadsize, 0)
871 871 raise exc_info[0], exc_info[1], exc_info[2]
872 872 # end of payload
873 873 outdebug(ui, 'closing payload chunk')
874 874 yield _pack(_fpayloadsize, 0)
875 875 self._generated = True
876 876
877 877 def _payloadchunks(self):
878 878 """yield chunks of a the part payload
879 879
880 880 Exists to handle the different methods to provide data to a part."""
881 881 # we only support fixed size data now.
882 882 # This will be improved in the future.
883 883 if util.safehasattr(self.data, 'next'):
884 884 buff = util.chunkbuffer(self.data)
885 885 chunk = buff.read(preferedchunksize)
886 886 while chunk:
887 887 yield chunk
888 888 chunk = buff.read(preferedchunksize)
889 889 elif len(self.data):
890 890 yield self.data
891 891
892 892
893 893 flaginterrupt = -1
894 894
895 895 class interrupthandler(unpackermixin):
896 896 """read one part and process it with restricted capability
897 897
898 898 This allows transmitting exceptions raised on the producer side during part
899 899 iteration while the consumer is reading a part.
900 900
901 901 Parts processed in this manner only have access to a ui object."""
902 902
903 903 def __init__(self, ui, fp):
904 904 super(interrupthandler, self).__init__(fp)
905 905 self.ui = ui
906 906
907 907 def _readpartheader(self):
908 908 """reads a part header size and return the bytes blob
909 909
910 910 returns None if empty"""
911 911 headersize = self._unpack(_fpartheadersize)[0]
912 912 if headersize < 0:
913 913 raise error.BundleValueError('negative part header size: %i'
914 914 % headersize)
915 915 indebug(self.ui, 'part header size: %i\n' % headersize)
916 916 if headersize:
917 917 return self._readexact(headersize)
918 918 return None
919 919
920 920 def __call__(self):
921 921
922 922 self.ui.debug('bundle2-input-stream-interrupt:'
923 923 ' opening out of band context\n')
924 924 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
925 925 headerblock = self._readpartheader()
926 926 if headerblock is None:
927 927 indebug(self.ui, 'no part found during interruption.')
928 928 return
929 929 part = unbundlepart(self.ui, headerblock, self._fp)
930 930 op = interruptoperation(self.ui)
931 931 _processpart(op, part)
932 932 self.ui.debug('bundle2-input-stream-interrupt:'
933 933 ' closing out of band context\n')
934 934
935 935 class interruptoperation(object):
936 936 """A limited operation to be use by part handler during interruption
937 937
938 938 It only have access to an ui object.
939 939 """
940 940
941 941 def __init__(self, ui):
942 942 self.ui = ui
943 943 self.reply = None
944 944 self.captureoutput = False
945 945
946 946 @property
947 947 def repo(self):
948 948 raise RuntimeError('no repo access from stream interruption')
949 949
950 950 def gettransaction(self):
951 951 raise TransactionUnavailable('no repo access from stream interruption')
952 952
953 953 class unbundlepart(unpackermixin):
954 954 """a bundle part read from a bundle"""
955 955
956 956 def __init__(self, ui, header, fp):
957 957 super(unbundlepart, self).__init__(fp)
958 958 self.ui = ui
959 959 # unbundle state attr
960 960 self._headerdata = header
961 961 self._headeroffset = 0
962 962 self._initialized = False
963 963 self.consumed = False
964 964 # part data
965 965 self.id = None
966 966 self.type = None
967 967 self.mandatoryparams = None
968 968 self.advisoryparams = None
969 969 self.params = None
970 970 self.mandatorykeys = ()
971 971 self._payloadstream = None
972 972 self._readheader()
973 973 self._mandatory = None
974 974 self._chunkindex = [] #(payload, file) position tuples for chunk starts
975 975 self._pos = 0
976 976
977 977 def _fromheader(self, size):
978 978 """return the next <size> byte from the header"""
979 979 offset = self._headeroffset
980 980 data = self._headerdata[offset:(offset + size)]
981 981 self._headeroffset = offset + size
982 982 return data
983 983
984 984 def _unpackheader(self, format):
985 985 """read given format from header
986 986
987 987 This automatically computes the size of the format to read."""
988 988 data = self._fromheader(struct.calcsize(format))
989 989 return _unpack(format, data)
990 990
991 991 def _initparams(self, mandatoryparams, advisoryparams):
992 992 """internal function to setup all logic related parameters"""
993 993 # make it read only to prevent people touching it by mistake.
994 994 self.mandatoryparams = tuple(mandatoryparams)
995 995 self.advisoryparams = tuple(advisoryparams)
996 996 # user friendly UI
997 997 self.params = dict(self.mandatoryparams)
998 998 self.params.update(dict(self.advisoryparams))
999 999 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1000 1000
1001 1001 def _payloadchunks(self, chunknum=0):
1002 1002 '''seek to specified chunk and start yielding data'''
1003 1003 if len(self._chunkindex) == 0:
1004 1004 assert chunknum == 0, 'Must start with chunk 0'
1005 1005 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1006 1006 else:
1007 1007 assert chunknum < len(self._chunkindex), \
1008 1008 'Unknown chunk %d' % chunknum
1009 1009 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1010 1010
1011 1011 pos = self._chunkindex[chunknum][0]
1012 1012 payloadsize = self._unpack(_fpayloadsize)[0]
1013 1013 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1014 1014 while payloadsize:
1015 1015 if payloadsize == flaginterrupt:
1016 1016 # interruption detection, the handler will now read a
1017 1017 # single part and process it.
1018 1018 interrupthandler(self.ui, self._fp)()
1019 1019 elif payloadsize < 0:
1020 1020 msg = 'negative payload chunk size: %i' % payloadsize
1021 1021 raise error.BundleValueError(msg)
1022 1022 else:
1023 1023 result = self._readexact(payloadsize)
1024 1024 chunknum += 1
1025 1025 pos += payloadsize
1026 1026 if chunknum == len(self._chunkindex):
1027 1027 self._chunkindex.append((pos,
1028 1028 super(unbundlepart, self).tell()))
1029 1029 yield result
1030 1030 payloadsize = self._unpack(_fpayloadsize)[0]
1031 1031 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1032 1032
1033 1033 def _findchunk(self, pos):
1034 1034 '''for a given payload position, return a chunk number and offset'''
1035 1035 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1036 1036 if ppos == pos:
1037 1037 return chunk, 0
1038 1038 elif ppos > pos:
1039 1039 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1040 1040 raise ValueError('Unknown chunk')
1041 1041
1042 1042 def _readheader(self):
1043 1043 """read the header and setup the object"""
1044 1044 typesize = self._unpackheader(_fparttypesize)[0]
1045 1045 self.type = self._fromheader(typesize)
1046 1046 indebug(self.ui, 'part type: "%s"' % self.type)
1047 1047 self.id = self._unpackheader(_fpartid)[0]
1048 1048 indebug(self.ui, 'part id: "%s"' % self.id)
1049 1049 # extract mandatory bit from type
1050 1050 self.mandatory = (self.type != self.type.lower())
1051 1051 self.type = self.type.lower()
1052 1052 ## reading parameters
1053 1053 # param count
1054 1054 mancount, advcount = self._unpackheader(_fpartparamcount)
1055 1055 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1056 1056 # param size
1057 1057 fparamsizes = _makefpartparamsizes(mancount + advcount)
1058 1058 paramsizes = self._unpackheader(fparamsizes)
1059 1059 # make it a list of couple again
1060 1060 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1061 1061 # split mandatory from advisory
1062 1062 mansizes = paramsizes[:mancount]
1063 1063 advsizes = paramsizes[mancount:]
1064 1064 # retrieve param value
1065 1065 manparams = []
1066 1066 for key, value in mansizes:
1067 1067 manparams.append((self._fromheader(key), self._fromheader(value)))
1068 1068 advparams = []
1069 1069 for key, value in advsizes:
1070 1070 advparams.append((self._fromheader(key), self._fromheader(value)))
1071 1071 self._initparams(manparams, advparams)
1072 1072 ## part payload
1073 1073 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1074 1074 # we read the data, tell it
1075 1075 self._initialized = True
1076 1076
1077 1077 def read(self, size=None):
1078 1078 """read payload data"""
1079 1079 if not self._initialized:
1080 1080 self._readheader()
1081 1081 if size is None:
1082 1082 data = self._payloadstream.read()
1083 1083 else:
1084 1084 data = self._payloadstream.read(size)
1085 1085 self._pos += len(data)
1086 1086 if size is None or len(data) < size:
1087 1087 if not self.consumed and self._pos:
1088 1088 self.ui.debug('bundle2-input-part: total payload size %i\n'
1089 1089 % self._pos)
1090 1090 self.consumed = True
1091 1091 return data
1092 1092
1093 1093 def tell(self):
1094 1094 return self._pos
1095 1095
1096 1096 def seek(self, offset, whence=0):
1097 1097 if whence == 0:
1098 1098 newpos = offset
1099 1099 elif whence == 1:
1100 1100 newpos = self._pos + offset
1101 1101 elif whence == 2:
1102 1102 if not self.consumed:
1103 1103 self.read()
1104 1104 newpos = self._chunkindex[-1][0] - offset
1105 1105 else:
1106 1106 raise ValueError('Unknown whence value: %r' % (whence,))
1107 1107
1108 1108 if newpos > self._chunkindex[-1][0] and not self.consumed:
1109 1109 self.read()
1110 1110 if not 0 <= newpos <= self._chunkindex[-1][0]:
1111 1111 raise ValueError('Offset out of range')
1112 1112
1113 1113 if self._pos != newpos:
1114 1114 chunk, internaloffset = self._findchunk(newpos)
1115 1115 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1116 1116 adjust = self.read(internaloffset)
1117 1117 if len(adjust) != internaloffset:
1118 1118 raise util.Abort(_('Seek failed\n'))
1119 1119 self._pos = newpos
1120 1120
1121 1121 # These are only the static capabilities.
1122 1122 # Check the 'getrepocaps' function for the rest.
1123 1123 capabilities = {'HG20': (),
1124 1124 'error': ('abort', 'unsupportedcontent', 'pushraced',
1125 1125 'pushkey'),
1126 1126 'listkeys': (),
1127 1127 'pushkey': (),
1128 1128 'digests': tuple(sorted(util.DIGESTS.keys())),
1129 1129 'remote-changegroup': ('http', 'https'),
1130 1130 'hgtagsfnodes': (),
1131 1131 }
1132 1132
1133 1133 def getrepocaps(repo, allowpushback=False):
1134 1134 """return the bundle2 capabilities for a given repo
1135 1135
1136 1136 Exists to allow extensions (like evolution) to mutate the capabilities.
1137 1137 """
1138 1138 caps = capabilities.copy()
1139 1139 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1140 1140 if obsolete.isenabled(repo, obsolete.exchangeopt):
1141 1141 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1142 1142 caps['obsmarkers'] = supportedformat
1143 1143 if allowpushback:
1144 1144 caps['pushback'] = ()
1145 1145 return caps
1146 1146
1147 1147 def bundle2caps(remote):
1148 1148 """return the bundle capabilities of a peer as dict"""
1149 1149 raw = remote.capable('bundle2')
1150 1150 if not raw and raw != '':
1151 1151 return {}
1152 1152 capsblob = urllib.unquote(remote.capable('bundle2'))
1153 1153 return decodecaps(capsblob)
1154 1154
1155 1155 def obsmarkersversion(caps):
1156 1156 """extract the list of supported obsmarkers versions from a bundle2caps dict
1157 1157 """
1158 1158 obscaps = caps.get('obsmarkers', ())
1159 1159 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1160 1160
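# Illustrative sketch: given a decoded bundle2 capabilities dict, the helper
# above extracts the supported obsmarkers format versions as integers (the
# capability value is chosen for the example).
def _obsmarkersversion_example():
    assert obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]
    assert obsmarkersversion({}) == []
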
1161 1161 @parthandler('changegroup', ('version', 'nbchanges'))
1162 1162 def handlechangegroup(op, inpart):
1163 1163 """apply a changegroup part on the repo
1164 1164
1165 1165 This is a very early implementation that will be massively reworked before
1166 1166 being inflicted on any end-user.
1167 1167 """
1168 1168 # Make sure we trigger a transaction creation
1169 1169 #
1170 1170 # The addchangegroup function will get a transaction object by itself, but
1171 1171 # we need to make sure we trigger the creation of a transaction object used
1172 1172 # for the whole processing scope.
1173 1173 op.gettransaction()
1174 1174 unpackerversion = inpart.params.get('version', '01')
1175 1175 # We should raise an appropriate exception here
1176 1176 unpacker = changegroup.packermap[unpackerversion][1]
1177 1177 cg = unpacker(inpart, None)
1178 1178 # the source and url passed here are overwritten by the one contained in
1179 1179 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1180 1180 nbchangesets = None
1181 1181 if 'nbchanges' in inpart.params:
1182 1182 nbchangesets = int(inpart.params.get('nbchanges'))
1183 1183 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1184 1184 expectedtotal=nbchangesets)
1185 1185 op.records.add('changegroup', {'return': ret})
1186 1186 if op.reply is not None:
1187 1187 # This is definitely not the final form of this
1188 1188 # return. But one needs to start somewhere.
1189 1189 part = op.reply.newpart('reply:changegroup', mandatory=False)
1190 1190 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1191 1191 part.addparam('return', '%i' % ret, mandatory=False)
1192 1192 assert not inpart.read()
1193 1193
1194 1194 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1195 1195 ['digest:%s' % k for k in util.DIGESTS.keys()])
1196 1196 @parthandler('remote-changegroup', _remotechangegroupparams)
1197 1197 def handleremotechangegroup(op, inpart):
1198 1198 """apply a bundle10 on the repo, given an url and validation information
1199 1199
1200 1200 All the information about the remote bundle to import is given as
1201 1201 parameters. The parameters include:
1202 1202 - url: the url to the bundle10.
1203 1203 - size: the bundle10 file size. It is used to validate what was
1204 1204 retrieved by the client matches the server knowledge about the bundle.
1205 1205 - digests: a space separated list of the digest types provided as
1206 1206 parameters.
1207 1207 - digest:<digest-type>: the hexadecimal representation of the digest with
1208 1208 that name. Like the size, it is used to validate what was retrieved by
1209 1209 the client matches what the server knows about the bundle.
1210 1210
1211 1211 When multiple digest types are given, all of them are checked.
1212 1212 """
1213 1213 try:
1214 1214 raw_url = inpart.params['url']
1215 1215 except KeyError:
1216 1216 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1217 1217 parsed_url = util.url(raw_url)
1218 1218 if parsed_url.scheme not in capabilities['remote-changegroup']:
1219 1219 raise util.Abort(_('remote-changegroup does not support %s urls') %
1220 1220 parsed_url.scheme)
1221 1221
1222 1222 try:
1223 1223 size = int(inpart.params['size'])
1224 1224 except ValueError:
1225 1225 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1226 1226 % 'size')
1227 1227 except KeyError:
1228 1228 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1229 1229
1230 1230 digests = {}
1231 1231 for typ in inpart.params.get('digests', '').split():
1232 1232 param = 'digest:%s' % typ
1233 1233 try:
1234 1234 value = inpart.params[param]
1235 1235 except KeyError:
1236 1236 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1237 1237 param)
1238 1238 digests[typ] = value
1239 1239
1240 1240 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1241 1241
1242 1242 # Make sure we trigger a transaction creation
1243 1243 #
1244 1244 # The addchangegroup function will get a transaction object by itself, but
1245 1245 # we need to make sure we trigger the creation of a transaction object used
1246 1246 # for the whole processing scope.
1247 1247 op.gettransaction()
1248 1248 from . import exchange
1249 1249 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1250 1250 if not isinstance(cg, changegroup.cg1unpacker):
1251 1251 raise util.Abort(_('%s: not a bundle version 1.0') %
1252 1252 util.hidepassword(raw_url))
1253 1253 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1254 1254 op.records.add('changegroup', {'return': ret})
1255 1255 if op.reply is not None:
1256 1256 # This is definitely not the final form of this
1257 1257 # return. But one needs to start somewhere.
1258 1258 part = op.reply.newpart('reply:changegroup')
1259 1259 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1260 1260 part.addparam('return', '%i' % ret, mandatory=False)
1261 1261 try:
1262 1262 real_part.validate()
1263 1263 except util.Abort as e:
1264 1264 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1265 1265 (util.hidepassword(raw_url), str(e)))
1266 1266 assert not inpart.read()
1267 1267
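# Illustrative sketch of the producer side for the handler above: attaching
# the url/size/digest parameters the handler expects to a remote-changegroup
# part (the bundler, url, size and digest value are placeholders).
def _remotechangegroup_example(bundler, bundleurl, bundlesize, sha1hex):
    part = bundler.newpart('remote-changegroup')
    part.addparam('url', bundleurl)
    part.addparam('size', '%d' % bundlesize)
    part.addparam('digests', 'sha1')
    part.addparam('digest:sha1', sha1hex)
    return part
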
1268 1268 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1269 1269 def handlereplychangegroup(op, inpart):
1270 1270 ret = int(inpart.params['return'])
1271 1271 replyto = int(inpart.params['in-reply-to'])
1272 1272 op.records.add('changegroup', {'return': ret}, replyto)
1273 1273
1274 1274 @parthandler('check:heads')
1275 1275 def handlecheckheads(op, inpart):
1276 1276 """check that head of the repo did not change
1277 1277
1278 1278 This is used to detect a push race when using unbundle.
1279 1279 This replaces the "heads" argument of unbundle."""
1280 1280 h = inpart.read(20)
1281 1281 heads = []
1282 1282 while len(h) == 20:
1283 1283 heads.append(h)
1284 1284 h = inpart.read(20)
1285 1285 assert not h
1286 1286 if heads != op.repo.heads():
1287 1287 raise error.PushRaced('repository changed while pushing - '
1288 1288 'please try again')
1289 1289
1290 1290 @parthandler('output')
1291 1291 def handleoutput(op, inpart):
1292 1292 """forward output captured on the server to the client"""
1293 1293 for line in inpart.read().splitlines():
1294 1294 op.ui.status(('remote: %s\n' % line))
1295 1295
1296 1296 @parthandler('replycaps')
1297 1297 def handlereplycaps(op, inpart):
1298 1298 """Notify that a reply bundle should be created
1299 1299
1300 1300 The payload contains the capabilities information for the reply"""
1301 1301 caps = decodecaps(inpart.read())
1302 1302 if op.reply is None:
1303 1303 op.reply = bundle20(op.ui, caps)
1304 1304
1305 1305 @parthandler('error:abort', ('message', 'hint'))
1306 1306 def handleerrorabort(op, inpart):
1307 1307 """Used to transmit abort error over the wire"""
1308 1308 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1309 1309
1310 1310 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1311 1311 'in-reply-to'))
1312 1312 def handleerrorpushkey(op, inpart):
1313 1313 """Used to transmit failure of a mandatory pushkey over the wire"""
1314 1314 kwargs = {}
1315 1315 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1316 1316 value = inpart.params.get(name)
1317 1317 if value is not None:
1318 1318 kwargs[name] = value
1319 1319 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1320 1320
1321 1321 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1322 1322 def handleerrorunsupportedcontent(op, inpart):
1323 1323 """Used to transmit unknown content error over the wire"""
1324 1324 kwargs = {}
1325 1325 parttype = inpart.params.get('parttype')
1326 1326 if parttype is not None:
1327 1327 kwargs['parttype'] = parttype
1328 1328 params = inpart.params.get('params')
1329 1329 if params is not None:
1330 1330 kwargs['params'] = params.split('\0')
1331 1331
1332 raise error.UnsupportedPartError(**kwargs)
1332 raise error.BundleUnknownFeatureError(**kwargs)
1333 1333
1334 1334 @parthandler('error:pushraced', ('message',))
1335 1335 def handleerrorpushraced(op, inpart):
1336 1336 """Used to transmit push race error over the wire"""
1337 1337 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1338 1338
1339 1339 @parthandler('listkeys', ('namespace',))
1340 1340 def handlelistkeys(op, inpart):
1341 1341 """retrieve pushkey namespace content stored in a bundle2"""
1342 1342 namespace = inpart.params['namespace']
1343 1343 r = pushkey.decodekeys(inpart.read())
1344 1344 op.records.add('listkeys', (namespace, r))
1345 1345
1346 1346 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1347 1347 def handlepushkey(op, inpart):
1348 1348 """process a pushkey request"""
1349 1349 dec = pushkey.decode
1350 1350 namespace = dec(inpart.params['namespace'])
1351 1351 key = dec(inpart.params['key'])
1352 1352 old = dec(inpart.params['old'])
1353 1353 new = dec(inpart.params['new'])
1354 1354 ret = op.repo.pushkey(namespace, key, old, new)
1355 1355 record = {'namespace': namespace,
1356 1356 'key': key,
1357 1357 'old': old,
1358 1358 'new': new}
1359 1359 op.records.add('pushkey', record)
1360 1360 if op.reply is not None:
1361 1361 rpart = op.reply.newpart('reply:pushkey')
1362 1362 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1363 1363 rpart.addparam('return', '%i' % ret, mandatory=False)
1364 1364 if inpart.mandatory and not ret:
1365 1365 kwargs = {}
1366 1366 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1367 1367 if key in inpart.params:
1368 1368 kwargs[key] = inpart.params[key]
1369 1369 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1370 1370
1371 1371 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1372 1372 def handlepushkeyreply(op, inpart):
1373 1373 """retrieve the result of a pushkey request"""
1374 1374 ret = int(inpart.params['return'])
1375 1375 partid = int(inpart.params['in-reply-to'])
1376 1376 op.records.add('pushkey', {'return': ret}, partid)
1377 1377
1378 1378 @parthandler('obsmarkers')
1379 1379 def handleobsmarker(op, inpart):
1380 1380 """add a stream of obsmarkers to the repo"""
1381 1381 tr = op.gettransaction()
1382 1382 markerdata = inpart.read()
1383 1383 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1384 1384 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1385 1385 % len(markerdata))
1386 1386 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1387 1387 if new:
1388 1388 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1389 1389 op.records.add('obsmarkers', {'new': new})
1390 1390 if op.reply is not None:
1391 1391 rpart = op.reply.newpart('reply:obsmarkers')
1392 1392 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1393 1393 rpart.addparam('new', '%i' % new, mandatory=False)
1394 1394
1395 1395
1396 1396 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1397 1397 def handleobsmarkerreply(op, inpart):
1398 1398 """retrieve the result of a pushkey request"""
1399 1399 ret = int(inpart.params['new'])
1400 1400 partid = int(inpart.params['in-reply-to'])
1401 1401 op.records.add('obsmarkers', {'new': ret}, partid)
1402 1402
1403 1403 @parthandler('hgtagsfnodes')
1404 1404 def handlehgtagsfnodes(op, inpart):
1405 1405 """Applies .hgtags fnodes cache entries to the local repo.
1406 1406
1407 1407 Payload is pairs of 20 byte changeset nodes and filenodes.
1408 1408 """
1409 1409 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1410 1410
1411 1411 count = 0
1412 1412 while True:
1413 1413 node = inpart.read(20)
1414 1414 fnode = inpart.read(20)
1415 1415 if len(node) < 20 or len(fnode) < 20:
1416 1416 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1417 1417 break
1418 1418 cache.setfnode(node, fnode)
1419 1419 count += 1
1420 1420
1421 1421 cache.write()
1422 1422 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
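# Illustrative sketch of the producer side for the handler above: the payload
# is just concatenated (changeset node, .hgtags filenode) 20-byte pairs
# (the helper name is hypothetical).
def _hgtagsfnodes_payload(pairs):
    chunks = []
    for node, fnode in pairs:
        assert len(node) == 20 and len(fnode) == 20
        chunks.append(node)
        chunks.append(fnode)
    return ''.join(chunks)
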
@@ -1,192 +1,192
1 1 # error.py - Mercurial exceptions
2 2 #
3 3 # Copyright 2005-2008 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Mercurial exceptions.
9 9
10 10 This allows us to catch exceptions at higher levels without forcing
11 11 imports.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 # Do not import anything here, please
17 17
18 18 class HintException(Exception):
19 19 def __init__(self, *args, **kw):
20 20 Exception.__init__(self, *args)
21 21 self.hint = kw.get('hint')
22 22
23 23 class RevlogError(HintException):
24 24 pass
25 25
26 26 class FilteredIndexError(IndexError):
27 27 pass
28 28
29 29 class LookupError(RevlogError, KeyError):
30 30 def __init__(self, name, index, message):
31 31 self.name = name
32 32 self.index = index
33 33 # this can't be called 'message' because at least some installs of
34 34 # Python 2.6+ complain about the 'message' property being deprecated
35 35 self.lookupmessage = message
36 36 if isinstance(name, str) and len(name) == 20:
37 37 from .node import short
38 38 name = short(name)
39 39 RevlogError.__init__(self, '%s@%s: %s' % (index, name, message))
40 40
41 41 def __str__(self):
42 42 return RevlogError.__str__(self)
43 43
44 44 class FilteredLookupError(LookupError):
45 45 pass
46 46
47 47 class ManifestLookupError(LookupError):
48 48 pass
49 49
50 50 class CommandError(Exception):
51 51 """Exception raised on errors in parsing the command line."""
52 52
53 53 class InterventionRequired(Exception):
54 54 """Exception raised when a command requires human intervention."""
55 55
56 56 class Abort(HintException):
57 57 """Raised if a command needs to print an error and exit."""
58 58 pass
59 59
60 60 class HookAbort(Abort):
61 61 """raised when a validation hook fails, aborting an operation
62 62
63 63 Exists to allow more specialized catching."""
64 64 pass
65 65
66 66 class ConfigError(Abort):
67 67 """Exception raised when parsing config files"""
68 68
69 69 class OutOfBandError(Exception):
70 70 """Exception raised when a remote repo reports failure"""
71 71
72 72 def __init__(self, *args, **kw):
73 73 Exception.__init__(self, *args)
74 74 self.hint = kw.get('hint')
75 75
76 76 class ParseError(Exception):
77 77 """Raised when parsing config files and {rev,file}sets (msg[, pos])"""
78 78
79 79 class UnknownIdentifier(ParseError):
80 80 """Exception raised when a {rev,file}set references an unknown identifier"""
81 81
82 82 def __init__(self, function, symbols):
83 83 from .i18n import _
84 84 ParseError.__init__(self, _("unknown identifier: %s") % function)
85 85 self.function = function
86 86 self.symbols = symbols
87 87
88 88 class RepoError(HintException):
89 89 pass
90 90
91 91 class RepoLookupError(RepoError):
92 92 pass
93 93
94 94 class FilteredRepoLookupError(RepoLookupError):
95 95 pass
96 96
97 97 class CapabilityError(RepoError):
98 98 pass
99 99
100 100 class RequirementError(RepoError):
101 101 """Exception raised if .hg/requires has an unknown entry."""
102 102 pass
103 103
104 104 class LockError(IOError):
105 105 def __init__(self, errno, strerror, filename, desc):
106 106 IOError.__init__(self, errno, strerror, filename)
107 107 self.desc = desc
108 108
109 109 class LockHeld(LockError):
110 110 def __init__(self, errno, filename, desc, locker):
111 111 LockError.__init__(self, errno, 'Lock held', filename, desc)
112 112 self.locker = locker
113 113
114 114 class LockUnavailable(LockError):
115 115 pass
116 116
117 117 # LockError is for errors while acquiring the lock -- this is unrelated
118 118 class LockInheritanceContractViolation(AssertionError):
119 119 pass
120 120
121 121 class ResponseError(Exception):
122 122 """Raised to print an error with part of output and exit."""
123 123
124 124 class UnknownCommand(Exception):
125 125 """Exception raised if command is not in the command table."""
126 126
127 127 class AmbiguousCommand(Exception):
128 128 """Exception raised if command shortcut matches more than one command."""
129 129
130 130 # derived from KeyboardInterrupt to simplify some breakout code
131 131 class SignalInterrupt(KeyboardInterrupt):
132 132 """Exception raised on SIGTERM and SIGHUP."""
133 133
134 134 class SignatureError(Exception):
135 135 pass
136 136
137 137 class PushRaced(RuntimeError):
138 138 """An exception raised during unbundling that indicate a push race"""
139 139
140 140 # bundle2 related errors
141 141 class BundleValueError(ValueError):
142 142 """error raised when bundle2 cannot be processed"""
143 143
144 class UnsupportedPartError(BundleValueError):
144 class BundleUnknownFeatureError(BundleValueError):
145 145 def __init__(self, parttype=None, params=()):
146 146 self.parttype = parttype
147 147 self.params = params
148 148 if self.parttype is None:
149 149 msg = 'Stream Parameter'
150 150 else:
151 151 msg = parttype
152 152 if self.params:
153 153 msg = '%s - %s' % (msg, ', '.join(self.params))
154 154 ValueError.__init__(self, msg)
155 155
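# Illustrative sketch of how the renamed exception composes its message
# (the part type and parameter names below are hypothetical):
def _bundleunknownfeature_example():
    err = BundleUnknownFeatureError(parttype='fancypart', params=('mode',))
    assert str(err) == 'fancypart - mode'
    # with no part type, the error refers to a stream parameter
    err = BundleUnknownFeatureError(params=('Checkheads',))
    assert str(err) == 'Stream Parameter - Checkheads'
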
156 156 class ReadOnlyPartError(RuntimeError):
157 157 """error raised when code tries to alter a part being generated"""
158 158 pass
159 159
160 160 class PushkeyFailed(Abort):
161 161 """error raised when a pushkey part failed to update a value"""
162 162
163 163 def __init__(self, partid, namespace=None, key=None, new=None, old=None,
164 164 ret=None):
165 165 self.partid = partid
166 166 self.namespace = namespace
167 167 self.key = key
168 168 self.new = new
169 169 self.old = old
170 170 self.ret = ret
171 171 # no i18n expected to be processed into a better message
172 172 Abort.__init__(self, 'failed to update value for "%s/%s"'
173 173 % (namespace, key))
174 174
175 175 class CensoredNodeError(RevlogError):
176 176 """error raised when content verification fails on a censored node
177 177
178 178 Also contains the tombstone data substituted for the uncensored data.
179 179 """
180 180
181 181 def __init__(self, filename, node, tombstone):
182 182 from .node import short
183 183 RevlogError.__init__(self, '%s:%s' % (filename, short(node)))
184 184 self.tombstone = tombstone
185 185
186 186 class CensoredBaseError(RevlogError):
187 187 """error raised when a delta is rejected because its base is censored
188 188
189 189 A delta based on a censored revision must be formed as single patch
190 190 operation which replaces the entire base with new content. This ensures
191 191 the delta may be applied by clones which have not censored the base.
192 192 """