##// END OF EJS Templates
bundle2: remove unnecessary try finally...
Durham Goode -
r34261:cc7b37c9 default
parent child Browse files
Show More
@@ -1,1939 +1,1935
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that register a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happens in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supercede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 353 self.op = op
354 354 self.unbundler = unbundler
355 355 self.iterator = None
356 356 self.count = 0
357 357 self.current = None
358 358
359 359 def __enter__(self):
360 360 def func():
361 361 itr = enumerate(self.unbundler.iterparts())
362 362 for count, p in itr:
363 363 self.count = count
364 364 self.current = p
365 365 yield p
366 366 p.seek(0, 2)
367 367 self.current = None
368 368 self.iterator = func()
369 369 return self.iterator
370 370
371 371 def __exit__(self, type, exc, tb):
372 372 if not self.iterator:
373 373 return
374 374
375 375 if exc:
376 376 # If exiting or interrupted, do not attempt to seek the stream in
377 377 # the finally block below. This makes abort faster.
378 378 if (self.current and
379 379 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
380 380 # consume the part content to not corrupt the stream.
381 381 self.current.seek(0, 2)
382 382
383 383 # Any exceptions seeking to the end of the bundle at this point are
384 384 # almost certainly related to the underlying stream being bad.
385 385 # And, chances are that the exception we're handling is related to
386 386 # getting in that bad state. So, we swallow the seeking error and
387 387 # re-raise the original error.
388 388 seekerror = False
389 389 try:
390 390 for part in self.iterator:
391 391 # consume the bundle content
392 392 part.seek(0, 2)
393 393 except Exception:
394 394 seekerror = True
395 395
396 396 # Small hack to let caller code distinguish exceptions from bundle2
397 397 # processing from processing the old format. This is mostly needed
398 398 # to handle different return codes to unbundle according to the type
399 399 # of bundle. We should probably clean up or drop this return code
400 400 # craziness in a future version.
401 401 exc.duringunbundle2 = True
402 402 salvaged = []
403 403 replycaps = None
404 404 if self.op.reply is not None:
405 405 salvaged = self.op.reply.salvageoutput()
406 406 replycaps = self.op.reply.capabilities
407 407 exc._replycaps = replycaps
408 408 exc._bundle2salvagedoutput = salvaged
409 409
410 410 # Re-raising from a variable loses the original stack. So only use
411 411 # that form if we need to.
412 412 if seekerror:
413 413 raise exc
414 414
415 415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 416 self.count)
417 417
418 418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 419 """This function process a bundle, apply effect to/from a repo
420 420
421 421 It iterates over each part then searches for and uses the proper handling
422 422 code to process the part. Parts are processed in order.
423 423
424 424 Unknown Mandatory part will abort the process.
425 425
426 426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 427 function. This is used to ensure output is properly propagated in case of
428 428 an error during the unbundling. This output capturing part will likely be
429 429 reworked and this ability will probably go away in the process.
430 430 """
431 431 if op is None:
432 432 if transactiongetter is None:
433 433 transactiongetter = _notransaction
434 434 op = bundleoperation(repo, transactiongetter)
435 435 # todo:
436 436 # - replace this is a init function soon.
437 437 # - exception catching
438 438 unbundler.params
439 439 if repo.ui.debugflag:
440 440 msg = ['bundle2-input-bundle:']
441 441 if unbundler.params:
442 442 msg.append(' %i params' % len(unbundler.params))
443 443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 444 msg.append(' no-transaction')
445 445 else:
446 446 msg.append(' with-transaction')
447 447 msg.append('\n')
448 448 repo.ui.debug(''.join(msg))
449 449
450 450 with partiterator(repo, op, unbundler) as parts:
451 451 for part in parts:
452 452 _processpart(op, part)
453 453
454 454 return op
455 455
456 456 def _processchangegroup(op, cg, tr, source, url, **kwargs):
457 457 ret = cg.apply(op.repo, tr, source, url, **kwargs)
458 458 op.records.add('changegroup', {
459 459 'return': ret,
460 460 })
461 461 return ret
462 462
463 463 def _gethandler(op, part):
464 464 status = 'unknown' # used by debug output
465 465 try:
466 466 handler = parthandlermapping.get(part.type)
467 467 if handler is None:
468 468 status = 'unsupported-type'
469 469 raise error.BundleUnknownFeatureError(parttype=part.type)
470 470 indebug(op.ui, 'found a handler for part %r' % part.type)
471 471 unknownparams = part.mandatorykeys - handler.params
472 472 if unknownparams:
473 473 unknownparams = list(unknownparams)
474 474 unknownparams.sort()
475 475 status = 'unsupported-params (%s)' % unknownparams
476 476 raise error.BundleUnknownFeatureError(parttype=part.type,
477 477 params=unknownparams)
478 478 status = 'supported'
479 479 except error.BundleUnknownFeatureError as exc:
480 480 if part.mandatory: # mandatory parts
481 481 raise
482 482 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
483 483 return # skip to part processing
484 484 finally:
485 485 if op.ui.debugflag:
486 486 msg = ['bundle2-input-part: "%s"' % part.type]
487 487 if not part.mandatory:
488 488 msg.append(' (advisory)')
489 489 nbmp = len(part.mandatorykeys)
490 490 nbap = len(part.params) - nbmp
491 491 if nbmp or nbap:
492 492 msg.append(' (params:')
493 493 if nbmp:
494 494 msg.append(' %i mandatory' % nbmp)
495 495 if nbap:
496 496 msg.append(' %i advisory' % nbmp)
497 497 msg.append(')')
498 498 msg.append(' %s\n' % status)
499 499 op.ui.debug(''.join(msg))
500 500
501 501 return handler
502 502
503 503 def _processpart(op, part):
504 504 """process a single part from a bundle
505 505
506 506 The part is guaranteed to have been fully consumed when the function exits
507 507 (even if an exception is raised)."""
508 try:
509 508 handler = _gethandler(op, part)
510 509 if handler is None:
511 510 return
512 511
513 512 # handler is called outside the above try block so that we don't
514 513 # risk catching KeyErrors from anything other than the
515 514 # parthandlermapping lookup (any KeyError raised by handler()
516 515 # itself represents a defect of a different variety).
517 516 output = None
518 517 if op.captureoutput and op.reply is not None:
519 518 op.ui.pushbuffer(error=True, subproc=True)
520 519 output = ''
521 520 try:
522 521 handler(op, part)
523 522 finally:
524 523 if output is not None:
525 524 output = op.ui.popbuffer()
526 525 if output:
527 526 outpart = op.reply.newpart('output', data=output,
528 527 mandatory=False)
529 528 outpart.addparam(
530 529 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
531 finally:
532 pass
533
534 530
535 531 def decodecaps(blob):
536 532 """decode a bundle2 caps bytes blob into a dictionary
537 533
538 534 The blob is a list of capabilities (one per line)
539 535 Capabilities may have values using a line of the form::
540 536
541 537 capability=value1,value2,value3
542 538
543 539 The values are always a list."""
544 540 caps = {}
545 541 for line in blob.splitlines():
546 542 if not line:
547 543 continue
548 544 if '=' not in line:
549 545 key, vals = line, ()
550 546 else:
551 547 key, vals = line.split('=', 1)
552 548 vals = vals.split(',')
553 549 key = urlreq.unquote(key)
554 550 vals = [urlreq.unquote(v) for v in vals]
555 551 caps[key] = vals
556 552 return caps
557 553
558 554 def encodecaps(caps):
559 555 """encode a bundle2 caps dictionary into a bytes blob"""
560 556 chunks = []
561 557 for ca in sorted(caps):
562 558 vals = caps[ca]
563 559 ca = urlreq.quote(ca)
564 560 vals = [urlreq.quote(v) for v in vals]
565 561 if vals:
566 562 ca = "%s=%s" % (ca, ','.join(vals))
567 563 chunks.append(ca)
568 564 return '\n'.join(chunks)
569 565
570 566 bundletypes = {
571 567 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
572 568 # since the unification ssh accepts a header but there
573 569 # is no capability signaling it.
574 570 "HG20": (), # special-cased below
575 571 "HG10UN": ("HG10UN", 'UN'),
576 572 "HG10BZ": ("HG10", 'BZ'),
577 573 "HG10GZ": ("HG10GZ", 'GZ'),
578 574 }
579 575
580 576 # hgweb uses this list to communicate its preferred type
581 577 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
582 578
583 579 class bundle20(object):
584 580 """represent an outgoing bundle2 container
585 581
586 582 Use the `addparam` method to add stream level parameter. and `newpart` to
587 583 populate it. Then call `getchunks` to retrieve all the binary chunks of
588 584 data that compose the bundle2 container."""
589 585
590 586 _magicstring = 'HG20'
591 587
592 588 def __init__(self, ui, capabilities=()):
593 589 self.ui = ui
594 590 self._params = []
595 591 self._parts = []
596 592 self.capabilities = dict(capabilities)
597 593 self._compengine = util.compengines.forbundletype('UN')
598 594 self._compopts = None
599 595
600 596 def setcompression(self, alg, compopts=None):
601 597 """setup core part compression to <alg>"""
602 598 if alg in (None, 'UN'):
603 599 return
604 600 assert not any(n.lower() == 'compression' for n, v in self._params)
605 601 self.addparam('Compression', alg)
606 602 self._compengine = util.compengines.forbundletype(alg)
607 603 self._compopts = compopts
608 604
609 605 @property
610 606 def nbparts(self):
611 607 """total number of parts added to the bundler"""
612 608 return len(self._parts)
613 609
614 610 # methods used to defines the bundle2 content
615 611 def addparam(self, name, value=None):
616 612 """add a stream level parameter"""
617 613 if not name:
618 614 raise ValueError('empty parameter name')
619 615 if name[0] not in pycompat.bytestr(string.ascii_letters):
620 616 raise ValueError('non letter first character: %r' % name)
621 617 self._params.append((name, value))
622 618
623 619 def addpart(self, part):
624 620 """add a new part to the bundle2 container
625 621
626 622 Parts contains the actual applicative payload."""
627 623 assert part.id is None
628 624 part.id = len(self._parts) # very cheap counter
629 625 self._parts.append(part)
630 626
631 627 def newpart(self, typeid, *args, **kwargs):
632 628 """create a new part and add it to the containers
633 629
634 630 As the part is directly added to the containers. For now, this means
635 631 that any failure to properly initialize the part after calling
636 632 ``newpart`` should result in a failure of the whole bundling process.
637 633
638 634 You can still fall back to manually create and add if you need better
639 635 control."""
640 636 part = bundlepart(typeid, *args, **kwargs)
641 637 self.addpart(part)
642 638 return part
643 639
644 640 # methods used to generate the bundle2 stream
645 641 def getchunks(self):
646 642 if self.ui.debugflag:
647 643 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
648 644 if self._params:
649 645 msg.append(' (%i params)' % len(self._params))
650 646 msg.append(' %i parts total\n' % len(self._parts))
651 647 self.ui.debug(''.join(msg))
652 648 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
653 649 yield self._magicstring
654 650 param = self._paramchunk()
655 651 outdebug(self.ui, 'bundle parameter: %s' % param)
656 652 yield _pack(_fstreamparamsize, len(param))
657 653 if param:
658 654 yield param
659 655 for chunk in self._compengine.compressstream(self._getcorechunk(),
660 656 self._compopts):
661 657 yield chunk
662 658
663 659 def _paramchunk(self):
664 660 """return a encoded version of all stream parameters"""
665 661 blocks = []
666 662 for par, value in self._params:
667 663 par = urlreq.quote(par)
668 664 if value is not None:
669 665 value = urlreq.quote(value)
670 666 par = '%s=%s' % (par, value)
671 667 blocks.append(par)
672 668 return ' '.join(blocks)
673 669
674 670 def _getcorechunk(self):
675 671 """yield chunk for the core part of the bundle
676 672
677 673 (all but headers and parameters)"""
678 674 outdebug(self.ui, 'start of parts')
679 675 for part in self._parts:
680 676 outdebug(self.ui, 'bundle part: "%s"' % part.type)
681 677 for chunk in part.getchunks(ui=self.ui):
682 678 yield chunk
683 679 outdebug(self.ui, 'end of bundle')
684 680 yield _pack(_fpartheadersize, 0)
685 681
686 682
687 683 def salvageoutput(self):
688 684 """return a list with a copy of all output parts in the bundle
689 685
690 686 This is meant to be used during error handling to make sure we preserve
691 687 server output"""
692 688 salvaged = []
693 689 for part in self._parts:
694 690 if part.type.startswith('output'):
695 691 salvaged.append(part.copy())
696 692 return salvaged
697 693
698 694
699 695 class unpackermixin(object):
700 696 """A mixin to extract bytes and struct data from a stream"""
701 697
702 698 def __init__(self, fp):
703 699 self._fp = fp
704 700
705 701 def _unpack(self, format):
706 702 """unpack this struct format from the stream
707 703
708 704 This method is meant for internal usage by the bundle2 protocol only.
709 705 They directly manipulate the low level stream including bundle2 level
710 706 instruction.
711 707
712 708 Do not use it to implement higher-level logic or methods."""
713 709 data = self._readexact(struct.calcsize(format))
714 710 return _unpack(format, data)
715 711
716 712 def _readexact(self, size):
717 713 """read exactly <size> bytes from the stream
718 714
719 715 This method is meant for internal usage by the bundle2 protocol only.
720 716 They directly manipulate the low level stream including bundle2 level
721 717 instruction.
722 718
723 719 Do not use it to implement higher-level logic or methods."""
724 720 return changegroup.readexactly(self._fp, size)
725 721
726 722 def getunbundler(ui, fp, magicstring=None):
727 723 """return a valid unbundler object for a given magicstring"""
728 724 if magicstring is None:
729 725 magicstring = changegroup.readexactly(fp, 4)
730 726 magic, version = magicstring[0:2], magicstring[2:4]
731 727 if magic != 'HG':
732 728 ui.debug(
733 729 "error: invalid magic: %r (version %r), should be 'HG'\n"
734 730 % (magic, version))
735 731 raise error.Abort(_('not a Mercurial bundle'))
736 732 unbundlerclass = formatmap.get(version)
737 733 if unbundlerclass is None:
738 734 raise error.Abort(_('unknown bundle version %s') % version)
739 735 unbundler = unbundlerclass(ui, fp)
740 736 indebug(ui, 'start processing of %s stream' % magicstring)
741 737 return unbundler
742 738
743 739 class unbundle20(unpackermixin):
744 740 """interpret a bundle2 stream
745 741
746 742 This class is fed with a binary stream and yields parts through its
747 743 `iterparts` methods."""
748 744
749 745 _magicstring = 'HG20'
750 746
751 747 def __init__(self, ui, fp):
752 748 """If header is specified, we do not read it out of the stream."""
753 749 self.ui = ui
754 750 self._compengine = util.compengines.forbundletype('UN')
755 751 self._compressed = None
756 752 super(unbundle20, self).__init__(fp)
757 753
758 754 @util.propertycache
759 755 def params(self):
760 756 """dictionary of stream level parameters"""
761 757 indebug(self.ui, 'reading bundle2 stream parameters')
762 758 params = {}
763 759 paramssize = self._unpack(_fstreamparamsize)[0]
764 760 if paramssize < 0:
765 761 raise error.BundleValueError('negative bundle param size: %i'
766 762 % paramssize)
767 763 if paramssize:
768 764 params = self._readexact(paramssize)
769 765 params = self._processallparams(params)
770 766 return params
771 767
772 768 def _processallparams(self, paramsblock):
773 769 """"""
774 770 params = util.sortdict()
775 771 for p in paramsblock.split(' '):
776 772 p = p.split('=', 1)
777 773 p = [urlreq.unquote(i) for i in p]
778 774 if len(p) < 2:
779 775 p.append(None)
780 776 self._processparam(*p)
781 777 params[p[0]] = p[1]
782 778 return params
783 779
784 780
785 781 def _processparam(self, name, value):
786 782 """process a parameter, applying its effect if needed
787 783
788 784 Parameter starting with a lower case letter are advisory and will be
789 785 ignored when unknown. Those starting with an upper case letter are
790 786 mandatory and will this function will raise a KeyError when unknown.
791 787
792 788 Note: no option are currently supported. Any input will be either
793 789 ignored or failing.
794 790 """
795 791 if not name:
796 792 raise ValueError('empty parameter name')
797 793 if name[0] not in pycompat.bytestr(string.ascii_letters):
798 794 raise ValueError('non letter first character: %r' % name)
799 795 try:
800 796 handler = b2streamparamsmap[name.lower()]
801 797 except KeyError:
802 798 if name[0].islower():
803 799 indebug(self.ui, "ignoring unknown parameter %r" % name)
804 800 else:
805 801 raise error.BundleUnknownFeatureError(params=(name,))
806 802 else:
807 803 handler(self, name, value)
808 804
809 805 def _forwardchunks(self):
810 806 """utility to transfer a bundle2 as binary
811 807
812 808 This is made necessary by the fact the 'getbundle' command over 'ssh'
813 809 have no way to know then the reply end, relying on the bundle to be
814 810 interpreted to know its end. This is terrible and we are sorry, but we
815 811 needed to move forward to get general delta enabled.
816 812 """
817 813 yield self._magicstring
818 814 assert 'params' not in vars(self)
819 815 paramssize = self._unpack(_fstreamparamsize)[0]
820 816 if paramssize < 0:
821 817 raise error.BundleValueError('negative bundle param size: %i'
822 818 % paramssize)
823 819 yield _pack(_fstreamparamsize, paramssize)
824 820 if paramssize:
825 821 params = self._readexact(paramssize)
826 822 self._processallparams(params)
827 823 yield params
828 824 assert self._compengine.bundletype == 'UN'
829 825 # From there, payload might need to be decompressed
830 826 self._fp = self._compengine.decompressorreader(self._fp)
831 827 emptycount = 0
832 828 while emptycount < 2:
833 829 # so we can brainlessly loop
834 830 assert _fpartheadersize == _fpayloadsize
835 831 size = self._unpack(_fpartheadersize)[0]
836 832 yield _pack(_fpartheadersize, size)
837 833 if size:
838 834 emptycount = 0
839 835 else:
840 836 emptycount += 1
841 837 continue
842 838 if size == flaginterrupt:
843 839 continue
844 840 elif size < 0:
845 841 raise error.BundleValueError('negative chunk size: %i')
846 842 yield self._readexact(size)
847 843
848 844
849 845 def iterparts(self):
850 846 """yield all parts contained in the stream"""
851 847 # make sure param have been loaded
852 848 self.params
853 849 # From there, payload need to be decompressed
854 850 self._fp = self._compengine.decompressorreader(self._fp)
855 851 indebug(self.ui, 'start extraction of bundle2 parts')
856 852 headerblock = self._readpartheader()
857 853 while headerblock is not None:
858 854 part = unbundlepart(self.ui, headerblock, self._fp)
859 855 yield part
860 856 # Seek to the end of the part to force it's consumption so the next
861 857 # part can be read. But then seek back to the beginning so the
862 858 # code consuming this generator has a part that starts at 0.
863 859 part.seek(0, 2)
864 860 part.seek(0)
865 861 headerblock = self._readpartheader()
866 862 indebug(self.ui, 'end of bundle2 stream')
867 863
868 864 def _readpartheader(self):
869 865 """reads a part header size and return the bytes blob
870 866
871 867 returns None if empty"""
872 868 headersize = self._unpack(_fpartheadersize)[0]
873 869 if headersize < 0:
874 870 raise error.BundleValueError('negative part header size: %i'
875 871 % headersize)
876 872 indebug(self.ui, 'part header size: %i' % headersize)
877 873 if headersize:
878 874 return self._readexact(headersize)
879 875 return None
880 876
881 877 def compressed(self):
882 878 self.params # load params
883 879 return self._compressed
884 880
885 881 def close(self):
886 882 """close underlying file"""
887 883 if util.safehasattr(self._fp, 'close'):
888 884 return self._fp.close()
889 885
890 886 formatmap = {'20': unbundle20}
891 887
892 888 b2streamparamsmap = {}
893 889
894 890 def b2streamparamhandler(name):
895 891 """register a handler for a stream level parameter"""
896 892 def decorator(func):
897 893 assert name not in formatmap
898 894 b2streamparamsmap[name] = func
899 895 return func
900 896 return decorator
901 897
902 898 @b2streamparamhandler('compression')
903 899 def processcompression(unbundler, param, value):
904 900 """read compression parameter and install payload decompression"""
905 901 if value not in util.compengines.supportedbundletypes:
906 902 raise error.BundleUnknownFeatureError(params=(param,),
907 903 values=(value,))
908 904 unbundler._compengine = util.compengines.forbundletype(value)
909 905 if value is not None:
910 906 unbundler._compressed = True
911 907
912 908 class bundlepart(object):
913 909 """A bundle2 part contains application level payload
914 910
915 911 The part `type` is used to route the part to the application level
916 912 handler.
917 913
918 914 The part payload is contained in ``part.data``. It could be raw bytes or a
919 915 generator of byte chunks.
920 916
921 917 You can add parameters to the part using the ``addparam`` method.
922 918 Parameters can be either mandatory (default) or advisory. Remote side
923 919 should be able to safely ignore the advisory ones.
924 920
925 921 Both data and parameters cannot be modified after the generation has begun.
926 922 """
927 923
928 924 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
929 925 data='', mandatory=True):
930 926 validateparttype(parttype)
931 927 self.id = None
932 928 self.type = parttype
933 929 self._data = data
934 930 self._mandatoryparams = list(mandatoryparams)
935 931 self._advisoryparams = list(advisoryparams)
936 932 # checking for duplicated entries
937 933 self._seenparams = set()
938 934 for pname, __ in self._mandatoryparams + self._advisoryparams:
939 935 if pname in self._seenparams:
940 936 raise error.ProgrammingError('duplicated params: %s' % pname)
941 937 self._seenparams.add(pname)
942 938 # status of the part's generation:
943 939 # - None: not started,
944 940 # - False: currently generated,
945 941 # - True: generation done.
946 942 self._generated = None
947 943 self.mandatory = mandatory
948 944
949 945 def __repr__(self):
950 946 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
951 947 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
952 948 % (cls, id(self), self.id, self.type, self.mandatory))
953 949
954 950 def copy(self):
955 951 """return a copy of the part
956 952
957 953 The new part have the very same content but no partid assigned yet.
958 954 Parts with generated data cannot be copied."""
959 955 assert not util.safehasattr(self.data, 'next')
960 956 return self.__class__(self.type, self._mandatoryparams,
961 957 self._advisoryparams, self._data, self.mandatory)
962 958
963 959 # methods used to defines the part content
964 960 @property
965 961 def data(self):
966 962 return self._data
967 963
968 964 @data.setter
969 965 def data(self, data):
970 966 if self._generated is not None:
971 967 raise error.ReadOnlyPartError('part is being generated')
972 968 self._data = data
973 969
974 970 @property
975 971 def mandatoryparams(self):
976 972 # make it an immutable tuple to force people through ``addparam``
977 973 return tuple(self._mandatoryparams)
978 974
979 975 @property
980 976 def advisoryparams(self):
981 977 # make it an immutable tuple to force people through ``addparam``
982 978 return tuple(self._advisoryparams)
983 979
984 980 def addparam(self, name, value='', mandatory=True):
985 981 """add a parameter to the part
986 982
987 983 If 'mandatory' is set to True, the remote handler must claim support
988 984 for this parameter or the unbundling will be aborted.
989 985
990 986 The 'name' and 'value' cannot exceed 255 bytes each.
991 987 """
992 988 if self._generated is not None:
993 989 raise error.ReadOnlyPartError('part is being generated')
994 990 if name in self._seenparams:
995 991 raise ValueError('duplicated params: %s' % name)
996 992 self._seenparams.add(name)
997 993 params = self._advisoryparams
998 994 if mandatory:
999 995 params = self._mandatoryparams
1000 996 params.append((name, value))
1001 997
1002 998 # methods used to generates the bundle2 stream
1003 999 def getchunks(self, ui):
1004 1000 if self._generated is not None:
1005 1001 raise error.ProgrammingError('part can only be consumed once')
1006 1002 self._generated = False
1007 1003
1008 1004 if ui.debugflag:
1009 1005 msg = ['bundle2-output-part: "%s"' % self.type]
1010 1006 if not self.mandatory:
1011 1007 msg.append(' (advisory)')
1012 1008 nbmp = len(self.mandatoryparams)
1013 1009 nbap = len(self.advisoryparams)
1014 1010 if nbmp or nbap:
1015 1011 msg.append(' (params:')
1016 1012 if nbmp:
1017 1013 msg.append(' %i mandatory' % nbmp)
1018 1014 if nbap:
1019 1015 msg.append(' %i advisory' % nbmp)
1020 1016 msg.append(')')
1021 1017 if not self.data:
1022 1018 msg.append(' empty payload')
1023 1019 elif util.safehasattr(self.data, 'next'):
1024 1020 msg.append(' streamed payload')
1025 1021 else:
1026 1022 msg.append(' %i bytes payload' % len(self.data))
1027 1023 msg.append('\n')
1028 1024 ui.debug(''.join(msg))
1029 1025
1030 1026 #### header
1031 1027 if self.mandatory:
1032 1028 parttype = self.type.upper()
1033 1029 else:
1034 1030 parttype = self.type.lower()
1035 1031 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1036 1032 ## parttype
1037 1033 header = [_pack(_fparttypesize, len(parttype)),
1038 1034 parttype, _pack(_fpartid, self.id),
1039 1035 ]
1040 1036 ## parameters
1041 1037 # count
1042 1038 manpar = self.mandatoryparams
1043 1039 advpar = self.advisoryparams
1044 1040 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1045 1041 # size
1046 1042 parsizes = []
1047 1043 for key, value in manpar:
1048 1044 parsizes.append(len(key))
1049 1045 parsizes.append(len(value))
1050 1046 for key, value in advpar:
1051 1047 parsizes.append(len(key))
1052 1048 parsizes.append(len(value))
1053 1049 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1054 1050 header.append(paramsizes)
1055 1051 # key, value
1056 1052 for key, value in manpar:
1057 1053 header.append(key)
1058 1054 header.append(value)
1059 1055 for key, value in advpar:
1060 1056 header.append(key)
1061 1057 header.append(value)
1062 1058 ## finalize header
1063 1059 try:
1064 1060 headerchunk = ''.join(header)
1065 1061 except TypeError:
1066 1062 raise TypeError(r'Found a non-bytes trying to '
1067 1063 r'build bundle part header: %r' % header)
1068 1064 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1069 1065 yield _pack(_fpartheadersize, len(headerchunk))
1070 1066 yield headerchunk
1071 1067 ## payload
1072 1068 try:
1073 1069 for chunk in self._payloadchunks():
1074 1070 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1075 1071 yield _pack(_fpayloadsize, len(chunk))
1076 1072 yield chunk
1077 1073 except GeneratorExit:
1078 1074 # GeneratorExit means that nobody is listening for our
1079 1075 # results anyway, so just bail quickly rather than trying
1080 1076 # to produce an error part.
1081 1077 ui.debug('bundle2-generatorexit\n')
1082 1078 raise
1083 1079 except BaseException as exc:
1084 1080 bexc = util.forcebytestr(exc)
1085 1081 # backup exception data for later
1086 1082 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1087 1083 % bexc)
1088 1084 tb = sys.exc_info()[2]
1089 1085 msg = 'unexpected error: %s' % bexc
1090 1086 interpart = bundlepart('error:abort', [('message', msg)],
1091 1087 mandatory=False)
1092 1088 interpart.id = 0
1093 1089 yield _pack(_fpayloadsize, -1)
1094 1090 for chunk in interpart.getchunks(ui=ui):
1095 1091 yield chunk
1096 1092 outdebug(ui, 'closing payload chunk')
1097 1093 # abort current part payload
1098 1094 yield _pack(_fpayloadsize, 0)
1099 1095 pycompat.raisewithtb(exc, tb)
1100 1096 # end of payload
1101 1097 outdebug(ui, 'closing payload chunk')
1102 1098 yield _pack(_fpayloadsize, 0)
1103 1099 self._generated = True
1104 1100
1105 1101 def _payloadchunks(self):
1106 1102 """yield chunks of a the part payload
1107 1103
1108 1104 Exists to handle the different methods to provide data to a part."""
1109 1105 # we only support fixed size data now.
1110 1106 # This will be improved in the future.
1111 1107 if (util.safehasattr(self.data, 'next')
1112 1108 or util.safehasattr(self.data, '__next__')):
1113 1109 buff = util.chunkbuffer(self.data)
1114 1110 chunk = buff.read(preferedchunksize)
1115 1111 while chunk:
1116 1112 yield chunk
1117 1113 chunk = buff.read(preferedchunksize)
1118 1114 elif len(self.data):
1119 1115 yield self.data
1120 1116
1121 1117
1122 1118 flaginterrupt = -1
1123 1119
1124 1120 class interrupthandler(unpackermixin):
1125 1121 """read one part and process it with restricted capability
1126 1122
1127 1123 This allows to transmit exception raised on the producer size during part
1128 1124 iteration while the consumer is reading a part.
1129 1125
1130 1126 Part processed in this manner only have access to a ui object,"""
1131 1127
1132 1128 def __init__(self, ui, fp):
1133 1129 super(interrupthandler, self).__init__(fp)
1134 1130 self.ui = ui
1135 1131
1136 1132 def _readpartheader(self):
1137 1133 """reads a part header size and return the bytes blob
1138 1134
1139 1135 returns None if empty"""
1140 1136 headersize = self._unpack(_fpartheadersize)[0]
1141 1137 if headersize < 0:
1142 1138 raise error.BundleValueError('negative part header size: %i'
1143 1139 % headersize)
1144 1140 indebug(self.ui, 'part header size: %i\n' % headersize)
1145 1141 if headersize:
1146 1142 return self._readexact(headersize)
1147 1143 return None
1148 1144
1149 1145 def __call__(self):
1150 1146
1151 1147 self.ui.debug('bundle2-input-stream-interrupt:'
1152 1148 ' opening out of band context\n')
1153 1149 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1154 1150 headerblock = self._readpartheader()
1155 1151 if headerblock is None:
1156 1152 indebug(self.ui, 'no part found during interruption.')
1157 1153 return
1158 1154 part = unbundlepart(self.ui, headerblock, self._fp)
1159 1155 op = interruptoperation(self.ui)
1160 1156 hardabort = False
1161 1157 try:
1162 1158 _processpart(op, part)
1163 1159 except (SystemExit, KeyboardInterrupt):
1164 1160 hardabort = True
1165 1161 raise
1166 1162 finally:
1167 1163 if not hardabort:
1168 1164 part.seek(0, 2)
1169 1165 self.ui.debug('bundle2-input-stream-interrupt:'
1170 1166 ' closing out of band context\n')
1171 1167
1172 1168 class interruptoperation(object):
1173 1169 """A limited operation to be use by part handler during interruption
1174 1170
1175 1171 It only have access to an ui object.
1176 1172 """
1177 1173
1178 1174 def __init__(self, ui):
1179 1175 self.ui = ui
1180 1176 self.reply = None
1181 1177 self.captureoutput = False
1182 1178
1183 1179 @property
1184 1180 def repo(self):
1185 1181 raise error.ProgrammingError('no repo access from stream interruption')
1186 1182
1187 1183 def gettransaction(self):
1188 1184 raise TransactionUnavailable('no repo access from stream interruption')
1189 1185
1190 1186 class unbundlepart(unpackermixin):
1191 1187 """a bundle part read from a bundle"""
1192 1188
1193 1189 def __init__(self, ui, header, fp):
1194 1190 super(unbundlepart, self).__init__(fp)
1195 1191 self._seekable = (util.safehasattr(fp, 'seek') and
1196 1192 util.safehasattr(fp, 'tell'))
1197 1193 self.ui = ui
1198 1194 # unbundle state attr
1199 1195 self._headerdata = header
1200 1196 self._headeroffset = 0
1201 1197 self._initialized = False
1202 1198 self.consumed = False
1203 1199 # part data
1204 1200 self.id = None
1205 1201 self.type = None
1206 1202 self.mandatoryparams = None
1207 1203 self.advisoryparams = None
1208 1204 self.params = None
1209 1205 self.mandatorykeys = ()
1210 1206 self._payloadstream = None
1211 1207 self._readheader()
1212 1208 self._mandatory = None
1213 1209 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1214 1210 self._pos = 0
1215 1211
1216 1212 def _fromheader(self, size):
1217 1213 """return the next <size> byte from the header"""
1218 1214 offset = self._headeroffset
1219 1215 data = self._headerdata[offset:(offset + size)]
1220 1216 self._headeroffset = offset + size
1221 1217 return data
1222 1218
1223 1219 def _unpackheader(self, format):
1224 1220 """read given format from header
1225 1221
1226 1222 This automatically compute the size of the format to read."""
1227 1223 data = self._fromheader(struct.calcsize(format))
1228 1224 return _unpack(format, data)
1229 1225
1230 1226 def _initparams(self, mandatoryparams, advisoryparams):
1231 1227 """internal function to setup all logic related parameters"""
1232 1228 # make it read only to prevent people touching it by mistake.
1233 1229 self.mandatoryparams = tuple(mandatoryparams)
1234 1230 self.advisoryparams = tuple(advisoryparams)
1235 1231 # user friendly UI
1236 1232 self.params = util.sortdict(self.mandatoryparams)
1237 1233 self.params.update(self.advisoryparams)
1238 1234 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1239 1235
1240 1236 def _payloadchunks(self, chunknum=0):
1241 1237 '''seek to specified chunk and start yielding data'''
1242 1238 if len(self._chunkindex) == 0:
1243 1239 assert chunknum == 0, 'Must start with chunk 0'
1244 1240 self._chunkindex.append((0, self._tellfp()))
1245 1241 else:
1246 1242 assert chunknum < len(self._chunkindex), \
1247 1243 'Unknown chunk %d' % chunknum
1248 1244 self._seekfp(self._chunkindex[chunknum][1])
1249 1245
1250 1246 pos = self._chunkindex[chunknum][0]
1251 1247 payloadsize = self._unpack(_fpayloadsize)[0]
1252 1248 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1253 1249 while payloadsize:
1254 1250 if payloadsize == flaginterrupt:
1255 1251 # interruption detection, the handler will now read a
1256 1252 # single part and process it.
1257 1253 interrupthandler(self.ui, self._fp)()
1258 1254 elif payloadsize < 0:
1259 1255 msg = 'negative payload chunk size: %i' % payloadsize
1260 1256 raise error.BundleValueError(msg)
1261 1257 else:
1262 1258 result = self._readexact(payloadsize)
1263 1259 chunknum += 1
1264 1260 pos += payloadsize
1265 1261 if chunknum == len(self._chunkindex):
1266 1262 self._chunkindex.append((pos, self._tellfp()))
1267 1263 yield result
1268 1264 payloadsize = self._unpack(_fpayloadsize)[0]
1269 1265 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1270 1266
1271 1267 def _findchunk(self, pos):
1272 1268 '''for a given payload position, return a chunk number and offset'''
1273 1269 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1274 1270 if ppos == pos:
1275 1271 return chunk, 0
1276 1272 elif ppos > pos:
1277 1273 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1278 1274 raise ValueError('Unknown chunk')
1279 1275
1280 1276 def _readheader(self):
1281 1277 """read the header and setup the object"""
1282 1278 typesize = self._unpackheader(_fparttypesize)[0]
1283 1279 self.type = self._fromheader(typesize)
1284 1280 indebug(self.ui, 'part type: "%s"' % self.type)
1285 1281 self.id = self._unpackheader(_fpartid)[0]
1286 1282 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1287 1283 # extract mandatory bit from type
1288 1284 self.mandatory = (self.type != self.type.lower())
1289 1285 self.type = self.type.lower()
1290 1286 ## reading parameters
1291 1287 # param count
1292 1288 mancount, advcount = self._unpackheader(_fpartparamcount)
1293 1289 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1294 1290 # param size
1295 1291 fparamsizes = _makefpartparamsizes(mancount + advcount)
1296 1292 paramsizes = self._unpackheader(fparamsizes)
1297 1293 # make it a list of couple again
1298 1294 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1299 1295 # split mandatory from advisory
1300 1296 mansizes = paramsizes[:mancount]
1301 1297 advsizes = paramsizes[mancount:]
1302 1298 # retrieve param value
1303 1299 manparams = []
1304 1300 for key, value in mansizes:
1305 1301 manparams.append((self._fromheader(key), self._fromheader(value)))
1306 1302 advparams = []
1307 1303 for key, value in advsizes:
1308 1304 advparams.append((self._fromheader(key), self._fromheader(value)))
1309 1305 self._initparams(manparams, advparams)
1310 1306 ## part payload
1311 1307 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1312 1308 # we read the data, tell it
1313 1309 self._initialized = True
1314 1310
1315 1311 def read(self, size=None):
1316 1312 """read payload data"""
1317 1313 if not self._initialized:
1318 1314 self._readheader()
1319 1315 if size is None:
1320 1316 data = self._payloadstream.read()
1321 1317 else:
1322 1318 data = self._payloadstream.read(size)
1323 1319 self._pos += len(data)
1324 1320 if size is None or len(data) < size:
1325 1321 if not self.consumed and self._pos:
1326 1322 self.ui.debug('bundle2-input-part: total payload size %i\n'
1327 1323 % self._pos)
1328 1324 self.consumed = True
1329 1325 return data
1330 1326
1331 1327 def tell(self):
1332 1328 return self._pos
1333 1329
1334 1330 def seek(self, offset, whence=0):
1335 1331 if whence == 0:
1336 1332 newpos = offset
1337 1333 elif whence == 1:
1338 1334 newpos = self._pos + offset
1339 1335 elif whence == 2:
1340 1336 if not self.consumed:
1341 1337 self.read()
1342 1338 newpos = self._chunkindex[-1][0] - offset
1343 1339 else:
1344 1340 raise ValueError('Unknown whence value: %r' % (whence,))
1345 1341
1346 1342 if newpos > self._chunkindex[-1][0] and not self.consumed:
1347 1343 self.read()
1348 1344 if not 0 <= newpos <= self._chunkindex[-1][0]:
1349 1345 raise ValueError('Offset out of range')
1350 1346
1351 1347 if self._pos != newpos:
1352 1348 chunk, internaloffset = self._findchunk(newpos)
1353 1349 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1354 1350 adjust = self.read(internaloffset)
1355 1351 if len(adjust) != internaloffset:
1356 1352 raise error.Abort(_('Seek failed\n'))
1357 1353 self._pos = newpos
1358 1354
1359 1355 def _seekfp(self, offset, whence=0):
1360 1356 """move the underlying file pointer
1361 1357
1362 1358 This method is meant for internal usage by the bundle2 protocol only.
1363 1359 They directly manipulate the low level stream including bundle2 level
1364 1360 instruction.
1365 1361
1366 1362 Do not use it to implement higher-level logic or methods."""
1367 1363 if self._seekable:
1368 1364 return self._fp.seek(offset, whence)
1369 1365 else:
1370 1366 raise NotImplementedError(_('File pointer is not seekable'))
1371 1367
1372 1368 def _tellfp(self):
1373 1369 """return the file offset, or None if file is not seekable
1374 1370
1375 1371 This method is meant for internal usage by the bundle2 protocol only.
1376 1372 They directly manipulate the low level stream including bundle2 level
1377 1373 instruction.
1378 1374
1379 1375 Do not use it to implement higher-level logic or methods."""
1380 1376 if self._seekable:
1381 1377 try:
1382 1378 return self._fp.tell()
1383 1379 except IOError as e:
1384 1380 if e.errno == errno.ESPIPE:
1385 1381 self._seekable = False
1386 1382 else:
1387 1383 raise
1388 1384 return None
1389 1385
1390 1386 # These are only the static capabilities.
1391 1387 # Check the 'getrepocaps' function for the rest.
1392 1388 capabilities = {'HG20': (),
1393 1389 'error': ('abort', 'unsupportedcontent', 'pushraced',
1394 1390 'pushkey'),
1395 1391 'listkeys': (),
1396 1392 'pushkey': (),
1397 1393 'digests': tuple(sorted(util.DIGESTS.keys())),
1398 1394 'remote-changegroup': ('http', 'https'),
1399 1395 'hgtagsfnodes': (),
1400 1396 }
1401 1397
1402 1398 def getrepocaps(repo, allowpushback=False):
1403 1399 """return the bundle2 capabilities for a given repo
1404 1400
1405 1401 Exists to allow extensions (like evolution) to mutate the capabilities.
1406 1402 """
1407 1403 caps = capabilities.copy()
1408 1404 caps['changegroup'] = tuple(sorted(
1409 1405 changegroup.supportedincomingversions(repo)))
1410 1406 if obsolete.isenabled(repo, obsolete.exchangeopt):
1411 1407 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1412 1408 caps['obsmarkers'] = supportedformat
1413 1409 if allowpushback:
1414 1410 caps['pushback'] = ()
1415 1411 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1416 1412 if cpmode == 'check-related':
1417 1413 caps['checkheads'] = ('related',)
1418 1414 return caps
1419 1415
1420 1416 def bundle2caps(remote):
1421 1417 """return the bundle capabilities of a peer as dict"""
1422 1418 raw = remote.capable('bundle2')
1423 1419 if not raw and raw != '':
1424 1420 return {}
1425 1421 capsblob = urlreq.unquote(remote.capable('bundle2'))
1426 1422 return decodecaps(capsblob)
1427 1423
1428 1424 def obsmarkersversion(caps):
1429 1425 """extract the list of supported obsmarkers versions from a bundle2caps dict
1430 1426 """
1431 1427 obscaps = caps.get('obsmarkers', ())
1432 1428 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1433 1429
1434 1430 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1435 1431 vfs=None, compression=None, compopts=None):
1436 1432 if bundletype.startswith('HG10'):
1437 1433 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1438 1434 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1439 1435 compression=compression, compopts=compopts)
1440 1436 elif not bundletype.startswith('HG20'):
1441 1437 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1442 1438
1443 1439 caps = {}
1444 1440 if 'obsolescence' in opts:
1445 1441 caps['obsmarkers'] = ('V1',)
1446 1442 bundle = bundle20(ui, caps)
1447 1443 bundle.setcompression(compression, compopts)
1448 1444 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1449 1445 chunkiter = bundle.getchunks()
1450 1446
1451 1447 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1452 1448
1453 1449 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1454 1450 # We should eventually reconcile this logic with the one behind
1455 1451 # 'exchange.getbundle2partsgenerator'.
1456 1452 #
1457 1453 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1458 1454 # different right now. So we keep them separated for now for the sake of
1459 1455 # simplicity.
1460 1456
1461 1457 # we always want a changegroup in such bundle
1462 1458 cgversion = opts.get('cg.version')
1463 1459 if cgversion is None:
1464 1460 cgversion = changegroup.safeversion(repo)
1465 1461 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1466 1462 part = bundler.newpart('changegroup', data=cg.getchunks())
1467 1463 part.addparam('version', cg.version)
1468 1464 if 'clcount' in cg.extras:
1469 1465 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1470 1466 mandatory=False)
1471 1467 if opts.get('phases') and repo.revs('%ln and secret()',
1472 1468 outgoing.missingheads):
1473 1469 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1474 1470
1475 1471 addparttagsfnodescache(repo, bundler, outgoing)
1476 1472
1477 1473 if opts.get('obsolescence', False):
1478 1474 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1479 1475 buildobsmarkerspart(bundler, obsmarkers)
1480 1476
1481 1477 if opts.get('phases', False):
1482 1478 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1483 1479 phasedata = []
1484 1480 for phase in phases.allphases:
1485 1481 for head in headsbyphase[phase]:
1486 1482 phasedata.append(_pack(_fphasesentry, phase, head))
1487 1483 bundler.newpart('phase-heads', data=''.join(phasedata))
1488 1484
1489 1485 def addparttagsfnodescache(repo, bundler, outgoing):
1490 1486 # we include the tags fnode cache for the bundle changeset
1491 1487 # (as an optional parts)
1492 1488 cache = tags.hgtagsfnodescache(repo.unfiltered())
1493 1489 chunks = []
1494 1490
1495 1491 # .hgtags fnodes are only relevant for head changesets. While we could
1496 1492 # transfer values for all known nodes, there will likely be little to
1497 1493 # no benefit.
1498 1494 #
1499 1495 # We don't bother using a generator to produce output data because
1500 1496 # a) we only have 40 bytes per head and even esoteric numbers of heads
1501 1497 # consume little memory (1M heads is 40MB) b) we don't want to send the
1502 1498 # part if we don't have entries and knowing if we have entries requires
1503 1499 # cache lookups.
1504 1500 for node in outgoing.missingheads:
1505 1501 # Don't compute missing, as this may slow down serving.
1506 1502 fnode = cache.getfnode(node, computemissing=False)
1507 1503 if fnode is not None:
1508 1504 chunks.extend([node, fnode])
1509 1505
1510 1506 if chunks:
1511 1507 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1512 1508
1513 1509 def buildobsmarkerspart(bundler, markers):
1514 1510 """add an obsmarker part to the bundler with <markers>
1515 1511
1516 1512 No part is created if markers is empty.
1517 1513 Raises ValueError if the bundler doesn't support any known obsmarker format.
1518 1514 """
1519 1515 if not markers:
1520 1516 return None
1521 1517
1522 1518 remoteversions = obsmarkersversion(bundler.capabilities)
1523 1519 version = obsolete.commonversion(remoteversions)
1524 1520 if version is None:
1525 1521 raise ValueError('bundler does not support common obsmarker format')
1526 1522 stream = obsolete.encodemarkers(markers, True, version=version)
1527 1523 return bundler.newpart('obsmarkers', data=stream)
1528 1524
1529 1525 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1530 1526 compopts=None):
1531 1527 """Write a bundle file and return its filename.
1532 1528
1533 1529 Existing files will not be overwritten.
1534 1530 If no filename is specified, a temporary file is created.
1535 1531 bz2 compression can be turned off.
1536 1532 The bundle file will be deleted in case of errors.
1537 1533 """
1538 1534
1539 1535 if bundletype == "HG20":
1540 1536 bundle = bundle20(ui)
1541 1537 bundle.setcompression(compression, compopts)
1542 1538 part = bundle.newpart('changegroup', data=cg.getchunks())
1543 1539 part.addparam('version', cg.version)
1544 1540 if 'clcount' in cg.extras:
1545 1541 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1546 1542 mandatory=False)
1547 1543 chunkiter = bundle.getchunks()
1548 1544 else:
1549 1545 # compression argument is only for the bundle2 case
1550 1546 assert compression is None
1551 1547 if cg.version != '01':
1552 1548 raise error.Abort(_('old bundle types only supports v1 '
1553 1549 'changegroups'))
1554 1550 header, comp = bundletypes[bundletype]
1555 1551 if comp not in util.compengines.supportedbundletypes:
1556 1552 raise error.Abort(_('unknown stream compression type: %s')
1557 1553 % comp)
1558 1554 compengine = util.compengines.forbundletype(comp)
1559 1555 def chunkiter():
1560 1556 yield header
1561 1557 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1562 1558 yield chunk
1563 1559 chunkiter = chunkiter()
1564 1560
1565 1561 # parse the changegroup data, otherwise we will block
1566 1562 # in case of sshrepo because we don't know the end of the stream
1567 1563 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1568 1564
1569 1565 def combinechangegroupresults(op):
1570 1566 """logic to combine 0 or more addchangegroup results into one"""
1571 1567 results = [r.get('return', 0)
1572 1568 for r in op.records['changegroup']]
1573 1569 changedheads = 0
1574 1570 result = 1
1575 1571 for ret in results:
1576 1572 # If any changegroup result is 0, return 0
1577 1573 if ret == 0:
1578 1574 result = 0
1579 1575 break
1580 1576 if ret < -1:
1581 1577 changedheads += ret + 1
1582 1578 elif ret > 1:
1583 1579 changedheads += ret - 1
1584 1580 if changedheads > 0:
1585 1581 result = 1 + changedheads
1586 1582 elif changedheads < 0:
1587 1583 result = -1 + changedheads
1588 1584 return result
1589 1585
1590 1586 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1591 1587 'targetphase'))
1592 1588 def handlechangegroup(op, inpart):
1593 1589 """apply a changegroup part on the repo
1594 1590
1595 1591 This is a very early implementation that will massive rework before being
1596 1592 inflicted to any end-user.
1597 1593 """
1598 1594 tr = op.gettransaction()
1599 1595 unpackerversion = inpart.params.get('version', '01')
1600 1596 # We should raise an appropriate exception here
1601 1597 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1602 1598 # the source and url passed here are overwritten by the one contained in
1603 1599 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1604 1600 nbchangesets = None
1605 1601 if 'nbchanges' in inpart.params:
1606 1602 nbchangesets = int(inpart.params.get('nbchanges'))
1607 1603 if ('treemanifest' in inpart.params and
1608 1604 'treemanifest' not in op.repo.requirements):
1609 1605 if len(op.repo.changelog) != 0:
1610 1606 raise error.Abort(_(
1611 1607 "bundle contains tree manifests, but local repo is "
1612 1608 "non-empty and does not use tree manifests"))
1613 1609 op.repo.requirements.add('treemanifest')
1614 1610 op.repo._applyopenerreqs()
1615 1611 op.repo._writerequirements()
1616 1612 extrakwargs = {}
1617 1613 targetphase = inpart.params.get('targetphase')
1618 1614 if targetphase is not None:
1619 1615 extrakwargs['targetphase'] = int(targetphase)
1620 1616 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1621 1617 expectedtotal=nbchangesets, **extrakwargs)
1622 1618 if op.reply is not None:
1623 1619 # This is definitely not the final form of this
1624 1620 # return. But one need to start somewhere.
1625 1621 part = op.reply.newpart('reply:changegroup', mandatory=False)
1626 1622 part.addparam(
1627 1623 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1628 1624 part.addparam('return', '%i' % ret, mandatory=False)
1629 1625 assert not inpart.read()
1630 1626
1631 1627 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1632 1628 ['digest:%s' % k for k in util.DIGESTS.keys()])
1633 1629 @parthandler('remote-changegroup', _remotechangegroupparams)
1634 1630 def handleremotechangegroup(op, inpart):
1635 1631 """apply a bundle10 on the repo, given an url and validation information
1636 1632
1637 1633 All the information about the remote bundle to import are given as
1638 1634 parameters. The parameters include:
1639 1635 - url: the url to the bundle10.
1640 1636 - size: the bundle10 file size. It is used to validate what was
1641 1637 retrieved by the client matches the server knowledge about the bundle.
1642 1638 - digests: a space separated list of the digest types provided as
1643 1639 parameters.
1644 1640 - digest:<digest-type>: the hexadecimal representation of the digest with
1645 1641 that name. Like the size, it is used to validate what was retrieved by
1646 1642 the client matches what the server knows about the bundle.
1647 1643
1648 1644 When multiple digest types are given, all of them are checked.
1649 1645 """
1650 1646 try:
1651 1647 raw_url = inpart.params['url']
1652 1648 except KeyError:
1653 1649 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1654 1650 parsed_url = util.url(raw_url)
1655 1651 if parsed_url.scheme not in capabilities['remote-changegroup']:
1656 1652 raise error.Abort(_('remote-changegroup does not support %s urls') %
1657 1653 parsed_url.scheme)
1658 1654
1659 1655 try:
1660 1656 size = int(inpart.params['size'])
1661 1657 except ValueError:
1662 1658 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1663 1659 % 'size')
1664 1660 except KeyError:
1665 1661 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1666 1662
1667 1663 digests = {}
1668 1664 for typ in inpart.params.get('digests', '').split():
1669 1665 param = 'digest:%s' % typ
1670 1666 try:
1671 1667 value = inpart.params[param]
1672 1668 except KeyError:
1673 1669 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1674 1670 param)
1675 1671 digests[typ] = value
1676 1672
1677 1673 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1678 1674
1679 1675 tr = op.gettransaction()
1680 1676 from . import exchange
1681 1677 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1682 1678 if not isinstance(cg, changegroup.cg1unpacker):
1683 1679 raise error.Abort(_('%s: not a bundle version 1.0') %
1684 1680 util.hidepassword(raw_url))
1685 1681 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1686 1682 if op.reply is not None:
1687 1683 # This is definitely not the final form of this
1688 1684 # return. But one need to start somewhere.
1689 1685 part = op.reply.newpart('reply:changegroup')
1690 1686 part.addparam(
1691 1687 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1692 1688 part.addparam('return', '%i' % ret, mandatory=False)
1693 1689 try:
1694 1690 real_part.validate()
1695 1691 except error.Abort as e:
1696 1692 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1697 1693 (util.hidepassword(raw_url), str(e)))
1698 1694 assert not inpart.read()
1699 1695
1700 1696 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1701 1697 def handlereplychangegroup(op, inpart):
1702 1698 ret = int(inpart.params['return'])
1703 1699 replyto = int(inpart.params['in-reply-to'])
1704 1700 op.records.add('changegroup', {'return': ret}, replyto)
1705 1701
1706 1702 @parthandler('check:heads')
1707 1703 def handlecheckheads(op, inpart):
1708 1704 """check that head of the repo did not change
1709 1705
1710 1706 This is used to detect a push race when using unbundle.
1711 1707 This replaces the "heads" argument of unbundle."""
1712 1708 h = inpart.read(20)
1713 1709 heads = []
1714 1710 while len(h) == 20:
1715 1711 heads.append(h)
1716 1712 h = inpart.read(20)
1717 1713 assert not h
1718 1714 # Trigger a transaction so that we are guaranteed to have the lock now.
1719 1715 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1720 1716 op.gettransaction()
1721 1717 if sorted(heads) != sorted(op.repo.heads()):
1722 1718 raise error.PushRaced('repository changed while pushing - '
1723 1719 'please try again')
1724 1720
1725 1721 @parthandler('check:updated-heads')
1726 1722 def handlecheckupdatedheads(op, inpart):
1727 1723 """check for race on the heads touched by a push
1728 1724
1729 1725 This is similar to 'check:heads' but focus on the heads actually updated
1730 1726 during the push. If other activities happen on unrelated heads, it is
1731 1727 ignored.
1732 1728
1733 1729 This allow server with high traffic to avoid push contention as long as
1734 1730 unrelated parts of the graph are involved."""
1735 1731 h = inpart.read(20)
1736 1732 heads = []
1737 1733 while len(h) == 20:
1738 1734 heads.append(h)
1739 1735 h = inpart.read(20)
1740 1736 assert not h
1741 1737 # trigger a transaction so that we are guaranteed to have the lock now.
1742 1738 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1743 1739 op.gettransaction()
1744 1740
1745 1741 currentheads = set()
1746 1742 for ls in op.repo.branchmap().itervalues():
1747 1743 currentheads.update(ls)
1748 1744
1749 1745 for h in heads:
1750 1746 if h not in currentheads:
1751 1747 raise error.PushRaced('repository changed while pushing - '
1752 1748 'please try again')
1753 1749
1754 1750 @parthandler('output')
1755 1751 def handleoutput(op, inpart):
1756 1752 """forward output captured on the server to the client"""
1757 1753 for line in inpart.read().splitlines():
1758 1754 op.ui.status(_('remote: %s\n') % line)
1759 1755
1760 1756 @parthandler('replycaps')
1761 1757 def handlereplycaps(op, inpart):
1762 1758 """Notify that a reply bundle should be created
1763 1759
1764 1760 The payload contains the capabilities information for the reply"""
1765 1761 caps = decodecaps(inpart.read())
1766 1762 if op.reply is None:
1767 1763 op.reply = bundle20(op.ui, caps)
1768 1764
1769 1765 class AbortFromPart(error.Abort):
1770 1766 """Sub-class of Abort that denotes an error from a bundle2 part."""
1771 1767
1772 1768 @parthandler('error:abort', ('message', 'hint'))
1773 1769 def handleerrorabort(op, inpart):
1774 1770 """Used to transmit abort error over the wire"""
1775 1771 raise AbortFromPart(inpart.params['message'],
1776 1772 hint=inpart.params.get('hint'))
1777 1773
1778 1774 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1779 1775 'in-reply-to'))
1780 1776 def handleerrorpushkey(op, inpart):
1781 1777 """Used to transmit failure of a mandatory pushkey over the wire"""
1782 1778 kwargs = {}
1783 1779 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1784 1780 value = inpart.params.get(name)
1785 1781 if value is not None:
1786 1782 kwargs[name] = value
1787 1783 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1788 1784
1789 1785 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1790 1786 def handleerrorunsupportedcontent(op, inpart):
1791 1787 """Used to transmit unknown content error over the wire"""
1792 1788 kwargs = {}
1793 1789 parttype = inpart.params.get('parttype')
1794 1790 if parttype is not None:
1795 1791 kwargs['parttype'] = parttype
1796 1792 params = inpart.params.get('params')
1797 1793 if params is not None:
1798 1794 kwargs['params'] = params.split('\0')
1799 1795
1800 1796 raise error.BundleUnknownFeatureError(**kwargs)
1801 1797
1802 1798 @parthandler('error:pushraced', ('message',))
1803 1799 def handleerrorpushraced(op, inpart):
1804 1800 """Used to transmit push race error over the wire"""
1805 1801 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1806 1802
1807 1803 @parthandler('listkeys', ('namespace',))
1808 1804 def handlelistkeys(op, inpart):
1809 1805 """retrieve pushkey namespace content stored in a bundle2"""
1810 1806 namespace = inpart.params['namespace']
1811 1807 r = pushkey.decodekeys(inpart.read())
1812 1808 op.records.add('listkeys', (namespace, r))
1813 1809
1814 1810 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1815 1811 def handlepushkey(op, inpart):
1816 1812 """process a pushkey request"""
1817 1813 dec = pushkey.decode
1818 1814 namespace = dec(inpart.params['namespace'])
1819 1815 key = dec(inpart.params['key'])
1820 1816 old = dec(inpart.params['old'])
1821 1817 new = dec(inpart.params['new'])
1822 1818 # Grab the transaction to ensure that we have the lock before performing the
1823 1819 # pushkey.
1824 1820 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1825 1821 op.gettransaction()
1826 1822 ret = op.repo.pushkey(namespace, key, old, new)
1827 1823 record = {'namespace': namespace,
1828 1824 'key': key,
1829 1825 'old': old,
1830 1826 'new': new}
1831 1827 op.records.add('pushkey', record)
1832 1828 if op.reply is not None:
1833 1829 rpart = op.reply.newpart('reply:pushkey')
1834 1830 rpart.addparam(
1835 1831 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1836 1832 rpart.addparam('return', '%i' % ret, mandatory=False)
1837 1833 if inpart.mandatory and not ret:
1838 1834 kwargs = {}
1839 1835 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1840 1836 if key in inpart.params:
1841 1837 kwargs[key] = inpart.params[key]
1842 1838 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1843 1839
1844 1840 def _readphaseheads(inpart):
1845 1841 headsbyphase = [[] for i in phases.allphases]
1846 1842 entrysize = struct.calcsize(_fphasesentry)
1847 1843 while True:
1848 1844 entry = inpart.read(entrysize)
1849 1845 if len(entry) < entrysize:
1850 1846 if entry:
1851 1847 raise error.Abort(_('bad phase-heads bundle part'))
1852 1848 break
1853 1849 phase, node = struct.unpack(_fphasesentry, entry)
1854 1850 headsbyphase[phase].append(node)
1855 1851 return headsbyphase
1856 1852
1857 1853 @parthandler('phase-heads')
1858 1854 def handlephases(op, inpart):
1859 1855 """apply phases from bundle part to repo"""
1860 1856 headsbyphase = _readphaseheads(inpart)
1861 1857 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1862 1858 op.records.add('phase-heads', {})
1863 1859
1864 1860 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1865 1861 def handlepushkeyreply(op, inpart):
1866 1862 """retrieve the result of a pushkey request"""
1867 1863 ret = int(inpart.params['return'])
1868 1864 partid = int(inpart.params['in-reply-to'])
1869 1865 op.records.add('pushkey', {'return': ret}, partid)
1870 1866
1871 1867 @parthandler('obsmarkers')
1872 1868 def handleobsmarker(op, inpart):
1873 1869 """add a stream of obsmarkers to the repo"""
1874 1870 tr = op.gettransaction()
1875 1871 markerdata = inpart.read()
1876 1872 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1877 1873 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1878 1874 % len(markerdata))
1879 1875 # The mergemarkers call will crash if marker creation is not enabled.
1880 1876 # we want to avoid this if the part is advisory.
1881 1877 if not inpart.mandatory and op.repo.obsstore.readonly:
1882 1878 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1883 1879 return
1884 1880 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1885 1881 op.repo.invalidatevolatilesets()
1886 1882 if new:
1887 1883 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1888 1884 op.records.add('obsmarkers', {'new': new})
1889 1885 if op.reply is not None:
1890 1886 rpart = op.reply.newpart('reply:obsmarkers')
1891 1887 rpart.addparam(
1892 1888 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1893 1889 rpart.addparam('new', '%i' % new, mandatory=False)
1894 1890
1895 1891
1896 1892 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1897 1893 def handleobsmarkerreply(op, inpart):
1898 1894 """retrieve the result of a pushkey request"""
1899 1895 ret = int(inpart.params['new'])
1900 1896 partid = int(inpart.params['in-reply-to'])
1901 1897 op.records.add('obsmarkers', {'new': ret}, partid)
1902 1898
1903 1899 @parthandler('hgtagsfnodes')
1904 1900 def handlehgtagsfnodes(op, inpart):
1905 1901 """Applies .hgtags fnodes cache entries to the local repo.
1906 1902
1907 1903 Payload is pairs of 20 byte changeset nodes and filenodes.
1908 1904 """
1909 1905 # Grab the transaction so we ensure that we have the lock at this point.
1910 1906 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1911 1907 op.gettransaction()
1912 1908 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1913 1909
1914 1910 count = 0
1915 1911 while True:
1916 1912 node = inpart.read(20)
1917 1913 fnode = inpart.read(20)
1918 1914 if len(node) < 20 or len(fnode) < 20:
1919 1915 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1920 1916 break
1921 1917 cache.setfnode(node, fnode)
1922 1918 count += 1
1923 1919
1924 1920 cache.write()
1925 1921 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1926 1922
1927 1923 @parthandler('pushvars')
1928 1924 def bundle2getvars(op, part):
1929 1925 '''unbundle a bundle2 containing shellvars on the server'''
1930 1926 # An option to disable unbundling on server-side for security reasons
1931 1927 if op.ui.configbool('push', 'pushvars.server'):
1932 1928 hookargs = {}
1933 1929 for key, value in part.advisoryparams:
1934 1930 key = key.upper()
1935 1931 # We want pushed variables to have USERVAR_ prepended so we know
1936 1932 # they came from the --pushvar flag.
1937 1933 key = "USERVAR_" + key
1938 1934 hookargs[key] = value
1939 1935 op.addhookargs(hookargs)
General Comments 0
You need to be logged in to leave comments. Login now