##// END OF EJS Templates
bundle: extract _processchangegroup() method...
Martin von Zweigbergk -
r33038:f0efd2bf default
parent child Browse files
Show More
@@ -1,1839 +1,1839 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architected as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
# convenience aliases into util's urllib compatibility layer
urlerr = util.urlerr
urlreq = util.urlreq

# shorthands for the struct helpers used throughout this module
_pack = struct.pack
_unpack = struct.unpack

# struct formats for the fixed-size pieces of the binary bundle2 format
# (all numbers are unsigned and big-endian, see the module docstring)
_fstreamparamsize = '>i'    # size of the stream-level parameters blob
_fpartheadersize = '>i'     # size of a part header (0 == end-of-stream marker)
_fparttypesize = '>B'       # length of the part type name
_fpartid = '>I'             # 32-bit part id, unique within the bundle
_fpayloadsize = '>i'        # payload chunk size (negative triggers special handling)
_fpartparamcount = '>BB'    # (mandatory, advisory) parameter counts

# a phases entry: phase number followed by a 20-byte node
_fphasesentry = '>i20s'

# default payload chunk size used when emitting parts
preferedchunksize = 4096

# any character outside this set is forbidden in a part type name
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
def outdebug(ui, message):
    """Emit a debug line about the output (bundling) stream."""
    if not ui.configbool('devel', 'bundle2.debug', False):
        return
    ui.debug('bundle2-output: %s\n' % message)
192 192
def indebug(ui, message):
    """Emit a debug line about the input (unbundling) stream."""
    if not ui.configbool('devel', 'bundle2.debug', False):
        return
    ui.debug('bundle2-input: %s\n' % message)
197 197
def validateparttype(parttype):
    """raise ValueError if *parttype* contains an invalid character"""
    if _parttypeforbidden.search(parttype) is not None:
        raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
# maps a lower-cased part type name to the function handling that part
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that register a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        # part matching is case insensitive, so register the lowered name
        key = parttype.lower()
        assert key not in parthandlermapping
        parthandlermapping[key] = func
        func.params = frozenset(params)
        return func
    return _decorator
231 231
class unbundlerecords(object):
    """Chronological record of what happens during an unbundle.

    New records are added using ``records.add('cat', obj)``, where 'cat' is
    a category of record and obj is an arbitrary object.

    ``records['cat']`` returns all entries of category 'cat', while
    iterating over the object itself yields ``('category', obj)`` tuples
    for every entry, in chronological order.
    """

    def __init__(self):
        self._bycategory = {}   # category -> list of entries
        self._ordered = []      # (category, entry) pairs, insertion order
        self._replymap = {}     # part id -> nested unbundlerecords

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._bycategory.setdefault(category, []).append(entry)
        self._ordered.append((category, entry))
        if inreplyto is None:
            return
        # mirror the record on the sub-records tracking replies to that part
        self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        if partid not in self._replymap:
            self._replymap[partid] = unbundlerecords()
        return self._replymap[partid]

    def __getitem__(self, cat):
        return tuple(self._bycategory.get(cat, ()))

    def __iter__(self):
        return iter(self._ordered)

    def __len__(self):
        return len(self._ordered)

    def __nonzero__(self):
        return bool(self._ordered)

    __bool__ = __nonzero__
278 278
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.gettransaction = transactiongetter
        # chronological record of what each part produced
        self.records = unbundlerecords()
        # reply bundle, set up by the caller when a response is expected
        self.reply = None
        # whether ui output during part handling is captured into the reply
        self.captureoutput = captureoutput
302 302
303 303 class TransactionUnavailable(RuntimeError):
304 304 pass
305 305
306 306 def _notransaction():
307 307 """default method to get a transaction while processing a bundle
308 308
309 309 Raise an exception to highlight the fact that no transaction was expected
310 310 to be created"""
311 311 raise TransactionUnavailable()
312 312
def applybundle(repo, unbundler, tr, source=None, url=None):
    """Apply a bundle to *repo* within the transaction *tr*."""
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    # record source/url for hooks, without clobbering values set by a caller
    if source is not None:
        tr.hookargs.setdefault('source', source)
    if url is not None:
        tr.hookargs.setdefault('url', url)
    return processbundle(repo, unbundler, lambda: tr)
321 321
def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function process a bundle, apply effect to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    Unknown Mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this is a init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            # BUG FIX: the parameter count was never interpolated, so the
            # debug output printed the literal string ' %i params'.
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op
397 397
398 def _processchangegroup(op, cg, tr, source, url, **kwargs):
399 ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
400 op.records.add('changegroup', {
401 'return': ret,
402 'addednodes': addednodes,
403 })
404 return ret
405
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        # BUG FIX: the advisory count used to be printed
                        # with %(nbmp)s, i.e. the mandatory count.
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)
468 476
469 477
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        key, sep, rest = line.partition('=')
        # a capability without '=' has an empty value list
        vals = rest.split(',') if sep else ()
        caps[urlreq.unquote(key)] = [urlreq.unquote(v) for v in vals]
    return caps
492 500
def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    def _capline(name):
        # one urlquoted `name` or `name=v1,v2,...` entry per capability
        quoted = urlreq.quote(name)
        values = [urlreq.quote(v) for v in caps[name]]
        if values:
            return "%s=%s" % (quoted, ','.join(values))
        return quoted
    return '\n'.join(_capline(name) for name in sorted(caps))
504 512
# Maps a bundle type name to the (header, compression) pair used when
# writing the legacy bundle formats.  "HG20" is special-cased by the
# writing code and therefore maps to an empty tuple.
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
517 525
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameter. and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    # magic bytes emitted at the very start of the stream
    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        # stream level parameters, as (name, value) pairs
        self._params = []
        # bundlepart objects, in emission order
        self._parts = []
        self.capabilities = dict(capabilities)
        # payload compression engine; 'UN' (no compression) until
        # setcompression() is called
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        # the 'Compression' stream parameter must only ever be set once
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to defines the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter

        Raises ValueError for an empty name or a name not starting with a
        letter (the case of the first letter carries the mandatory/advisory
        semantics, see the module docstring)."""
        if not name:
            raise ValueError('empty parameter name')
        # NOTE(review): string.letters is Python 2 only and locale
        # dependent; presumably ASCII letters are intended — confirm
        # before porting.
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contains the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the containers

        As the part is directly added to the containers. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually create and add if you need better
        control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        """yield the binary chunks that make up the full bundle2 stream"""
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        # magic + stream parameters are always emitted uncompressed;
        # only the core (the parts) goes through the compression engine
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return a encoded version of all stream parameters

        (a space separated list of urlquoted `name[=value]` entries)"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunk for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        # an empty part header (size 0) is the end-of-stream marker
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
632 640
633 641
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        wanted = struct.calcsize(format)
        return _unpack(format, self._readexact(wanted))

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)
660 668
def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic = magicstring[0:2]
    version = magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    try:
        unbundlerclass = formatmap[version]
    except KeyError:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
674 682
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        # payload decompression engine; replaced when a 'Compression'
        # stream parameter is processed
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """parse the raw stream parameters blob and return an ordered mapping

        ``paramsblock`` is a space separated list of urlquoted
        ``name[=value]`` entries.  Each parameter's effect is applied as a
        side effect of the parsing (see ``_processparam``)."""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameter starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and will this function will raise a KeyError when unknown.

        Note: no option are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        have no way to know then the reply end, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                # BUG FIX: the offending size was never interpolated into
                # the message ('%i' was emitted literally)
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # make sure the part is fully consumed before moving on
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        """return whether the payload is compressed (None when unknown)"""
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()
817 825
# map of bundle version (from the magic string) to the unbundler class able
# to consume that stream; used by getunbundler()
formatmap = {'20': unbundle20}
819 827
# maps a (lower cased) stream level parameter name to its handler
b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        # BUG FIX: the duplicate-registration guard used to check
        # `formatmap` (the bundle version map) instead of the stream
        # parameter map, so it could never catch a duplicate handler.
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator
829 837
@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    # reject compression engines we do not know how to decompress
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    # from now on the payload will be decompressed on the fly
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True
839 847
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Both data and parameters cannot be modified after the generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part have the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to defines the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generates the bundle2 stream
    def getchunks(self, ui):
        """yield the binary chunks (header then payload) for this part

        Can only be called once; afterwards the part is marked generated and
        becomes read-only."""
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    # fix: the advisory count was previously printed using
                    # the mandatory count (nbmp) by mistake
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of a the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data
1042 1050
1043 1051
1044 1052 flaginterrupt = -1
1045 1053
class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows to transmit exception raised on the producer size during part
    iteration while the consumer is reading a part.

    Part processed in this manner only have access to a ui object,"""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if not headersize:
            return None
        return self._readexact(headersize)

    def __call__(self):
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            # note: the closing debug message is deliberately skipped here
            indebug(self.ui, 'no part found during interruption.')
            return
        interruptpart = unbundlepart(self.ui, headerblock, self._fp)
        _processpart(interruptoperation(self.ui), interruptpart)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')
1085 1093
class interruptoperation(object):
    """A limited operation to be use by part handler during interruption

    It only have access to an ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        # no reply bundle and no output capture during an interruption
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')
1103 1111
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        # random access (seek) is only possible when the underlying file
        # object supports both 'seek' and 'tell'
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        # _readheader() only builds a lazy payload stream; the chunk index
        # below is not touched until the payload is actually read
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> byte from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically compute the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' %  payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                # record the (payload, file) position of each newly seen
                # chunk so later seeks can jump straight to it
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couple again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            # a short read means the payload is exhausted
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        """return the current position inside the part payload"""
        return self._pos

    def seek(self, offset, whence=0):
        """move the payload read position (file-like 'seek' semantics)

        whence 0 is absolute, 1 is relative to the current position and
        2 is relative to the payload end; seeking may require consuming
        the remaining payload to discover its total size."""
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    # the stream turned out to be a pipe; remember that and
                    # stop trying to seek on it
                    self._seekable = False
                else:
                    raise
        return None
1303 1311
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
# Maps a capability name to the tuple of values advertised for it.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }
1315 1323
def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        caps['obsmarkers'] = tuple('V%i' % v for v in obsolete.formats)
    if allowpushback:
        caps['pushback'] = ()
    # advertise relaxed head checking when the server opted into it
    pushmode = repo.ui.config('server', 'concurrent-push-mode', 'strict')
    if pushmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps
1333 1341
def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    # 'capable' may return False when the capability is absent; an empty
    # string still counts as "bundle2 supported" (with an empty blob)
    if raw == '' or raw:
        capsblob = urlreq.unquote(remote.capable('bundle2'))
        return decodecaps(capsblob)
    return {}
1341 1349
def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    versions = []
    for cap in caps.get('obsmarkers', ()):
        # capability entries look like 'V1', 'V2', ...; ignore anything else
        if cap.startswith('V'):
            versions.append(int(cap[1:]))
    return versions
1347 1355
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    """write a new-style (bundle2 or v1 fallback) bundle file and return its
    filename"""
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    if not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundler = bundle20(ui, caps)
    bundler.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundler, source, outgoing, opts)
    return changegroup.writechunks(ui, bundler.getchunks(), filename, vfs=vfs)
1366 1374
def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    """populate 'bundler' with parts according to 'opts'"""
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # a changegroup part is always wanted in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    cgpart = bundler.newpart('changegroup', data=cg.getchunks())
    cgpart.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        cgpart.addparam('nbchanges', str(cg.extras['clcount']),
                        mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = [_pack(_fphasesentry, phase, head)
                     for phase in phases.allphases
                     for head in headsbyphase[phase]]
        bundler.newpart('phase-heads', data=''.join(phasedata))
1400 1408
def addparttagsfnodescache(repo, bundler, outgoing):
    """add an optional 'hgtagsfnodes' part with cached .hgtags fnodes

    The part is only emitted when the cache has entries for the outgoing
    heads."""
    cache = tags.hgtagsfnodescache(repo.unfiltered())

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    chunks = []
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.append(node)
            chunks.append(fnode)

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1424 1432
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    version = obsolete.commonversion(obsmarkersversion(bundler.capabilities))
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    return bundler.newpart('obsmarkers',
                           data=obsolete.encodemarkers(markers, True,
                                                       version=version))
1440 1448
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)

        def gen():
            # the fixed header goes first, then the compressed changegroup
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = gen()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1480 1488
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one

    Returns 0 as soon as any individual result is 0, otherwise aggregates
    the head-count deltas (cg.apply encodes "added n heads" as n + 1 and
    "removed n heads" as -n - 1) into a single value of the same encoding.
    With no results at all, returns 1 (success, no head change).

    Fix: the original version set the result to 0 on a zero return but
    then let the head-delta adjustment after the loop overwrite it, so
    e.g. results [2, 0] yielded 2 while [0, 2] yielded 0.
    """
    changedheads = 0
    for record in op.records['changegroup']:
        ret = record.get('return', 0)
        # If any changegroup result is 0, return 0
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
1501 1509
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will massive rework before being
    inflicted to any end-user.
    """
    # note: this chunk carried both the pre- and post-refactor bodies of the
    # "extract _processchangegroup()" change; the post-refactor version
    # (delegating record-keeping to _processchangegroup) is kept.
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()
1540 1544
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import are given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    # note: this chunk carried both the pre- and post-refactor bodies of the
    # "extract _processchangegroup()" change; the post-refactor version is
    # kept.
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
            parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
            % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                param)
        digests[typ] = value

    # wrap the remote stream so size/digests are verified as it is consumed
    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    # local import, presumably to avoid an import cycle with exchange
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
            util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
            (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
1612 1612
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    """record the changegroup result reported back by the remote"""
    params = inpart.params
    op.records.add('changegroup',
                   {'return': int(params['return'])},
                   int(params['in-reply-to']))
1618 1618
@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that head of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    heads = []
    # the payload is a plain sequence of 20-byte node ids
    node = inpart.read(20)
    while len(node) == 20:
        heads.append(node)
        node = inpart.read(20)
    assert not node
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
1637 1637
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focus on the heads actually updated
    during the push. If other activities happen on unrelated heads, it is
    ignored.

    This allow server with high traffic to avoid push contention as long as
    unrelated parts of the graph are involved."""
    heads = []
    # the payload is a plain sequence of 20-byte node ids
    node = inpart.read(20)
    while len(node) == 20:
        heads.append(node)
        node = inpart.read(20)
    assert not node
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for branchheads in op.repo.branchmap().itervalues():
        currentheads.update(branchheads)

    if any(h not in currentheads for h in heads):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
1666 1666
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    data = inpart.read()
    msg = _('remote: %s\n')
    for line in data.splitlines():
        op.ui.status(msg % line)
1672 1672
@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    # always consume the payload, even when a reply bundle already exists
    replycaps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, replycaps)
1681 1681
class AbortFromPart(error.Abort):
    """Abort variant signaling an error carried by a bundle2 part."""
1684 1684
@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    params = inpart.params
    raise AbortFromPart(params['message'], hint=params.get('hint'))
1690 1690
@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    # forward only the parameters that were actually transmitted
    kwargs = {name: inpart.params[name]
              for name in ('namespace', 'key', 'new', 'old', 'ret')
              if inpart.params.get(name) is not None}
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1701 1701
@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    unknowntype = inpart.params.get('parttype')
    unknownparams = inpart.params.get('params')
    if unknowntype is not None:
        kwargs['parttype'] = unknowntype
    if unknownparams is not None:
        # parameter names travel as a NUL-separated list
        kwargs['params'] = unknownparams.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)
1714 1714
@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    message = inpart.params['message']
    raise error.ResponseError(_('push failed:'), message)
1719 1719
@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    keys = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (inpart.params['namespace'], keys))
1726 1726
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    decode = pushkey.decode
    namespace = decode(inpart.params['namespace'])
    key = decode(inpart.params['key'])
    old = decode(inpart.params['old'])
    new = decode(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    op.records.add('pushkey', {'namespace': namespace,
                               'key': key,
                               'old': old,
                               'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        # a mandatory pushkey that did not succeed aborts the whole bundle
        kwargs = {name: inpart.params[name]
                  for name in ('namespace', 'key', 'new', 'old', 'ret')
                  if name in inpart.params}
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1755 1755
def _readphaseheads(inpart):
    """decode a stream of fixed-size (phase, node) entries

    Returns a list indexed by phase number, each item being the list of
    nodes received for that phase."""
    headsbyphase = [[] for phase in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    entry = inpart.read(entrysize)
    while len(entry) == entrysize:
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
        entry = inpart.read(entrysize)
    if entry:
        # a short, non-empty trailing read means the part is truncated
        raise error.Abort(_('bad phase-heads bundle part'))
    return headsbyphase
1768 1768
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    # gather every node added by changegroup parts handled so far
    addednodes = [node
                  for record in op.records['changegroup']
                  for node in record['addednodes']]
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(),
                        headsbyphase, addednodes)
1778 1778
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    params = inpart.params
    op.records.add('pushkey',
                   {'return': int(params['return'])},
                   int(params['in-reply-to']))
1785 1785
@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    data = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n') % len(data))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    added = op.repo.obsstore.mergemarkers(tr, data)
    op.repo.invalidatevolatilesets()
    if added:
        op.repo.ui.status(_('%i new obsolescence markers\n') % added)
    op.records.add('obsmarkers', {'new': added})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % added, mandatory=False)
1808 1808
1809 1809
@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """record how many obsmarkers the remote reported as new

    (the original docstring mentioned pushkey; this part is the reply to an
    'obsmarkers' part)"""
    newcount = int(inpart.params['new'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': newcount}, replyto)
1816 1816
@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    node = inpart.read(20)
    fnode = inpart.read(20)
    while len(node) == 20 and len(fnode) == 20:
        cache.setfnode(node, fnode)
        count += 1
        node = inpart.read(20)
        fnode = inpart.read(20)
    # the stream always ends with a short (possibly empty) read
    op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now