bundle2: add the capability to store hookargs on bundle operation object...
Pulkit Goyal -
r33629:f3407d56 default
@@ -1,1851 +1,1855 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
18 18 The format is architected as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 The payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable, so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
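The dynamically built format can be exercised with a small (hypothetical) helper that unpacks the `<param-sizes>` couples documented in the module docstring:

```python
import struct

def unpack_param_sizes(nbparams, data):
    """Unpack N (size-of-key, size-of-value) byte couples using the same
    dynamically built format as _makefpartparamsizes: '>' + 'BB' * N."""
    fmt = '>' + ('BB' * nbparams)
    flat = struct.unpack(fmt, data)
    # regroup the flat tuple into (key-size, value-size) couples
    return [(flat[i], flat[i + 1]) for i in range(0, len(flat), 2)]
```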
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that register a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
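A standalone re-creation of the registration pattern used by `parthandler` may make the decorator mechanics clearer; here `handlers` is a stand-in for `parthandlermapping`:

```python
# Stand-in registry mirroring parthandlermapping above.
handlers = {}

def register(parttype, params=()):
    """Register a function as a handler for `parttype` (sketch)."""
    def _decorator(func):
        lparttype = parttype.lower()   # matching is case insensitive
        assert lparttype not in handlers
        handlers[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

@register('demo:part', ('key',))
def handledemo(op, part):
    return 'handled'
```

Because matching is done on the lowercased type, the sender controls mandatory/advisory semantics through capitalization without needing separate registrations.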
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`, where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
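A condensed stand-in shows the two access patterns the class docstring describes, per-category lookup and chronological iteration (`MiniRecords` is illustrative, not the real class):

```python
class MiniRecords(object):
    """Condensed sketch of unbundlerecords' access patterns."""
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        # index by category and keep chronological order
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = MiniRecords()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'ret': True})
```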
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object currently has very little content; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.gettransaction = transactiongetter
300 300 self.reply = None
301 301 self.captureoutput = captureoutput
302 self.hookargs = {}
303
304 def addhookargs(self, hookargs):
305 self.hookargs.update(hookargs)
302 306
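The `addhookargs` method added by this change lets successive callers merge hook arguments into one dict on the operation object. A hypothetical sketch (`MiniOp` stands in for `bundleoperation`):

```python
class MiniOp(object):
    """Stand-in for bundleoperation's hookargs handling."""
    def __init__(self):
        self.hookargs = {}

    def addhookargs(self, hookargs):
        # later callers accumulate into the same dict, which would
        # eventually be handed to hooks
        self.hookargs.update(hookargs)

op = MiniOp()
op.addhookargs({'source': 'push'})
op.addhookargs({'url': 'ssh://example.com/repo', 'bundle2': '1'})
```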
303 307 class TransactionUnavailable(RuntimeError):
304 308 pass
305 309
306 310 def _notransaction():
307 311 """default method to get a transaction while processing a bundle
308 312
309 313 Raise an exception to highlight the fact that no transaction was expected
310 314 to be created"""
311 315 raise TransactionUnavailable()
312 316
313 317 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
314 318 # transform me into unbundler.apply() as soon as the freeze is lifted
315 319 if isinstance(unbundler, unbundle20):
316 320 tr.hookargs['bundle2'] = '1'
317 321 if source is not None and 'source' not in tr.hookargs:
318 322 tr.hookargs['source'] = source
319 323 if url is not None and 'url' not in tr.hookargs:
320 324 tr.hookargs['url'] = url
321 325 return processbundle(repo, unbundler, lambda: tr)
322 326 else:
323 327 # the transactiongetter won't be used, but we might as well set it
324 328 op = bundleoperation(repo, lambda: tr)
325 329 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
326 330 return op
327 331
328 332 def processbundle(repo, unbundler, transactiongetter=None, op=None):
329 333 """This function process a bundle, apply effect to/from a repo
330 334
331 335 It iterates over each part then searches for and uses the proper handling
332 336 code to process the part. Parts are processed in order.
333 337
334 338 An unknown mandatory part will abort the process.
335 339
336 340 It is temporarily possible to provide a prebuilt bundleoperation to the
337 341 function. This is used to ensure output is properly propagated in case of
338 342 an error during the unbundling. This output capturing part will likely be
339 343 reworked and this ability will probably go away in the process.
340 344 """
341 345 if op is None:
342 346 if transactiongetter is None:
343 347 transactiongetter = _notransaction
344 348 op = bundleoperation(repo, transactiongetter)
345 349 # todo:
346 350 # - replace this with an init function soon.
347 351 # - exception catching
348 352 unbundler.params
349 353 if repo.ui.debugflag:
350 354 msg = ['bundle2-input-bundle:']
351 355 if unbundler.params:
352 356 msg.append(' %i params' % len(unbundler.params))
353 357 if op.gettransaction is None or op.gettransaction is _notransaction:
354 358 msg.append(' no-transaction')
355 359 else:
356 360 msg.append(' with-transaction')
357 361 msg.append('\n')
358 362 repo.ui.debug(''.join(msg))
359 363 iterparts = enumerate(unbundler.iterparts())
360 364 part = None
361 365 nbpart = 0
362 366 try:
363 367 for nbpart, part in iterparts:
364 368 _processpart(op, part)
365 369 except Exception as exc:
366 370 # Any exceptions seeking to the end of the bundle at this point are
367 371 # almost certainly related to the underlying stream being bad.
368 372 # And, chances are that the exception we're handling is related to
369 373 # getting in that bad state. So, we swallow the seeking error and
370 374 # re-raise the original error.
371 375 seekerror = False
372 376 try:
373 377 for nbpart, part in iterparts:
374 378 # consume the bundle content
375 379 part.seek(0, 2)
376 380 except Exception:
377 381 seekerror = True
378 382
379 383 # Small hack to let caller code distinguish exceptions from bundle2
380 384 # processing from processing the old format. This is mostly
381 385 # needed to handle different return codes to unbundle according to the
382 386 # type of bundle. We should probably clean up or drop this return code
383 387 # craziness in a future version.
384 388 exc.duringunbundle2 = True
385 389 salvaged = []
386 390 replycaps = None
387 391 if op.reply is not None:
388 392 salvaged = op.reply.salvageoutput()
389 393 replycaps = op.reply.capabilities
390 394 exc._replycaps = replycaps
391 395 exc._bundle2salvagedoutput = salvaged
392 396
393 397 # Re-raising from a variable loses the original stack. So only use
394 398 # that form if we need to.
395 399 if seekerror:
396 400 raise exc
397 401 else:
398 402 raise
399 403 finally:
400 404 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
401 405
402 406 return op
403 407
404 408 def _processchangegroup(op, cg, tr, source, url, **kwargs):
405 409 ret = cg.apply(op.repo, tr, source, url, **kwargs)
406 410 op.records.add('changegroup', {
407 411 'return': ret,
408 412 })
409 413 return ret
410 414
411 415 def _processpart(op, part):
412 416 """process a single part from a bundle
413 417
414 418 The part is guaranteed to have been fully consumed when the function exits
415 419 (even if an exception is raised)."""
416 420 status = 'unknown' # used by debug output
417 421 hardabort = False
418 422 try:
419 423 try:
420 424 handler = parthandlermapping.get(part.type)
421 425 if handler is None:
422 426 status = 'unsupported-type'
423 427 raise error.BundleUnknownFeatureError(parttype=part.type)
424 428 indebug(op.ui, 'found a handler for part %r' % part.type)
425 429 unknownparams = part.mandatorykeys - handler.params
426 430 if unknownparams:
427 431 unknownparams = list(unknownparams)
428 432 unknownparams.sort()
429 433 status = 'unsupported-params (%s)' % unknownparams
430 434 raise error.BundleUnknownFeatureError(parttype=part.type,
431 435 params=unknownparams)
432 436 status = 'supported'
433 437 except error.BundleUnknownFeatureError as exc:
434 438 if part.mandatory: # mandatory parts
435 439 raise
436 440 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
437 441 return # skip to part processing
438 442 finally:
439 443 if op.ui.debugflag:
440 444 msg = ['bundle2-input-part: "%s"' % part.type]
441 445 if not part.mandatory:
442 446 msg.append(' (advisory)')
443 447 nbmp = len(part.mandatorykeys)
444 448 nbap = len(part.params) - nbmp
445 449 if nbmp or nbap:
446 450 msg.append(' (params:')
447 451 if nbmp:
448 452 msg.append(' %i mandatory' % nbmp)
449 453 if nbap:
450 454 msg.append(' %i advisory' % nbap)
451 455 msg.append(')')
452 456 msg.append(' %s\n' % status)
453 457 op.ui.debug(''.join(msg))
454 458
455 459 # handler is called outside the above try block so that we don't
456 460 # risk catching KeyErrors from anything other than the
457 461 # parthandlermapping lookup (any KeyError raised by handler()
458 462 # itself represents a defect of a different variety).
459 463 output = None
460 464 if op.captureoutput and op.reply is not None:
461 465 op.ui.pushbuffer(error=True, subproc=True)
462 466 output = ''
463 467 try:
464 468 handler(op, part)
465 469 finally:
466 470 if output is not None:
467 471 output = op.ui.popbuffer()
468 472 if output:
469 473 outpart = op.reply.newpart('output', data=output,
470 474 mandatory=False)
471 475 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
472 476 # If exiting or interrupted, do not attempt to seek the stream in the
473 477 # finally block below. This makes abort faster.
474 478 except (SystemExit, KeyboardInterrupt):
475 479 hardabort = True
476 480 raise
477 481 finally:
478 482 # consume the part content to not corrupt the stream.
479 483 if not hardabort:
480 484 part.seek(0, 2)
481 485
482 486
483 487 def decodecaps(blob):
484 488 """decode a bundle2 caps bytes blob into a dictionary
485 489
486 490 The blob is a list of capabilities (one per line)
487 491 Capabilities may have values using a line of the form::
488 492
489 493 capability=value1,value2,value3
490 494
491 495 The values are always a list."""
492 496 caps = {}
493 497 for line in blob.splitlines():
494 498 if not line:
495 499 continue
496 500 if '=' not in line:
497 501 key, vals = line, ()
498 502 else:
499 503 key, vals = line.split('=', 1)
500 504 vals = vals.split(',')
501 505 key = urlreq.unquote(key)
502 506 vals = [urlreq.unquote(v) for v in vals]
503 507 caps[key] = vals
504 508 return caps
505 509
506 510 def encodecaps(caps):
507 511 """encode a bundle2 caps dictionary into a bytes blob"""
508 512 chunks = []
509 513 for ca in sorted(caps):
510 514 vals = caps[ca]
511 515 ca = urlreq.quote(ca)
512 516 vals = [urlreq.quote(v) for v in vals]
513 517 if vals:
514 518 ca = "%s=%s" % (ca, ','.join(vals))
515 519 chunks.append(ca)
516 520 return '\n'.join(chunks)
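The `decodecaps`/`encodecaps` pair above round-trips a capabilities dictionary. A standalone sketch using Python 3's `urllib.parse` instead of `urlreq` (names are illustrative):

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    """Sketch of encodecaps: one capability per line, sorted, values
    comma-joined, names and values urlquoted."""
    lines = []
    for name in sorted(caps):
        vals = [quote(v, safe='') for v in caps[name]]
        entry = quote(name, safe='')
        if vals:
            entry = '%s=%s' % (entry, ','.join(vals))
        lines.append(entry)
    return '\n'.join(lines)

def decode_caps(blob):
    """Sketch of decodecaps; values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, rest = line.split('=', 1)
            vals = [unquote(v) for v in rest.split(',')]
        caps[unquote(key)] = vals
    return caps
```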
517 521
518 522 bundletypes = {
519 523 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
520 524 # since the unification ssh accepts a header but there
521 525 # is no capability signaling it.
522 526 "HG20": (), # special-cased below
523 527 "HG10UN": ("HG10UN", 'UN'),
524 528 "HG10BZ": ("HG10", 'BZ'),
525 529 "HG10GZ": ("HG10GZ", 'GZ'),
526 530 }
527 531
528 532 # hgweb uses this list to communicate its preferred type
529 533 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
530 534
531 535 class bundle20(object):
532 536 """represent an outgoing bundle2 container
533 537
534 538 Use the `addparam` method to add a stream level parameter, and `newpart` to
535 539 populate it. Then call `getchunks` to retrieve all the binary chunks of
536 540 data that compose the bundle2 container."""
537 541
538 542 _magicstring = 'HG20'
539 543
540 544 def __init__(self, ui, capabilities=()):
541 545 self.ui = ui
542 546 self._params = []
543 547 self._parts = []
544 548 self.capabilities = dict(capabilities)
545 549 self._compengine = util.compengines.forbundletype('UN')
546 550 self._compopts = None
547 551
548 552 def setcompression(self, alg, compopts=None):
549 553 """setup core part compression to <alg>"""
550 554 if alg in (None, 'UN'):
551 555 return
552 556 assert not any(n.lower() == 'compression' for n, v in self._params)
553 557 self.addparam('Compression', alg)
554 558 self._compengine = util.compengines.forbundletype(alg)
555 559 self._compopts = compopts
556 560
557 561 @property
558 562 def nbparts(self):
559 563 """total number of parts added to the bundler"""
560 564 return len(self._parts)
561 565
562 566 # methods used to define the bundle2 content
563 567 def addparam(self, name, value=None):
564 568 """add a stream level parameter"""
565 569 if not name:
566 570 raise ValueError('empty parameter name')
567 571 if name[0] not in string.letters:
568 572 raise ValueError('non letter first character: %r' % name)
569 573 self._params.append((name, value))
570 574
571 575 def addpart(self, part):
572 576 """add a new part to the bundle2 container
573 577
574 578 Parts contains the actual applicative payload."""
575 579 assert part.id is None
576 580 part.id = len(self._parts) # very cheap counter
577 581 self._parts.append(part)
578 582
579 583 def newpart(self, typeid, *args, **kwargs):
580 584 """create a new part and add it to the containers
581 585
582 586 As the part is directly added to the containers. For now, this means
583 587 that any failure to properly initialize the part after calling
584 588 ``newpart`` should result in a failure of the whole bundling process.
585 589
586 590 You can still fall back to manually create and add if you need better
587 591 control."""
588 592 part = bundlepart(typeid, *args, **kwargs)
589 593 self.addpart(part)
590 594 return part
591 595
592 596 # methods used to generate the bundle2 stream
593 597 def getchunks(self):
594 598 if self.ui.debugflag:
595 599 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
596 600 if self._params:
597 601 msg.append(' (%i params)' % len(self._params))
598 602 msg.append(' %i parts total\n' % len(self._parts))
599 603 self.ui.debug(''.join(msg))
600 604 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
601 605 yield self._magicstring
602 606 param = self._paramchunk()
603 607 outdebug(self.ui, 'bundle parameter: %s' % param)
604 608 yield _pack(_fstreamparamsize, len(param))
605 609 if param:
606 610 yield param
607 611 for chunk in self._compengine.compressstream(self._getcorechunk(),
608 612 self._compopts):
609 613 yield chunk
610 614
611 615 def _paramchunk(self):
612 616 """return a encoded version of all stream parameters"""
613 617 blocks = []
614 618 for par, value in self._params:
615 619 par = urlreq.quote(par)
616 620 if value is not None:
617 621 value = urlreq.quote(value)
618 622 par = '%s=%s' % (par, value)
619 623 blocks.append(par)
620 624 return ' '.join(blocks)
621 625
622 626 def _getcorechunk(self):
623 627 """yield chunk for the core part of the bundle
624 628
625 629 (all but headers and parameters)"""
626 630 outdebug(self.ui, 'start of parts')
627 631 for part in self._parts:
628 632 outdebug(self.ui, 'bundle part: "%s"' % part.type)
629 633 for chunk in part.getchunks(ui=self.ui):
630 634 yield chunk
631 635 outdebug(self.ui, 'end of bundle')
632 636 yield _pack(_fpartheadersize, 0)
633 637
634 638
635 639 def salvageoutput(self):
636 640 """return a list with a copy of all output parts in the bundle
637 641
638 642 This is meant to be used during error handling to make sure we preserve
639 643 server output"""
640 644 salvaged = []
641 645 for part in self._parts:
642 646 if part.type.startswith('output'):
643 647 salvaged.append(part.copy())
644 648 return salvaged
645 649
646 650
647 651 class unpackermixin(object):
648 652 """A mixin to extract bytes and struct data from a stream"""
649 653
650 654 def __init__(self, fp):
651 655 self._fp = fp
652 656
653 657 def _unpack(self, format):
654 658 """unpack this struct format from the stream
655 659
656 660 This method is meant for internal usage by the bundle2 protocol only.
657 661 It directly manipulates the low level stream, including bundle2 level
658 662 instructions.
659 663
660 664 Do not use it to implement higher-level logic or methods."""
661 665 data = self._readexact(struct.calcsize(format))
662 666 return _unpack(format, data)
663 667
664 668 def _readexact(self, size):
665 669 """read exactly <size> bytes from the stream
666 670
667 671 This method is meant for internal usage by the bundle2 protocol only.
668 672 It directly manipulates the low level stream, including bundle2 level
669 673 instructions.
670 674
671 675 Do not use it to implement higher-level logic or methods."""
672 676 return changegroup.readexactly(self._fp, size)
673 677
674 678 def getunbundler(ui, fp, magicstring=None):
675 679 """return a valid unbundler object for a given magicstring"""
676 680 if magicstring is None:
677 681 magicstring = changegroup.readexactly(fp, 4)
678 682 magic, version = magicstring[0:2], magicstring[2:4]
679 683 if magic != 'HG':
680 684 ui.debug(
681 685 "error: invalid magic: %r (version %r), should be 'HG'\n"
682 686 % (magic, version))
683 687 raise error.Abort(_('not a Mercurial bundle'))
684 688 unbundlerclass = formatmap.get(version)
685 689 if unbundlerclass is None:
686 690 raise error.Abort(_('unknown bundle version %s') % version)
687 691 unbundler = unbundlerclass(ui, fp)
688 692 indebug(ui, 'start processing of %s stream' % magicstring)
689 693 return unbundler
690 694
691 695 class unbundle20(unpackermixin):
692 696 """interpret a bundle2 stream
693 697
694 698 This class is fed with a binary stream and yields parts through its
695 699 `iterparts` method."""
696 700
697 701 _magicstring = 'HG20'
698 702
699 703 def __init__(self, ui, fp):
700 704 """If header is specified, we do not read it out of the stream."""
701 705 self.ui = ui
702 706 self._compengine = util.compengines.forbundletype('UN')
703 707 self._compressed = None
704 708 super(unbundle20, self).__init__(fp)
705 709
706 710 @util.propertycache
707 711 def params(self):
708 712 """dictionary of stream level parameters"""
709 713 indebug(self.ui, 'reading bundle2 stream parameters')
710 714 params = {}
711 715 paramssize = self._unpack(_fstreamparamsize)[0]
712 716 if paramssize < 0:
713 717 raise error.BundleValueError('negative bundle param size: %i'
714 718 % paramssize)
715 719 if paramssize:
716 720 params = self._readexact(paramssize)
717 721 params = self._processallparams(params)
718 722 return params
719 723
720 724 def _processallparams(self, paramsblock):
721 725 """"""
722 726 params = util.sortdict()
723 727 for p in paramsblock.split(' '):
724 728 p = p.split('=', 1)
725 729 p = [urlreq.unquote(i) for i in p]
726 730 if len(p) < 2:
727 731 p.append(None)
728 732 self._processparam(*p)
729 733 params[p[0]] = p[1]
730 734 return params
731 735
732 736
733 737 def _processparam(self, name, value):
734 738 """process a parameter, applying its effect if needed
735 739
736 740 Parameters starting with a lower case letter are advisory and will be
737 741 ignored when unknown. Those starting with an upper case letter are
738 742 mandatory, and this function will raise a KeyError when they are unknown.
739 743
740 744 Note: no options are currently supported. Any input will be either
741 745 ignored or rejected.
742 746 """
743 747 if not name:
744 748 raise ValueError('empty parameter name')
745 749 if name[0] not in string.letters:
746 750 raise ValueError('non letter first character: %r' % name)
747 751 try:
748 752 handler = b2streamparamsmap[name.lower()]
749 753 except KeyError:
750 754 if name[0].islower():
751 755 indebug(self.ui, "ignoring unknown parameter %r" % name)
752 756 else:
753 757 raise error.BundleUnknownFeatureError(params=(name,))
754 758 else:
755 759 handler(self, name, value)
756 760
757 761 def _forwardchunks(self):
758 762 """utility to transfer a bundle2 as binary
759 763
760 764 This is made necessary by the fact that the 'getbundle' command over
761 765 'ssh' has no way to know when the reply ends, relying on the bundle
762 766 being interpreted to know its end. This is terrible and we are sorry,
763 767 but we needed to move forward to get general delta enabled.
764 768 """
765 769 yield self._magicstring
766 770 assert 'params' not in vars(self)
767 771 paramssize = self._unpack(_fstreamparamsize)[0]
768 772 if paramssize < 0:
769 773 raise error.BundleValueError('negative bundle param size: %i'
770 774 % paramssize)
771 775 yield _pack(_fstreamparamsize, paramssize)
772 776 if paramssize:
773 777 params = self._readexact(paramssize)
774 778 self._processallparams(params)
775 779 yield params
776 780 assert self._compengine.bundletype == 'UN'
777 781 # From there, payload might need to be decompressed
778 782 self._fp = self._compengine.decompressorreader(self._fp)
779 783 emptycount = 0
780 784 while emptycount < 2:
781 785 # so we can brainlessly loop
782 786 assert _fpartheadersize == _fpayloadsize
783 787 size = self._unpack(_fpartheadersize)[0]
784 788 yield _pack(_fpartheadersize, size)
785 789 if size:
786 790 emptycount = 0
787 791 else:
788 792 emptycount += 1
789 793 continue
790 794 if size == flaginterrupt:
791 795 continue
792 796 elif size < 0:
793 797 raise error.BundleValueError('negative chunk size: %i' % size)
794 798 yield self._readexact(size)
795 799
796 800
797 801 def iterparts(self):
798 802 """yield all parts contained in the stream"""
799 803 # make sure params have been loaded
800 804 self.params
801 805 # From there, payload need to be decompressed
802 806 self._fp = self._compengine.decompressorreader(self._fp)
803 807 indebug(self.ui, 'start extraction of bundle2 parts')
804 808 headerblock = self._readpartheader()
805 809 while headerblock is not None:
806 810 part = unbundlepart(self.ui, headerblock, self._fp)
807 811 yield part
808 812 part.seek(0, 2)
809 813 headerblock = self._readpartheader()
810 814 indebug(self.ui, 'end of bundle2 stream')
811 815
812 816 def _readpartheader(self):
813 817 """reads a part header size and return the bytes blob
814 818
815 819 returns None if empty"""
816 820 headersize = self._unpack(_fpartheadersize)[0]
817 821 if headersize < 0:
818 822 raise error.BundleValueError('negative part header size: %i'
819 823 % headersize)
820 824 indebug(self.ui, 'part header size: %i' % headersize)
821 825 if headersize:
822 826 return self._readexact(headersize)
823 827 return None
824 828
825 829 def compressed(self):
826 830 self.params # load params
827 831 return self._compressed
828 832
829 833 def close(self):
830 834 """close underlying file"""
831 835 if util.safehasattr(self._fp, 'close'):
832 836 return self._fp.close()
833 837
834 838 formatmap = {'20': unbundle20}
835 839
836 840 b2streamparamsmap = {}
837 841
838 842 def b2streamparamhandler(name):
839 843 """register a handler for a stream level parameter"""
840 844 def decorator(func):
841 845 assert name not in b2streamparamsmap
842 846 b2streamparamsmap[name] = func
843 847 return func
844 848 return decorator
845 849
846 850 @b2streamparamhandler('compression')
847 851 def processcompression(unbundler, param, value):
848 852 """read compression parameter and install payload decompression"""
849 853 if value not in util.compengines.supportedbundletypes:
850 854 raise error.BundleUnknownFeatureError(params=(param,),
851 855 values=(value,))
852 856 unbundler._compengine = util.compengines.forbundletype(value)
853 857 if value is not None:
854 858 unbundler._compressed = True
855 859
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Both data and parameters cannot be modified after the generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

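For orientation, the header that ``bundlepart.getchunks`` emits can be sketched as a standalone snippet. The struct format strings below are an assumption mirroring the module-level constants (`_fparttypesize` as `'>B'`, `_fpartid` and `_fpartheadersize` as `'>i'`, `_fpartparamcount` as `'>BB'`, one `'B'` per parameter size); `packpartheader` itself is a hypothetical helper, not part of bundle2.

```python
# Hedged sketch of the bundle2 part header framing: type size, type,
# part id, parameter counts, parameter sizes, then raw key/value bytes,
# the whole blob preceded on the wire by its own int32 size.
import struct

def packpartheader(parttype, partid, params=()):
    """Pack a toy part header; all params are treated as mandatory here."""
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>i', partid),
              struct.pack('>BB', len(params), 0)]
    sizes = []
    for key, value in params:
        sizes.append(len(key))
        sizes.append(len(value))
    # one uint8 per size, matching what _makefpartparamsizes would build
    header.append(struct.pack('>' + 'B' * len(sizes), *sizes))
    for key, value in params:
        header.append(key)
        header.append(value)
    blob = b''.join(header)
    return struct.pack('>i', len(blob)) + blob

wire = packpartheader(b'CHANGEGROUP', 0, [(b'version', b'02')])
(headersize,) = struct.unpack_from('>i', wire)
assert headersize == len(wire) - 4
```

Uppercase part types mark mandatory parts, which is why `getchunks` upper/lower-cases `self.type` before packing it.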
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows to transmit exceptions raised on the producer side during part
    iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

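The chunk index built lazily by `unbundlepart._payloadchunks` records, for each payload chunk, its payload offset and the underlying file offset; `_findchunk` then turns an absolute payload position into a (chunk number, offset-within-chunk) pair so `seek` can resume streaming mid-payload. A minimal standalone sketch of that lookup (the index tuples below are invented sample data):

```python
def findchunk(chunkindex, pos):
    # chunkindex holds (payload offset, file offset) pairs, one per chunk,
    # sorted by payload offset; mirrors unbundlepart._findchunk.
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# three chunks whose payloads start at positions 0, 10 and 25
index = [(0, 100), (10, 142), (25, 189)]
assert findchunk(index, 0) == (0, 0)
assert findchunk(index, 12) == (1, 2)
```

`seek` guarantees the target position is within the indexed range before calling `_findchunk`, which is why a position past the last entry is simply an error here.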
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

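The return-value convention folded by `combinechangegroupresults` encodes head-count deltas: 1 means success with unchanged heads, values above 1 mean heads were added, values below -1 mean heads were removed, and 0 means failure. A standalone sketch of the same folding rule, exercised on made-up result lists (`combineresults` is a hypothetical name; the operation-records plumbing is omitted):

```python
def combineresults(results):
    # Mirror of combinechangegroupresults' folding: accumulate head-count
    # deltas, treating any 0 result as an overall failure.
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

assert combineresults([1, 1]) == 1   # two successes, no head changes
assert combineresults([3, 2]) == 4   # +2 heads and +1 head -> +3 heads
assert combineresults([0, 5]) == 0   # failure short-circuits the deltas
```

Note that head deltas accumulated *before* a failing result can still override the 0 in the post-loop adjustment; the sketch reproduces that behavior faithfully rather than papering over it.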
1521 1525 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1522 1526 'targetphase'))
1523 1527 def handlechangegroup(op, inpart):
1524 1528 """apply a changegroup part on the repo
1525 1529
1526 1530 This is a very early implementation that will massive rework before being
1527 1531 inflicted to any end-user.
1528 1532 """
1529 1533 tr = op.gettransaction()
1530 1534 unpackerversion = inpart.params.get('version', '01')
1531 1535 # We should raise an appropriate exception here
1532 1536 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1533 1537 # the source and url passed here are overwritten by the one contained in
1534 1538 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1535 1539 nbchangesets = None
1536 1540 if 'nbchanges' in inpart.params:
1537 1541 nbchangesets = int(inpart.params.get('nbchanges'))
1538 1542 if ('treemanifest' in inpart.params and
1539 1543 'treemanifest' not in op.repo.requirements):
1540 1544 if len(op.repo.changelog) != 0:
1541 1545 raise error.Abort(_(
1542 1546 "bundle contains tree manifests, but local repo is "
1543 1547 "non-empty and does not use tree manifests"))
1544 1548 op.repo.requirements.add('treemanifest')
1545 1549 op.repo._applyopenerreqs()
1546 1550 op.repo._writerequirements()
1547 1551 extrakwargs = {}
1548 1552 targetphase = inpart.params.get('targetphase')
1549 1553 if targetphase is not None:
1550 1554 extrakwargs['targetphase'] = int(targetphase)
1551 1555 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1552 1556 expectedtotal=nbchangesets, **extrakwargs)
1553 1557 if op.reply is not None:
1554 1558 # This is definitely not the final form of this
1555 1559 # return. But one need to start somewhere.
1556 1560 part = op.reply.newpart('reply:changegroup', mandatory=False)
1557 1561 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1558 1562 part.addparam('return', '%i' % ret, mandatory=False)
1559 1563 assert not inpart.read()
1560 1564
1561 1565 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1562 1566 ['digest:%s' % k for k in util.DIGESTS.keys()])
1563 1567 @parthandler('remote-changegroup', _remotechangegroupparams)
1564 1568 def handleremotechangegroup(op, inpart):
1565 1569 """apply a bundle10 on the repo, given an url and validation information
1566 1570
1567 1571 All the information about the remote bundle to import are given as
1568 1572 parameters. The parameters include:
1569 1573 - url: the url to the bundle10.
1570 1574 - size: the bundle10 file size. It is used to validate what was
1571 1575 retrieved by the client matches the server knowledge about the bundle.
1572 1576 - digests: a space separated list of the digest types provided as
1573 1577 parameters.
1574 1578 - digest:<digest-type>: the hexadecimal representation of the digest with
1575 1579 that name. Like the size, it is used to validate what was retrieved by
1576 1580 the client matches what the server knows about the bundle.
1577 1581
1578 1582 When multiple digest types are given, all of them are checked.
1579 1583 """
1580 1584 try:
1581 1585 raw_url = inpart.params['url']
1582 1586 except KeyError:
1583 1587 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1584 1588 parsed_url = util.url(raw_url)
1585 1589 if parsed_url.scheme not in capabilities['remote-changegroup']:
1586 1590 raise error.Abort(_('remote-changegroup does not support %s urls') %
1587 1591 parsed_url.scheme)
1588 1592
1589 1593 try:
1590 1594 size = int(inpart.params['size'])
1591 1595 except ValueError:
1592 1596 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1593 1597 % 'size')
1594 1598 except KeyError:
1595 1599 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1596 1600
1597 1601 digests = {}
1598 1602 for typ in inpart.params.get('digests', '').split():
1599 1603 param = 'digest:%s' % typ
1600 1604 try:
1601 1605 value = inpart.params[param]
1602 1606 except KeyError:
1603 1607 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1604 1608 param)
1605 1609 digests[typ] = value
1606 1610
1607 1611 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1608 1612
1609 1613 tr = op.gettransaction()
1610 1614 from . import exchange
1611 1615 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1612 1616 if not isinstance(cg, changegroup.cg1unpacker):
1613 1617 raise error.Abort(_('%s: not a bundle version 1.0') %
1614 1618 util.hidepassword(raw_url))
1615 1619 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1616 1620 if op.reply is not None:
1617 1621 # This is definitely not the final form of this
1618 1622 # return. But one need to start somewhere.
1619 1623 part = op.reply.newpart('reply:changegroup')
1620 1624 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1621 1625 part.addparam('return', '%i' % ret, mandatory=False)
1622 1626 try:
1623 1627 real_part.validate()
1624 1628 except error.Abort as e:
1625 1629 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1626 1630 (util.hidepassword(raw_url), str(e)))
1627 1631 assert not inpart.read()
1628 1632
1629 1633 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1630 1634 def handlereplychangegroup(op, inpart):
1631 1635 ret = int(inpart.params['return'])
1632 1636 replyto = int(inpart.params['in-reply-to'])
1633 1637 op.records.add('changegroup', {'return': ret}, replyto)
1634 1638
@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

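The fixed-width framing used by `check:heads` (and `check:updated-heads` below) can be exercised on its own: the part payload is simply a concatenation of 20-byte binary node hashes, so any non-empty trailing read shorter than 20 bytes indicates a malformed part. A small sketch (hypothetical helper, mirroring the loop above):

```python
import io

def readheads(fh):
    """Read a sequence of 20-byte binary node hashes until EOF."""
    heads = []
    h = fh.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fh.read(20)
    assert not h  # payload length must be a multiple of 20
    return heads

part = io.BytesIO(b'\x11' * 20 + b'\x22' * 20)
assert readheads(part) == [b'\x11' * 20, b'\x22' * 20]
```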
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for races on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. Other activity on unrelated heads is ignored.

    This allows servers with high traffic to avoid push contention as long
    as unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

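`_readphaseheads` decodes a stream of fixed-size `struct` records. The exact `_fphasesentry` format string is defined elsewhere in this file; the sketch below assumes a big-endian int32 phase followed by a 20-byte node (`'>i20s'`), which matches the entry size logic above but should be checked against the actual definition:

```python
import io
import struct

# Assumed layout: big-endian int32 phase + 20-byte binary node.
FPHASESENTRY = '>i20s'
ALLPHASES = range(3)  # simplified stand-in for phases.allphases

def readphaseheads(fh):
    headsbyphase = [[] for _ in ALLPHASES]
    entrysize = struct.calcsize(FPHASESENTRY)
    while True:
        entry = fh.read(entrysize)
        if len(entry) < entrysize:
            if entry:  # a partial trailing record means corruption
                raise ValueError('bad phase-heads part')
            break
        phase, node = struct.unpack(FPHASESENTRY, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

payload = struct.pack(FPHASESENTRY, 0, b'\x00' * 20)
payload += struct.pack(FPHASESENTRY, 1, b'\x01' * 20)
heads = readphaseheads(io.BytesIO(payload))
assert heads[0] == [b'\x00' * 20] and heads[1] == [b'\x01' * 20]
```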
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
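Unlike `check:heads`, the `hgtagsfnodes` payload is read as pairs of 20-byte records (changeset node, then filenode), and a truncated trailing record is merely logged and skipped rather than treated as fatal, since this part only feeds a cache. A sketch of that parsing (hypothetical helper, not Mercurial code):

```python
import io

def readfnodepairs(fh):
    """Collect (node, fnode) 20-byte pairs; stop quietly on truncation."""
    pairs = []
    while True:
        node = fh.read(20)
        fnode = fh.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break  # incomplete record: ignored, mirroring the handler
        pairs.append((node, fnode))
    return pairs

data = b'\xaa' * 20 + b'\xbb' * 20 + b'\xcc' * 5  # one pair plus a junk tail
assert readfnodepairs(io.BytesIO(data)) == [(b'\xaa' * 20, b'\xbb' * 20)]
```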