bundle: inline applybundle1()...
Martin von Zweigbergk -
r33044:1d2b6895 default
@@ -1,1848 +1,1845 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 overly complex usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
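The stream-parameter encoding above is simple enough to sketch end to end. The following is an illustrative Python 3 sketch (the module itself targets Python 2); the helper names `encodestreamparams` and `decodestreamparams` are hypothetical, not part of bundle2.py:

```python
import struct
from urllib.parse import quote, unquote

def encodestreamparams(params):
    """Encode a list of (name, value-or-None) pairs per the format above."""
    blocks = []
    for name, value in params:
        if not name or not name[0].isalpha():
            raise ValueError('invalid parameter name: %r' % name)
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    blob = ' '.join(blocks).encode('ascii')
    # big-endian int32 size prefix, then the space-separated blob
    return struct.pack('>i', len(blob)) + blob

def decodestreamparams(data):
    """Decode the size-prefixed blob back into a name -> value dict."""
    size = struct.unpack('>i', data[:4])[0]
    params = {}
    for item in data[4:4 + size].decode('ascii').split(' '):
        if '=' in item:
            name, value = item.split('=', 1)
            params[unquote(name)] = unquote(value)
        else:
            params[unquote(item)] = None
    return params
```

A name starting with a capital letter, such as `Compression`, would be mandatory for the reader; a lower-case one, such as a hypothetical `hint`, is advisory.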
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76
77 77 The part type is used to route the part to an application-level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
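For illustration, the parameter block just described can be decoded as follows, assuming the whole `<mandatory-count><advisory-count><param-sizes><param-data>` blob is already in memory (the real unbundler reads it incrementally from the stream; `parsepartparams` is a hypothetical helper):

```python
import struct

def parsepartparams(data):
    """Return [(key, value, is_mandatory), ...] from a raw parameter blob."""
    # one byte each for the mandatory and advisory parameter counts
    mancount, advcount = struct.unpack('>BB', data[:2])
    total = mancount + advcount
    # one (size-of-key, size-of-value) byte pair per parameter
    fmt = '>' + 'BB' * total
    offset = 2 + struct.calcsize(fmt)
    sizes = struct.unpack(fmt, data[2:offset])
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        # mandatory parameters come first in the stream
        params.append((key, value, i < mancount))
    return params
```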
117 117
118 118 :payload:
119 119
120 120 The payload is a series of `<chunksize><chunkdata>` chunks.
121 121
122 122 `chunksize` is an int32, and `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
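The chunked payload framing can be consumed with a short loop. A sketch under the rules above (`iterchunks` is a hypothetical name; negative sizes are rejected here since no special-case processing is defined yet):

```python
import io
import struct

def iterchunks(fp):
    """Yield chunk payloads until the zero-size terminator chunk."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('negative chunk size: %i' % size)
        yield fp.read(size)
```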
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In
144 144 the future, dropping the stream may become an option for channels we do not
145 145 care to preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug', False):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug', False):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that register a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
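As a standalone usage sketch (a trimmed copy using a local `handlers` dict; the real decorator also validates the part type): registration stores the handler under the lower-cased type, so lookup is case insensitive while the original casing still signals mandatory vs. advisory.

```python
handlers = {}

def parthandler(parttype, params=()):
    """Register func under the lower-cased type: matching is case-insensitive."""
    def _decorator(func):
        lparttype = parttype.lower()
        assert lparttype not in handlers  # no duplicate registrations
        handlers[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

# the uppercase letters in the type mark the part as mandatory on the wire
@parthandler('Check:Heads', ('heads',))
def checkheadshandler(op, part):
    return 'heads checked'
```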
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
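The access patterns the docstring describes can be exercised on their own; here is a trimmed standalone copy of the container (replies omitted), for illustration only:

```python
class recordsdemo(object):
    """Trimmed standalone copy of unbundlerecords, replies omitted."""
    def __init__(self):
        self._categories = {}
        self._sequences = []
    def add(self, category, entry):
        # entries are stored both per-category and in arrival order
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))
    def __iter__(self):
        return iter(self._sequences)
    def __len__(self):
        return len(self._sequences)

records = recordsdemo()
records.add('changegroup', {'return': 1})
records.add('output', 'remote: ok')
```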
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.gettransaction = transactiongetter
300 300 self.reply = None
301 301 self.captureoutput = captureoutput
302 302
303 303 class TransactionUnavailable(RuntimeError):
304 304 pass
305 305
306 306 def _notransaction():
307 307 """default method to get a transaction while processing a bundle
308 308
309 309 Raise an exception to highlight the fact that no transaction was expected
310 310 to be created"""
311 311 raise TransactionUnavailable()
312 312
313 def applybundle1(repo, cg, tr, source, url, **kwargs):
314 # the transactiongetter won't be used, but we might as well set it
315 op = bundleoperation(repo, lambda: tr)
316 _processchangegroup(op, cg, tr, source, url, **kwargs)
317 return op
318
319 313 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
320 314 # transform me into unbundler.apply() as soon as the freeze is lifted
321 315 if isinstance(unbundler, unbundle20):
322 316 tr.hookargs['bundle2'] = '1'
323 317 if source is not None and 'source' not in tr.hookargs:
324 318 tr.hookargs['source'] = source
325 319 if url is not None and 'url' not in tr.hookargs:
326 320 tr.hookargs['url'] = url
327 321 return processbundle(repo, unbundler, lambda: tr)
328 322 else:
329 return applybundle1(repo, unbundler, tr, source, url, **kwargs)
323 # the transactiongetter won't be used, but we might as well set it
324 op = bundleoperation(repo, lambda: tr)
325 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
326 return op
330 327
331 328 def processbundle(repo, unbundler, transactiongetter=None, op=None):
332 329 """This function process a bundle, apply effect to/from a repo
333 330
334 331 It iterates over each part then searches for and uses the proper handling
335 332 code to process the part. Parts are processed in order.
336 333
337 334 An unknown mandatory part will abort the process.
338 335
339 336 It is temporarily possible to provide a prebuilt bundleoperation to the
340 337 function. This is used to ensure output is properly propagated in case of
341 338 an error during the unbundling. This output capturing part will likely be
342 339 reworked and this ability will probably go away in the process.
343 340 """
344 341 if op is None:
345 342 if transactiongetter is None:
346 343 transactiongetter = _notransaction
347 344 op = bundleoperation(repo, transactiongetter)
348 345 # todo:
349 346 # - replace this with an init function soon.
350 347 # - exception catching
351 348 unbundler.params
352 349 if repo.ui.debugflag:
353 350 msg = ['bundle2-input-bundle:']
354 351 if unbundler.params:
355 352 msg.append(' %i params' % len(unbundler.params))
356 353 if op.gettransaction is None or op.gettransaction is _notransaction:
357 354 msg.append(' no-transaction')
358 355 else:
359 356 msg.append(' with-transaction')
360 357 msg.append('\n')
361 358 repo.ui.debug(''.join(msg))
362 359 iterparts = enumerate(unbundler.iterparts())
363 360 part = None
364 361 nbpart = 0
365 362 try:
366 363 for nbpart, part in iterparts:
367 364 _processpart(op, part)
368 365 except Exception as exc:
369 366 # Any exceptions seeking to the end of the bundle at this point are
370 367 # almost certainly related to the underlying stream being bad.
371 368 # And, chances are that the exception we're handling is related to
372 369 # getting in that bad state. So, we swallow the seeking error and
373 370 # re-raise the original error.
374 371 seekerror = False
375 372 try:
376 373 for nbpart, part in iterparts:
377 374 # consume the bundle content
378 375 part.seek(0, 2)
379 376 except Exception:
380 377 seekerror = True
381 378
382 379 # Small hack to let caller code distinguish exceptions from bundle2
383 380 # processing from processing the old format. This is mostly
384 381 # needed to handle different return codes to unbundle according to the
385 382 # type of bundle. We should probably clean up or drop this return code
386 383 # craziness in a future version.
387 384 exc.duringunbundle2 = True
388 385 salvaged = []
389 386 replycaps = None
390 387 if op.reply is not None:
391 388 salvaged = op.reply.salvageoutput()
392 389 replycaps = op.reply.capabilities
393 390 exc._replycaps = replycaps
394 391 exc._bundle2salvagedoutput = salvaged
395 392
396 393 # Re-raising from a variable loses the original stack. So only use
397 394 # that form if we need to.
398 395 if seekerror:
399 396 raise exc
400 397 else:
401 398 raise
402 399 finally:
403 400 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
404 401
405 402 return op
406 403
407 404 def _processchangegroup(op, cg, tr, source, url, **kwargs):
408 405 ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
409 406 op.records.add('changegroup', {
410 407 'return': ret,
411 408 'addednodes': addednodes,
412 409 })
413 410 return ret
414 411
415 412 def _processpart(op, part):
416 413 """process a single part from a bundle
417 414
418 415 The part is guaranteed to have been fully consumed when the function exits
419 416 (even if an exception is raised)."""
420 417 status = 'unknown' # used by debug output
421 418 hardabort = False
422 419 try:
423 420 try:
424 421 handler = parthandlermapping.get(part.type)
425 422 if handler is None:
426 423 status = 'unsupported-type'
427 424 raise error.BundleUnknownFeatureError(parttype=part.type)
428 425 indebug(op.ui, 'found a handler for part %r' % part.type)
429 426 unknownparams = part.mandatorykeys - handler.params
430 427 if unknownparams:
431 428 unknownparams = list(unknownparams)
432 429 unknownparams.sort()
433 430 status = 'unsupported-params (%s)' % unknownparams
434 431 raise error.BundleUnknownFeatureError(parttype=part.type,
435 432 params=unknownparams)
436 433 status = 'supported'
437 434 except error.BundleUnknownFeatureError as exc:
438 435 if part.mandatory: # mandatory parts
439 436 raise
440 437 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
441 438 return # skip to part processing
442 439 finally:
443 440 if op.ui.debugflag:
444 441 msg = ['bundle2-input-part: "%s"' % part.type]
445 442 if not part.mandatory:
446 443 msg.append(' (advisory)')
447 444 nbmp = len(part.mandatorykeys)
448 445 nbap = len(part.params) - nbmp
449 446 if nbmp or nbap:
450 447 msg.append(' (params:')
451 448 if nbmp:
452 449 msg.append(' %i mandatory' % nbmp)
453 450 if nbap:
454 451 msg.append(' %i advisory' % nbap)
455 452 msg.append(')')
456 453 msg.append(' %s\n' % status)
457 454 op.ui.debug(''.join(msg))
458 455
459 456 # handler is called outside the above try block so that we don't
460 457 # risk catching KeyErrors from anything other than the
461 458 # parthandlermapping lookup (any KeyError raised by handler()
462 459 # itself represents a defect of a different variety).
463 460 output = None
464 461 if op.captureoutput and op.reply is not None:
465 462 op.ui.pushbuffer(error=True, subproc=True)
466 463 output = ''
467 464 try:
468 465 handler(op, part)
469 466 finally:
470 467 if output is not None:
471 468 output = op.ui.popbuffer()
472 469 if output:
473 470 outpart = op.reply.newpart('output', data=output,
474 471 mandatory=False)
475 472 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
476 473 # If exiting or interrupted, do not attempt to seek the stream in the
477 474 # finally block below. This makes abort faster.
478 475 except (SystemExit, KeyboardInterrupt):
479 476 hardabort = True
480 477 raise
481 478 finally:
482 479 # consume the part content to not corrupt the stream.
483 480 if not hardabort:
484 481 part.seek(0, 2)
485 482
486 483
487 484 def decodecaps(blob):
488 485 """decode a bundle2 caps bytes blob into a dictionary
489 486
490 487 The blob is a list of capabilities (one per line)
491 488 Capabilities may have values using a line of the form::
492 489
493 490 capability=value1,value2,value3
494 491
495 492 The values are always a list."""
496 493 caps = {}
497 494 for line in blob.splitlines():
498 495 if not line:
499 496 continue
500 497 if '=' not in line:
501 498 key, vals = line, ()
502 499 else:
503 500 key, vals = line.split('=', 1)
504 501 vals = vals.split(',')
505 502 key = urlreq.unquote(key)
506 503 vals = [urlreq.unquote(v) for v in vals]
507 504 caps[key] = vals
508 505 return caps
509 506
510 507 def encodecaps(caps):
511 508 """encode a bundle2 caps dictionary into a bytes blob"""
512 509 chunks = []
513 510 for ca in sorted(caps):
514 511 vals = caps[ca]
515 512 ca = urlreq.quote(ca)
516 513 vals = [urlreq.quote(v) for v in vals]
517 514 if vals:
518 515 ca = "%s=%s" % (ca, ','.join(vals))
519 516 chunks.append(ca)
520 517 return '\n'.join(chunks)
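A Python 3 re-expression of the two capability helpers, useful for seeing the round trip (names suffixed `demo` to mark them as illustrative rather than the module's own):

```python
from urllib.parse import quote, unquote

def encodecapsdemo(caps):
    """One capability per line, values comma-joined, everything urlquoted."""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecapsdemo(blob):
    """Inverse of encodecapsdemo; values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```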
521 518
522 519 bundletypes = {
523 520 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
524 521 # since the unification ssh accepts a header but there
525 522 # is no capability signaling it.
526 523 "HG20": (), # special-cased below
527 524 "HG10UN": ("HG10UN", 'UN'),
528 525 "HG10BZ": ("HG10", 'BZ'),
529 526 "HG10GZ": ("HG10GZ", 'GZ'),
530 527 }
531 528
532 529 # hgweb uses this list to communicate its preferred type
533 530 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
534 531
535 532 class bundle20(object):
536 533 """represent an outgoing bundle2 container
537 534
538 535 Use the `addparam` method to add a stream level parameter, and `newpart` to
539 536 populate it. Then call `getchunks` to retrieve all the binary chunks of
540 537 data that compose the bundle2 container."""
541 538
542 539 _magicstring = 'HG20'
543 540
544 541 def __init__(self, ui, capabilities=()):
545 542 self.ui = ui
546 543 self._params = []
547 544 self._parts = []
548 545 self.capabilities = dict(capabilities)
549 546 self._compengine = util.compengines.forbundletype('UN')
550 547 self._compopts = None
551 548
552 549 def setcompression(self, alg, compopts=None):
553 550 """setup core part compression to <alg>"""
554 551 if alg in (None, 'UN'):
555 552 return
556 553 assert not any(n.lower() == 'compression' for n, v in self._params)
557 554 self.addparam('Compression', alg)
558 555 self._compengine = util.compengines.forbundletype(alg)
559 556 self._compopts = compopts
560 557
561 558 @property
562 559 def nbparts(self):
563 560 """total number of parts added to the bundler"""
564 561 return len(self._parts)
565 562
566 563 # methods used to define the bundle2 content
567 564 def addparam(self, name, value=None):
568 565 """add a stream level parameter"""
569 566 if not name:
570 567 raise ValueError('empty parameter name')
571 568 if name[0] not in string.letters:
572 569 raise ValueError('non letter first character: %r' % name)
573 570 self._params.append((name, value))
574 571
575 572 def addpart(self, part):
576 573 """add a new part to the bundle2 container
577 574
578 575 Parts contain the actual application payload."""
579 576 assert part.id is None
580 577 part.id = len(self._parts) # very cheap counter
581 578 self._parts.append(part)
582 579
583 580 def newpart(self, typeid, *args, **kwargs):
584 581 """create a new part and add it to the containers
585 582
586 583 As the part is directly added to the containers. For now, this means
587 584 that any failure to properly initialize the part after calling
588 585 ``newpart`` should result in a failure of the whole bundling process.
589 586
590 587 You can still fall back to manually create and add if you need better
591 588 control."""
592 589 part = bundlepart(typeid, *args, **kwargs)
593 590 self.addpart(part)
594 591 return part
595 592
596 593 # methods used to generate the bundle2 stream
597 594 def getchunks(self):
598 595 if self.ui.debugflag:
599 596 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
600 597 if self._params:
601 598 msg.append(' (%i params)' % len(self._params))
602 599 msg.append(' %i parts total\n' % len(self._parts))
603 600 self.ui.debug(''.join(msg))
604 601 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
605 602 yield self._magicstring
606 603 param = self._paramchunk()
607 604 outdebug(self.ui, 'bundle parameter: %s' % param)
608 605 yield _pack(_fstreamparamsize, len(param))
609 606 if param:
610 607 yield param
611 608 for chunk in self._compengine.compressstream(self._getcorechunk(),
612 609 self._compopts):
613 610 yield chunk
614 611
615 612 def _paramchunk(self):
616 613 """return a encoded version of all stream parameters"""
617 614 blocks = []
618 615 for par, value in self._params:
619 616 par = urlreq.quote(par)
620 617 if value is not None:
621 618 value = urlreq.quote(value)
622 619 par = '%s=%s' % (par, value)
623 620 blocks.append(par)
624 621 return ' '.join(blocks)
625 622
626 623 def _getcorechunk(self):
627 624 """yield chunk for the core part of the bundle
628 625
629 626 (all but headers and parameters)"""
630 627 outdebug(self.ui, 'start of parts')
631 628 for part in self._parts:
632 629 outdebug(self.ui, 'bundle part: "%s"' % part.type)
633 630 for chunk in part.getchunks(ui=self.ui):
634 631 yield chunk
635 632 outdebug(self.ui, 'end of bundle')
636 633 yield _pack(_fpartheadersize, 0)
637 634
638 635
639 636 def salvageoutput(self):
640 637 """return a list with a copy of all output parts in the bundle
641 638
642 639 This is meant to be used during error handling to make sure we preserve
643 640 server output"""
644 641 salvaged = []
645 642 for part in self._parts:
646 643 if part.type.startswith('output'):
647 644 salvaged.append(part.copy())
648 645 return salvaged
649 646
650 647
651 648 class unpackermixin(object):
652 649 """A mixin to extract bytes and struct data from a stream"""
653 650
654 651 def __init__(self, fp):
655 652 self._fp = fp
656 653
657 654 def _unpack(self, format):
658 655 """unpack this struct format from the stream
659 656
660 657 This method is meant for internal usage by the bundle2 protocol only.
661 658 It directly manipulates the low level stream, including bundle2 level
662 659 instructions.
663 660
664 661 Do not use it to implement higher-level logic or methods."""
665 662 data = self._readexact(struct.calcsize(format))
666 663 return _unpack(format, data)
667 664
668 665 def _readexact(self, size):
669 666 """read exactly <size> bytes from the stream
670 667
671 668 This method is meant for internal usage by the bundle2 protocol only.
672 669 It directly manipulates the low level stream, including bundle2 level
673 670 instructions.
674 671
675 672 Do not use it to implement higher-level logic or methods."""
676 673 return changegroup.readexactly(self._fp, size)
677 674
678 675 def getunbundler(ui, fp, magicstring=None):
679 676 """return a valid unbundler object for a given magicstring"""
680 677 if magicstring is None:
681 678 magicstring = changegroup.readexactly(fp, 4)
682 679 magic, version = magicstring[0:2], magicstring[2:4]
683 680 if magic != 'HG':
684 681 raise error.Abort(_('not a Mercurial bundle'))
685 682 unbundlerclass = formatmap.get(version)
686 683 if unbundlerclass is None:
687 684 raise error.Abort(_('unknown bundle version %s') % version)
688 685 unbundler = unbundlerclass(ui, fp)
689 686 indebug(ui, 'start processing of %s stream' % magicstring)
690 687 return unbundler
691 688
692 689 class unbundle20(unpackermixin):
693 690 """interpret a bundle2 stream
694 691
695 692 This class is fed with a binary stream and yields parts through its
696 693 `iterparts` methods."""
697 694
698 695 _magicstring = 'HG20'
699 696
700 697 def __init__(self, ui, fp):
701 698 """If header is specified, we do not read it out of the stream."""
702 699 self.ui = ui
703 700 self._compengine = util.compengines.forbundletype('UN')
704 701 self._compressed = None
705 702 super(unbundle20, self).__init__(fp)
706 703
707 704 @util.propertycache
708 705 def params(self):
709 706 """dictionary of stream level parameters"""
710 707 indebug(self.ui, 'reading bundle2 stream parameters')
711 708 params = {}
712 709 paramssize = self._unpack(_fstreamparamsize)[0]
713 710 if paramssize < 0:
714 711 raise error.BundleValueError('negative bundle param size: %i'
715 712 % paramssize)
716 713 if paramssize:
717 714 params = self._readexact(paramssize)
718 715 params = self._processallparams(params)
719 716 return params
720 717
721 718 def _processallparams(self, paramsblock):
722 719 """"""
723 720 params = util.sortdict()
724 721 for p in paramsblock.split(' '):
725 722 p = p.split('=', 1)
726 723 p = [urlreq.unquote(i) for i in p]
727 724 if len(p) < 2:
728 725 p.append(None)
729 726 self._processparam(*p)
730 727 params[p[0]] = p[1]
731 728 return params
732 729
733 730
734 731 def _processparam(self, name, value):
735 732 """process a parameter, applying its effect if needed
736 733
737 734 Parameters starting with a lower case letter are advisory and will be
738 735 ignored when unknown. Those starting with an upper case letter are
739 736 mandatory; this function will raise when they are unknown.
740 737
741 738 Note: no options are currently supported. Any input will either be
742 739 ignored or will fail.
743 740 """
744 741 if not name:
745 742 raise ValueError('empty parameter name')
746 743 if name[0] not in string.letters:
747 744 raise ValueError('non letter first character: %r' % name)
748 745 try:
749 746 handler = b2streamparamsmap[name.lower()]
750 747 except KeyError:
751 748 if name[0].islower():
752 749 indebug(self.ui, "ignoring unknown parameter %r" % name)
753 750 else:
754 751 raise error.BundleUnknownFeatureError(params=(name,))
755 752 else:
756 753 handler(self, name, value)
757 754
758 755 def _forwardchunks(self):
759 756 """utility to transfer a bundle2 as binary
760 757
761 758 This is made necessary by the fact that the 'getbundle' command over 'ssh'
762 759 has no way to know when the reply ends, relying on the bundle being
763 760 interpreted to find its end. This is terrible and we are sorry, but we
764 761 needed to move forward to get general delta enabled.
765 762 """
766 763 yield self._magicstring
767 764 assert 'params' not in vars(self)
768 765 paramssize = self._unpack(_fstreamparamsize)[0]
769 766 if paramssize < 0:
770 767 raise error.BundleValueError('negative bundle param size: %i'
771 768 % paramssize)
772 769 yield _pack(_fstreamparamsize, paramssize)
773 770 if paramssize:
774 771 params = self._readexact(paramssize)
775 772 self._processallparams(params)
776 773 yield params
777 774 assert self._compengine.bundletype == 'UN'
778 775 # From there, payload might need to be decompressed
779 776 self._fp = self._compengine.decompressorreader(self._fp)
780 777 emptycount = 0
781 778 while emptycount < 2:
782 779 # so we can brainlessly loop
783 780 assert _fpartheadersize == _fpayloadsize
784 781 size = self._unpack(_fpartheadersize)[0]
785 782 yield _pack(_fpartheadersize, size)
786 783 if size:
787 784 emptycount = 0
788 785 else:
789 786 emptycount += 1
790 787 continue
791 788 if size == flaginterrupt:
792 789 continue
793 790 elif size < 0:
794 791 raise error.BundleValueError('negative chunk size: %i' % size)
795 792 yield self._readexact(size)
796 793
797 794
798 795 def iterparts(self):
799 796 """yield all parts contained in the stream"""
800 797 # make sure params have been loaded
801 798 self.params
802 799 # From there, payload needs to be decompressed
803 800 self._fp = self._compengine.decompressorreader(self._fp)
804 801 indebug(self.ui, 'start extraction of bundle2 parts')
805 802 headerblock = self._readpartheader()
806 803 while headerblock is not None:
807 804 part = unbundlepart(self.ui, headerblock, self._fp)
808 805 yield part
809 806 part.seek(0, 2)
810 807 headerblock = self._readpartheader()
811 808 indebug(self.ui, 'end of bundle2 stream')
812 809
813 810 def _readpartheader(self):
814 811 """reads a part header size and return the bytes blob
815 812
816 813 returns None if empty"""
817 814 headersize = self._unpack(_fpartheadersize)[0]
818 815 if headersize < 0:
819 816 raise error.BundleValueError('negative part header size: %i'
820 817 % headersize)
821 818 indebug(self.ui, 'part header size: %i' % headersize)
822 819 if headersize:
823 820 return self._readexact(headersize)
824 821 return None
825 822
826 823 def compressed(self):
827 824 self.params # load params
828 825 return self._compressed
829 826
830 827 def close(self):
831 828 """close underlying file"""
832 829 if util.safehasattr(self._fp, 'close'):
833 830 return self._fp.close()
834 831
835 832 formatmap = {'20': unbundle20}
836 833
837 834 b2streamparamsmap = {}
838 835
839 836 def b2streamparamhandler(name):
840 837 """register a handler for a stream level parameter"""
841 838 def decorator(func):
842 839 assert name not in formatmap
843 840 b2streamparamsmap[name] = func
844 841 return func
845 842 return decorator
846 843
847 844 @b2streamparamhandler('compression')
848 845 def processcompression(unbundler, param, value):
849 846 """read compression parameter and install payload decompression"""
850 847 if value not in util.compengines.supportedbundletypes:
851 848 raise error.BundleUnknownFeatureError(params=(param,),
852 849 values=(value,))
853 850 unbundler._compengine = util.compengines.forbundletype(value)
854 851 if value is not None:
855 852 unbundler._compressed = True
856 853
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither the data nor the parameters may be modified after generation has
    begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

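# Illustrative sketch (comments only; a hedged example, not part of the
# module's API surface): parts are normally created via bundle20.newpart(),
# which assigns the part id. When building a bundlepart directly, the id
# must be set by hand before generating the stream:
#
#     part = bundlepart('output', data='hello')
#     part.addparam('lang', 'en', mandatory=False)
#     part.id = 0
#     blob = ''.join(part.getchunks(ui))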

flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side while
    the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode', 'strict')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
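
# Illustrative sketch (comments only; a hedged example, not code the module
# runs): how combinechangegroupresults folds several addchangegroup return
# values, using the changegroup convention 1 == no change, 1+n == n heads
# added, -1-n == n heads removed, 0 == error:
#
#     [3, 2] -> changedheads = 2 + 1 = 3 -> 1 + 3 = 4   (three heads added)
#     [1, 0] -> 0                                       (any error wins)
#     [-2]   -> changedheads = -1 -> -1 + -1 = -2       (one head removed)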

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

1628 1625 @parthandler('check:heads')
1629 1626 def handlecheckheads(op, inpart):
1630 1627 """check that head of the repo did not change
1631 1628
1632 1629 This is used to detect a push race when using unbundle.
1633 1630 This replaces the "heads" argument of unbundle."""
1634 1631 h = inpart.read(20)
1635 1632 heads = []
1636 1633 while len(h) == 20:
1637 1634 heads.append(h)
1638 1635 h = inpart.read(20)
1639 1636 assert not h
1640 1637 # Trigger a transaction so that we are guaranteed to have the lock now.
1641 1638 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1642 1639 op.gettransaction()
1643 1640 if sorted(heads) != sorted(op.repo.heads()):
1644 1641 raise error.PushRaced('repository changed while pushing - '
1645 1642 'please try again')
1646 1643
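The fixed-width node stream that `handlecheckheads` consumes can be sketched standalone. This is a minimal illustration, not the module's code: `io.BytesIO` stands in for the bundle2 part object (an assumption; real parts only need to expose a compatible `read`), and the node ids are fake.

```python
import io

NODE_LEN = 20  # bundle2 transmits binary sha1 node ids, 20 bytes each

def read_heads(part):
    """Read a concatenated stream of 20-byte node ids until EOF.

    Mirrors the loop in handlecheckheads: any trailing partial
    record indicates a malformed part.
    """
    heads = []
    h = part.read(NODE_LEN)
    while len(h) == NODE_LEN:
        heads.append(h)
        h = part.read(NODE_LEN)
    assert not h, 'truncated node in heads stream'
    return heads

# hypothetical payload: two fake node ids
payload = b'a' * 20 + b'b' * 20
print(read_heads(io.BytesIO(payload)))
```

Because every record has a fixed width, a short read can only happen at end of stream, which is why a single trailing-byte check suffices.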
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

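`handlelistkeys` delegates parsing to `pushkey.decodekeys`, which lives in another module. The sketch below illustrates the assumed wire format (one tab-separated key/value pair per line, as used by the pushkey protocol); it is a hedged reimplementation for illustration, not the `pushkey` module itself.

```python
def decodekeys(data):
    """Decode a listkeys payload: one 'key<TAB>value' pair per line.

    Splitting on the first tab only lets values themselves contain
    tab characters.
    """
    return dict(line.split('\t', 1) for line in data.splitlines())

# hypothetical bookmarks-namespace payload with two fake entries
payload = 'master\t0123abcd\nfeature\t4567ef01'
print(decodekeys(payload))
```

The decoded mapping is what ends up in `op.records` under the `'listkeys'` key, paired with its namespace.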
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    addednodes = []
    for entry in op.records['changegroup']:
        addednodes.extend(entry['addednodes'])
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase,
                        addednodes)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
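The `hgtagsfnodes` payload read above is a flat sequence of (changeset node, filenode) 20-byte pairs. A standalone sketch of that read loop, with `io.BytesIO` standing in for the bundle2 part (an assumption; only a compatible `read` is needed) and fake node ids:

```python
import io

def readfnodepairs(part):
    """Yield (node, fnode) pairs; stop at an incomplete trailing record.

    Mirrors handlehgtagsfnodes: truncated data is skipped rather than
    treated as fatal, since this part only feeds a cache.
    """
    while True:
        node = part.read(20)
        fnode = part.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        yield node, fnode

# hypothetical payload: one complete pair plus truncated trailing bytes
payload = b'c' * 20 + b'f' * 20 + b'z' * 10
print(list(readfnodepairs(io.BytesIO(payload))))
```

Tolerating a short final record here is a deliberate contrast with `check:heads`, where a truncated node is asserted away: cache entries are advisory, head checks are not.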