##// END OF EJS Templates
config: register the 'devel.bundle2.debug' config...
marmoute -
r33160:02248206 default
parent child Browse files
Show More
@@ -1,1848 +1,1848
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
urlerr = util.urlerr
urlreq = util.urlreq

# short module-level aliases for the struct calls used on every chunk
_pack = struct.pack
_unpack = struct.unpack

# struct formats for the fixed-size fields of the bundle2 binary format
_fstreamparamsize = '>i'   # size of the stream-level parameter block
_fpartheadersize = '>i'    # size of a part header (0 == end of stream)
_fparttypesize = '>B'      # length of the part type name
_fpartid = '>I'            # part id, unique within the bundle
_fpayloadsize = '>i'       # payload chunk size (signed: negative is special)
_fpartparamcount = '>BB'   # (mandatory, advisory) parameter counts

# entry format used by the phases-related parts
_fphasesentry = '>i20s'

# default size used when slicing payload data into chunks
preferedchunksize = 4096

# any character NOT in this set is forbidden in a part type name
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
def outdebug(ui, message):
    """emit a debug message about the output stream (bundling side)"""
    if not ui.configbool('devel', 'bundle2.debug'):
        return
    ui.debug('bundle2-output: %s\n' % message)
192 192
def indebug(ui, message):
    """emit a debug message about the input stream (unbundling side)"""
    if not ui.configbool('devel', 'bundle2.debug'):
        return
    ui.debug('bundle2-input: %s\n' % message)
197 197
def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    # any character outside [a-zA-Z0-9_:-] makes the name invalid
    if _parttypeforbidden.search(parttype) is not None:
        raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
# part-type (lower-cased) -> handler function registry
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator registering a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        # matching is case insensitive, so register under the lower form
        key = parttype.lower()
        assert key not in parthandlermapping
        func.params = frozenset(params)
        parthandlermapping[key] = func
        return func
    return _decorator
231 231
class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` returns all entries of the category 'cat'.

    Iterating on the object itself yields `('category', obj)` tuples for all
    entries, and all iterations happen in chronological order.
    """

    def __init__(self):
        # category -> list of entries, in insertion order
        self._categories = {}
        # flat chronological list of (category, entry) pairs
        self._sequences = []
        # partid -> nested unbundlerecords holding replies to that part
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        bucket = self._categories.setdefault(category, [])
        bucket.append(entry)
        self._sequences.append((category, entry))
        if inreplyto is None:
            return
        self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        if partid not in self._replies:
            self._replies[partid] = unbundlerecords()
        return self._replies[partid]

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        for item in self._sequences:
            yield item

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return len(self._sequences) > 0

    __bool__ = __nonzero__
278 278
class bundleoperation(object):
    """state carrier for a single bundle processing run

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle
    processing, and is returned by the processing function. It carries:

    * the repository the bundle is applied to,
    * a ui object,
    * a callable returning the transaction to add changes to the repo,
    * a record of the result of processing each part,
    * an optional reply bundle, when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        # repository and ui the part handlers operate on
        self.repo = repo
        self.ui = repo.ui
        # per-category records of what each part handler did
        self.records = unbundlerecords()
        # callable returning the transaction to use (may raise
        # TransactionUnavailable when none is expected)
        self.gettransaction = transactiongetter
        # reply bundle, set up by handlers when a response is requested
        self.reply = None
        # whether handler output should be captured into the reply
        self.captureoutput = captureoutput
302 302
class TransactionUnavailable(RuntimeError):
    """raised when a part handler requests a transaction but none is expected"""
305 305
def _notransaction():
    """default method to get a transaction while processing a bundle

    Raises an exception to highlight the fact that no transaction was
    expected to be created."""
    raise TransactionUnavailable()
312 312
def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if not isinstance(unbundler, unbundle20):
        # plain changegroup: the transactiongetter won't be used, but we
        # might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op
    # bundle2 path: record provenance in the transaction hook arguments
    hookargs = tr.hookargs
    hookargs['bundle2'] = '1'
    if source is not None and 'source' not in hookargs:
        hookargs['source'] = source
    if url is not None and 'url' not in hookargs:
        hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr)
327 327
def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function process a bundle, apply effect to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    Unknown Mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this is a init function soon.
    # - exception catching
    # accessing 'params' forces the stream-level parameters to be read
    # (and their effects applied) before any part is processed
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content so the channel stays usable
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            # preserve any server output parts so the caller can relay them
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op
403 403
404 404 def _processchangegroup(op, cg, tr, source, url, **kwargs):
405 405 ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
406 406 op.records.add('changegroup', {
407 407 'return': ret,
408 408 'addednodes': addednodes,
409 409 })
410 410 return ret
411 411
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        # fix: this used to interpolate 'nbmp' (the mandatory
                        # count) under the 'advisory' label
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)
482 482
483 483
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        key, sep, rest = line.partition('=')
        # a bare capability (no '=') gets an empty value list
        vals = rest.split(',') if sep else ()
        caps[urlreq.unquote(key)] = [urlreq.unquote(v) for v in vals]
    return caps
506 506
def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for name in sorted(caps):
        quoted = urlreq.quote(name)
        values = [urlreq.quote(v) for v in caps[name]]
        if values:
            chunks.append("%s=%s" % (quoted, ','.join(values)))
        else:
            # a capability without values is emitted as its bare name
            chunks.append(quoted)
    return '\n'.join(chunks)
518 518
# map of bundle type name -> (on-the-wire header, compression type) used by
# the legacy bundle1 writing code; 'HG20' is handled separately by callers
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
531 531
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameter. and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        # no compression by default; setcompression() may replace this
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        # compression may only be set once per bundle
        assert not any(n.lower() == 'compression' for n, v in self._params)
        # capital 'C': mandatory stream parameter (readers MUST understand it)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to defines the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        # NOTE: string.letters is Python 2 only (ascii letters)
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contains the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the containers

        As the part is directly added to the containers. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually create and add if you need better
        control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        """yield the binary chunks of the full bundle2 stream

        Emission order: magic string, parameter block size, parameter
        block (if any), then the (possibly compressed) core chunks."""
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # only the core (parts) is compressed; magic and params stay plain
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return a encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunk for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        # an empty part header is the end-of-stream marker
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
646 646
647 647
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream

    These helpers are meant for internal usage by the bundle2 protocol
    only. They directly manipulate the low level stream, including bundle2
    level instructions. Do not use them to implement higher-level logic or
    methods."""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """read and unpack one struct of the given format from the stream"""
        size = struct.calcsize(format)
        return _unpack(format, self._readexact(size))

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)
674 674
def getunbundler(ui, fp, magicstring=None):
    """instantiate the unbundler matching a stream's magic string

    When ``magicstring`` is None, the four magic bytes are read from ``fp``.
    Raises ``error.Abort`` for non-mercurial or unknown-version streams."""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic = magicstring[0:2]
    version = magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
691 691
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        # payload is uncompressed until a 'Compression' stream parameter
        # installs another engine (see processcompression)
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """parse a raw stream-parameter block into an ordered mapping

        Each space-separated ``name=value`` entry is urlunquoted and its
        effect applied (via ``_processparam``) as it is parsed."""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameter starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and will this function will raise a KeyError when unknown.

        Note: no option are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        # NOTE: string.letters is Python 2 only (ascii letters)
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        have no way to know then the reply end, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                # fix: the original raised with a bare '%i' format string,
                # never interpolating the offending size into the message
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # make sure the part is fully consumed before reading the next
            # header, so the stream stays positioned correctly
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()
834 834
# supported bundle version -> unbundler class
formatmap = {'20': unbundle20}

# stream-level parameter name -> handler, filled by b2streamparamhandler
b2streamparamsmap = {}
838 838
def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        # guard against double registration; check the stream-parameter
        # table itself (the previous assert looked at 'formatmap', the
        # bundle-version table, and could never catch a duplicate)
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator
846 846
@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """install the payload decompression engine named by a stream parameter"""
    engines = util.compengines
    if value not in engines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = engines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True
856 856
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Both data and parameters cannot be modified after the generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part have the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to defines the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generates the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    # fix: this used to print ``nbmp`` (the mandatory count)
                    # instead of the advisory parameter count
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        # use floor division so the count stays an int under python 3 too
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            # flaginterrupt (-1) tells the consumer an out of band part follows
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of a the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data
1059 1059
1060 1060
1061 1061 flaginterrupt = -1
1062 1062
class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows to transmit exception raised on the producer size during part
    iteration while the consumer is reading a part.

    Part processed in this manner only have access to a ui object,"""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        # fix: dropped the stray trailing '\n' for consistency with the main
        # unbundler's _readpartheader (the indebug helper adds its own
        # newline, so the old message produced a blank debug line)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        # process the single out of band part with a restricted operation
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')
1102 1102
class interruptoperation(object):
    """A limited operation to be use by part handler during interruption

    It only have access to an ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        # an interruption never carries a reply bundle
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        # repository access is forbidden while handling an interruption
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')
1120 1120
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        # seeking within the payload requires both seek() and tell() on the
        # underlying file object
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> byte from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically compute the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                # record chunk boundaries so seek() can come back here later
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                # pos falls inside the previous chunk
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        # (an upper case part type on the wire means mandatory)
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couple again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            # a short read means the payload is exhausted
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        # current position within the (decoded) payload
        return self._pos

    def seek(self, offset, whence=0):
        # standard file-like whence semantics: 0=absolute, 1=relative, 2=end
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                # the chunk index only knows the end once fully read
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            # restart the payload stream at the chunk containing newpos and
            # discard bytes up to the exact offset
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    # the file turned out to be a pipe; remember that and
                    # degrade gracefully instead of raising
                    self._seekable = False
                else:
                    raise
        return None
1320 1320
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }
1332 1332
def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    versions = changegroup.supportedincomingversions(repo)
    caps['changegroup'] = tuple(sorted(versions))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        caps['obsmarkers'] = tuple('V%i' % v for v in obsolete.formats)
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode', 'strict')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps
1350 1350
def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict

    Returns an empty dict when the peer does not advertise 'bundle2' at all
    (an empty string still counts as advertised)."""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    # fix: reuse the value already fetched instead of querying the peer's
    # capability a second time
    capsblob = urlreq.unquote(raw)
    return decodecaps(capsblob)
1358 1358
def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    versions = []
    for cap in caps.get('obsmarkers', ()):
        if cap.startswith('V'):
            versions.append(int(cap[1:]))
    return versions
1364 1364
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    """write a bundle of the requested type to ``filename``

    Legacy 'HG10*' types are delegated to ``writebundle``; 'HG20*' types are
    assembled as a bundle2 stream from ``opts``."""
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    if not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundler = bundle20(ui, caps)
    bundler.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundler, source, outgoing, opts)

    return changegroup.writechunks(ui, bundler.getchunks(), filename, vfs=vfs)
1383 1383
def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    """add the parts requested by ``opts`` to ``bundler``

    Always adds a changegroup part; obsmarkers and phase heads are optional.
    """
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        # advisory changeset count, used for progress reporting on the
        # receiving side
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        # serialize (phase, head) pairs for every outgoing head
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))
1417 1417
def addparttagsfnodescache(repo, bundler, outgoing):
    """add an advisory 'hgtagsfnodes' part for the outgoing heads

    The part carries (node, fnode) pairs from the .hgtags fnodes cache; no
    part is added when the cache has no entry for any outgoing head."""
    # we include the tags fnode cache for the bundle changeset
    # (as an optional parts)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1441 1441
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    # pick the highest obsmarker format version both sides understand
    supported = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(supported)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    encoded = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=encoded)
1457 1457
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            # advisory changeset count, used for progress on the receiver
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        # legacy bundle: a fixed header followed by the compressed changegroup
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1497 1497
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one

    Return values follow the addchangegroup convention: 0 means failure,
    1 means success with no head change, 1+n / -1-n encode n added or
    removed heads."""
    changedheads = 0
    for record in op.records['changegroup']:
        ret = record.get('return', 0)
        # If any changegroup result is 0, return 0
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
1518 1518
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will massive rework before being
    inflicted to any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        # auto-upgrade an empty repo to tree manifests; refuse otherwise
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    # the changegroup unbundler must have consumed the whole payload
    assert not inpart.read()
1553 1553
# parameter names accepted by the 'remote-changegroup' part: the fixed ones
# plus one 'digest:<type>' entry per supported digest algorithm
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                  ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import are given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate what was retrieved by
      the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
            parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
            % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                param)
        digests[typ] = value

    # wrap the remote stream so size and digests are verified as it is read
    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
            util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
            (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
1621 1621
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    """record the changegroup return code carried by a reply part"""
    replyto = int(inpart.params['in-reply-to'])
    ret = int(inpart.params['return'])
    op.records.add('changegroup', {'return': ret}, replyto)
1627 1627
@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that head of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    # payload is a flat sequence of 20-byte binary nodes
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    # a trailing partial read would mean a corrupted payload
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
1646 1646
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focus on the heads actually updated
    during the push. If other activities happen on unrelated heads, it is
    ignored.

    This allow server with high traffic to avoid push contention as long as
    unrelated parts of the graph are involved."""
    pushedheads = []
    chunk = inpart.read(20)
    while len(chunk) == 20:
        pushedheads.append(chunk)
        chunk = inpart.read(20)
    # the payload must be an exact sequence of 20-byte nodes
    assert not chunk
    # grab a transaction so that we are guaranteed to hold the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    # gather the current heads across every branch
    currentheads = set()
    for branchheads in op.repo.branchmap().itervalues():
        currentheads.update(branchheads)

    # every head mentioned by the part must still exist locally
    if any(h not in currentheads for h in pushedheads):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
1675 1675
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    data = inpart.read()
    for line in data.splitlines():
        op.ui.status(_('remote: %s\n') % line)
1681 1681
@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    # always consume the part payload, even when a reply bundle already exists
    replycaps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, replycaps)
1690 1690
class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part.

    Raised by the 'error:abort' part handler below so callers can tell a
    remotely transmitted abort apart from a locally generated one."""
1693 1693
@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    # 'hint' is advisory and may be absent
    msg = inpart.params['message']
    raise AbortFromPart(msg, hint=inpart.params.get('hint'))
1699 1699
@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    # forward only the parameters actually present on the part
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        if inpart.params.get(name) is not None:
            kwargs[name] = inpart.params[name]
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1710 1710
@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    parttype = inpart.params.get('parttype')
    params = inpart.params.get('params')
    excargs = {}
    if parttype is not None:
        excargs['parttype'] = parttype
    if params is not None:
        # parameter names travel as a NUL separated list
        excargs['params'] = params.split('\0')
    raise error.BundleUnknownFeatureError(**excargs)
1723 1723
@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    msg = inpart.params['message']
    raise error.ResponseError(_('push failed:'), msg)
1728 1728
@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    decoded = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, decoded))
1735 1735
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    decode = pushkey.decode
    namespace = decode(inpart.params['namespace'])
    key = decode(inpart.params['key'])
    old = decode(inpart.params['old'])
    new = decode(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing the
    # pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    op.records.add('pushkey', {'namespace': namespace,
                               'key': key,
                               'old': old,
                               'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    # a mandatory part that failed must abort the whole operation
    if inpart.mandatory and not ret:
        kwargs = {}
        for pname in ('namespace', 'key', 'new', 'old', 'ret'):
            if pname in inpart.params:
                kwargs[pname] = inpart.params[pname]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1764 1764
def _readphaseheads(inpart):
    """decode the (phase, node) entries of a 'phase-heads' part payload

    Returns a list indexed by phase number, each item holding the node list
    for that phase. Aborts if the payload ends on a truncated entry."""
    headsbyphase = [[] for _ in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    entry = inpart.read(entrysize)
    while entry:
        if len(entry) < entrysize:
            # trailing partial entry: the part is corrupted
            raise error.Abort(_('bad phase-heads bundle part'))
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
        entry = inpart.read(entrysize)
    return headsbyphase
1777 1777
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    # collect every node added by earlier changegroup parts of this bundle
    addednodes = []
    for cgrecord in op.records['changegroup']:
        addednodes.extend(cgrecord['addednodes'])
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase,
                        addednodes)
1787 1787
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    # record the remote's integer result, keyed by the replied-to part id
    op.records.add('pushkey',
                   {'return': int(inpart.params['return'])},
                   int(inpart.params['in-reply-to']))
1794 1794
@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo

    The payload is the raw obsstore marker encoding. When the part is
    advisory and the local obsstore is read-only, the markers are silently
    skipped. If a reply bundle exists, the number of newly added markers is
    reported back to the sender."""
    tr = op.gettransaction()
    markerdata = inpart.read()
    # fix: use configbool so string values such as 'false' are parsed as
    # booleans instead of being treated as truthy (matches every other
    # boolean config read in this module)
    if op.ui.configbool('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        # fix: terminate the debug message with '\n' like every other
        # ui.debug/ui.status call in this module
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)
1817 1817
1818 1818
@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part sent to a remote

    Records how many obsolescence markers the remote newly added, keyed by
    the id of the 'obsmarkers' part this replies to.
    (The previous docstring was copy-pasted from the pushkey reply handler.)
    """
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)
1825 1825
@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    fnodecache = tags.hgtagsfnodescache(op.repo.unfiltered())

    applied = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            # truncated pair: stop and keep whatever was applied so far
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        fnodecache.setfnode(node, fnode)
        applied += 1

    fnodecache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % applied)
@@ -1,73 +1,76
1 1 # configitems.py - centralized declaration of configuration option
2 2 #
3 3 # Copyright 2017 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import functools
11 11
12 12 from . import (
13 13 error,
14 14 )
15 15
def loadconfigtable(ui, extname, configtable):
    """update config item known to the ui with the extension ones"""
    for section, items in configtable.items():
        knownitems = ui._knownconfig.setdefault(section, {})
        # warn (deterministically ordered) about items core already declares
        for key in sorted(set(knownitems) & set(items)):
            msg = "extension '%s' overwrite config item '%s.%s'"
            msg %= (extname, section, key)
            ui.develwarn(msg, config='warn-config')
        knownitems.update(items)
28 28
class configitem(object):
    """represent a known config item

    :section: the official config section where to find this item,
    :name: the official name within the section,
    :default: default value for this item,
    """

    def __init__(self, section, name, default=None):
        # plain value holder; registration logic lives in _register
        self.default = default
        self.name = name
        self.section = section
41 41
42 42 coreitems = {}
43 43
def _register(configtable, *args, **kwargs):
    """build a configitem from *args/**kwargs and record it in ``configtable``

    Raises ProgrammingError if the same section/name pair is registered twice.
    """
    item = configitem(*args, **kwargs)
    sectionitems = configtable.setdefault(item.section, {})
    if item.name in sectionitems:
        msg = "duplicated config item registration for '%s.%s'"
        raise error.ProgrammingError(msg % (item.section, item.name))
    sectionitems[item.name] = item
51 51
52 52 # Registering actual config items
53 53
def getitemregister(configtable):
    """return a registration helper bound to ``configtable``"""
    return functools.partial(_register, configtable)
56 56
# registration helper bound to the core config table
coreconfigitem = getitemregister(coreitems)

# Registrations of core config items, sorted by section then name.
coreconfigitem('devel', 'all-warnings',
    default=False,
)
coreconfigitem('devel', 'bundle2.debug',
    default=False,
)
coreconfigitem('patch', 'fuzz',
    default=2,
)
coreconfigitem('ui', 'clonebundleprefers',
    default=list,
)
coreconfigitem('ui', 'interactive',
    default=None,
)
coreconfigitem('ui', 'quiet',
    default=False,
)
General Comments 0
You need to be logged in to leave comments. Login now