bundle2: raise a more helpful error if building a bundle part header fails...
Augie Fackler -
r34246:ab379eed default
@@ -1,1917 +1,1921 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 A name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
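As a sketch of the encoding described above (using Python 3's `urllib.parse` in place of this module's `urlreq` compatibility shim; the helper names are illustrative, not part of bundle2):

```python
from urllib.parse import quote, unquote

def encode_stream_params(params):
    # params: list of (name, value-or-None) pairs; names and values are
    # urlquoted, entries are space separated, value-less entries are bare names
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks)

def decode_stream_params(blob):
    # inverse operation: split on spaces, then on the first '='
    params = {}
    for entry in blob.split(' '):
        name, sep, value = entry.partition('=')
        params[unquote(name)] = unquote(value) if sep else None
    return params
```

A round trip keeps both valued and value-less parameters intact.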
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler that
78 78 can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, followed by the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32; `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
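The `<chunksize><chunkdata>` framing above can be sketched with the standard struct module (a minimal illustration that ignores negative sizes and the interrupt special case; the function names are hypothetical):

```python
import struct

def frame_payload(chunks):
    # serialize a payload as a series of <int32 size><data> chunks,
    # terminated by a zero size chunk
    out = b''
    for chunk in chunks:
        out += struct.pack('>i', len(chunk)) + chunk
    return out + struct.pack('>i', 0)

def read_payload(stream):
    # stream: a file-like object positioned at the first chunk;
    # read chunks until the zero size terminator
    chunks = []
    while True:
        size = struct.unpack('>i', stream.read(4))[0]
        if size == 0:
            return b''.join(chunks)
        chunks.append(stream.read(size))
```

Note that `'>i'` matches the signed, big-endian `_fpayloadsize` format used below.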
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part
139 139 type contains any uppercase char it is considered mandatory. When no handler
140 140 is known for a mandatory part, the process is aborted and an exception is
141 141 raised. If the part is advisory and no handler is known, the part is ignored.
142 142 When the process is aborted, the full bundle is still read from the stream to
143 143 keep the channel usable. But none of the parts read after an abort are
144 144 processed. In the future, dropping the stream may become an option for
145 145 channels we do not care to preserve.
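The case rule above reduces to a single comparison; a sketch (the helper name is illustrative):

```python
def is_mandatory(parttype):
    # a part is mandatory if its type contains any uppercase character;
    # an all-lowercase type is advisory
    return parttype != parttype.lower()
```

This matches how the unbundling code below compares a part type against its lowercased form.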
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable, so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
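For example, a sketch of how the generated format string is used (mirroring the construction in `_makefpartparamsizes`; the sample sizes are made up):

```python
import struct

def makefpartparamsizes(nbparams):
    # same construction as _makefpartparamsizes above: one 'BB'
    # (key-size byte, value-size byte) pair per parameter
    return '>' + ('BB' * nbparams)

fmt = makefpartparamsizes(3)
# '>BBBBBB': six unsigned bytes, one (key-size, value-size) pair per parameter
sizes = struct.unpack(fmt, bytes([3, 5, 2, 0, 4, 4]))
pairs = list(zip(sizes[::2], sizes[1::2]))
```

Each pair then tells the reader how many bytes to slice from the param-data blob for one key and one value.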
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that register a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`, where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 353 self.op = op
354 354 self.unbundler = unbundler
355 355 self.iterator = None
356 356 self.count = 0
357 357
358 358 def __enter__(self):
359 359 def func():
360 360 itr = enumerate(self.unbundler.iterparts())
361 361 for count, p in itr:
362 362 self.count = count
363 363 yield p
364 364 self.iterator = func()
365 365 return self.iterator
366 366
367 367 def __exit__(self, type, exc, tb):
368 368 if not self.iterator:
369 369 return
370 370
371 371 if exc:
372 372 # Any exceptions seeking to the end of the bundle at this point are
373 373 # almost certainly related to the underlying stream being bad.
374 374 # And, chances are that the exception we're handling is related to
375 375 # getting in that bad state. So, we swallow the seeking error and
376 376 # re-raise the original error.
377 377 seekerror = False
378 378 try:
379 379 for part in self.iterator:
380 380 # consume the bundle content
381 381 part.seek(0, 2)
382 382 except Exception:
383 383 seekerror = True
384 384
385 385 # Small hack to let caller code distinguish exceptions from bundle2
386 386 # processing from processing the old format. This is mostly needed
387 387 # to handle different return codes to unbundle according to the type
388 388 # of bundle. We should probably clean up or drop this return code
389 389 # craziness in a future version.
390 390 exc.duringunbundle2 = True
391 391 salvaged = []
392 392 replycaps = None
393 393 if self.op.reply is not None:
394 394 salvaged = self.op.reply.salvageoutput()
395 395 replycaps = self.op.reply.capabilities
396 396 exc._replycaps = replycaps
397 397 exc._bundle2salvagedoutput = salvaged
398 398
399 399 # Re-raising from a variable loses the original stack. So only use
400 400 # that form if we need to.
401 401 if seekerror:
402 402 raise exc
403 403
404 404 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
405 405 self.count)
406 406
407 407 def processbundle(repo, unbundler, transactiongetter=None, op=None):
408 408 """This function process a bundle, apply effect to/from a repo
409 409
410 410 It iterates over each part then searches for and uses the proper handling
411 411 code to process the part. Parts are processed in order.
412 412
413 413 An unknown mandatory part will abort the process.
414 414
415 415 It is temporarily possible to provide a prebuilt bundleoperation to the
416 416 function. This is used to ensure output is properly propagated in case of
417 417 an error during the unbundling. This output capturing part will likely be
418 418 reworked and this ability will probably go away in the process.
419 419 """
420 420 if op is None:
421 421 if transactiongetter is None:
422 422 transactiongetter = _notransaction
423 423 op = bundleoperation(repo, transactiongetter)
424 424 # todo:
425 425 # - replace this with an init function soon.
426 426 # - exception catching
427 427 unbundler.params
428 428 if repo.ui.debugflag:
429 429 msg = ['bundle2-input-bundle:']
430 430 if unbundler.params:
431 431 msg.append(' %i params' % len(unbundler.params))
432 432 if op._gettransaction is None or op._gettransaction is _notransaction:
433 433 msg.append(' no-transaction')
434 434 else:
435 435 msg.append(' with-transaction')
436 436 msg.append('\n')
437 437 repo.ui.debug(''.join(msg))
438 438
439 439 with partiterator(repo, op, unbundler) as parts:
440 440 for part in parts:
441 441 _processpart(op, part)
442 442
443 443 return op
444 444
445 445 def _processchangegroup(op, cg, tr, source, url, **kwargs):
446 446 ret = cg.apply(op.repo, tr, source, url, **kwargs)
447 447 op.records.add('changegroup', {
448 448 'return': ret,
449 449 })
450 450 return ret
451 451
452 452 def _processpart(op, part):
453 453 """process a single part from a bundle
454 454
455 455 The part is guaranteed to have been fully consumed when the function exits
456 456 (even if an exception is raised)."""
457 457 status = 'unknown' # used by debug output
458 458 hardabort = False
459 459 try:
460 460 try:
461 461 handler = parthandlermapping.get(part.type)
462 462 if handler is None:
463 463 status = 'unsupported-type'
464 464 raise error.BundleUnknownFeatureError(parttype=part.type)
465 465 indebug(op.ui, 'found a handler for part %r' % part.type)
466 466 unknownparams = part.mandatorykeys - handler.params
467 467 if unknownparams:
468 468 unknownparams = list(unknownparams)
469 469 unknownparams.sort()
470 470 status = 'unsupported-params (%s)' % unknownparams
471 471 raise error.BundleUnknownFeatureError(parttype=part.type,
472 472 params=unknownparams)
473 473 status = 'supported'
474 474 except error.BundleUnknownFeatureError as exc:
475 475 if part.mandatory: # mandatory parts
476 476 raise
477 477 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
478 478 return # skip to part processing
479 479 finally:
480 480 if op.ui.debugflag:
481 481 msg = ['bundle2-input-part: "%s"' % part.type]
482 482 if not part.mandatory:
483 483 msg.append(' (advisory)')
484 484 nbmp = len(part.mandatorykeys)
485 485 nbap = len(part.params) - nbmp
486 486 if nbmp or nbap:
487 487 msg.append(' (params:')
488 488 if nbmp:
489 489 msg.append(' %i mandatory' % nbmp)
490 490 if nbap:
491 491 msg.append(' %i advisory' % nbap)
492 492 msg.append(')')
493 493 msg.append(' %s\n' % status)
494 494 op.ui.debug(''.join(msg))
495 495
496 496 # handler is called outside the above try block so that we don't
497 497 # risk catching KeyErrors from anything other than the
498 498 # parthandlermapping lookup (any KeyError raised by handler()
499 499 # itself represents a defect of a different variety).
500 500 output = None
501 501 if op.captureoutput and op.reply is not None:
502 502 op.ui.pushbuffer(error=True, subproc=True)
503 503 output = ''
504 504 try:
505 505 handler(op, part)
506 506 finally:
507 507 if output is not None:
508 508 output = op.ui.popbuffer()
509 509 if output:
510 510 outpart = op.reply.newpart('output', data=output,
511 511 mandatory=False)
512 512 outpart.addparam(
513 513 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
514 514 # If exiting or interrupted, do not attempt to seek the stream in the
515 515 # finally block below. This makes abort faster.
516 516 except (SystemExit, KeyboardInterrupt):
517 517 hardabort = True
518 518 raise
519 519 finally:
520 520 # consume the part content to not corrupt the stream.
521 521 if not hardabort:
522 522 part.seek(0, 2)
523 523
524 524
525 525 def decodecaps(blob):
526 526 """decode a bundle2 caps bytes blob into a dictionary
527 527
528 528 The blob is a list of capabilities (one per line)
529 529 Capabilities may have values using a line of the form::
530 530
531 531 capability=value1,value2,value3
532 532
533 533 The values are always a list."""
534 534 caps = {}
535 535 for line in blob.splitlines():
536 536 if not line:
537 537 continue
538 538 if '=' not in line:
539 539 key, vals = line, ()
540 540 else:
541 541 key, vals = line.split('=', 1)
542 542 vals = vals.split(',')
543 543 key = urlreq.unquote(key)
544 544 vals = [urlreq.unquote(v) for v in vals]
545 545 caps[key] = vals
546 546 return caps
547 547
548 548 def encodecaps(caps):
549 549 """encode a bundle2 caps dictionary into a bytes blob"""
550 550 chunks = []
551 551 for ca in sorted(caps):
552 552 vals = caps[ca]
553 553 ca = urlreq.quote(ca)
554 554 vals = [urlreq.quote(v) for v in vals]
555 555 if vals:
556 556 ca = "%s=%s" % (ca, ','.join(vals))
557 557 chunks.append(ca)
558 558 return '\n'.join(chunks)
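A self-contained sketch of the caps round trip, mirroring the two functions above with Python 3's `urllib.parse` standing in for the `urlreq` shim:

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    # mirror of the module's encodecaps: one capability per line,
    # values joined with commas, everything urlquoted
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    # mirror of the module's decodecaps: a value-less line maps to an
    # empty list, so values are always a list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

Decoding the encoded form recovers the original dictionary, with value-less capabilities coming back as empty lists.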
559 559
560 560 bundletypes = {
561 561 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
562 562 # since the unification ssh accepts a header but there
563 563 # is no capability signaling it.
564 564 "HG20": (), # special-cased below
565 565 "HG10UN": ("HG10UN", 'UN'),
566 566 "HG10BZ": ("HG10", 'BZ'),
567 567 "HG10GZ": ("HG10GZ", 'GZ'),
568 568 }
569 569
570 570 # hgweb uses this list to communicate its preferred type
571 571 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
572 572
573 573 class bundle20(object):
574 574 """represent an outgoing bundle2 container
575 575
576 576 Use the `addparam` method to add stream level parameters, and `newpart` to
577 577 populate it. Then call `getchunks` to retrieve all the binary chunks of
578 578 data that compose the bundle2 container."""
579 579
580 580 _magicstring = 'HG20'
581 581
582 582 def __init__(self, ui, capabilities=()):
583 583 self.ui = ui
584 584 self._params = []
585 585 self._parts = []
586 586 self.capabilities = dict(capabilities)
587 587 self._compengine = util.compengines.forbundletype('UN')
588 588 self._compopts = None
589 589
590 590 def setcompression(self, alg, compopts=None):
591 591 """setup core part compression to <alg>"""
592 592 if alg in (None, 'UN'):
593 593 return
594 594 assert not any(n.lower() == 'compression' for n, v in self._params)
595 595 self.addparam('Compression', alg)
596 596 self._compengine = util.compengines.forbundletype(alg)
597 597 self._compopts = compopts
598 598
599 599 @property
600 600 def nbparts(self):
601 601 """total number of parts added to the bundler"""
602 602 return len(self._parts)
603 603
604 604 # methods used to define the bundle2 content
605 605 def addparam(self, name, value=None):
606 606 """add a stream level parameter"""
607 607 if not name:
608 608 raise ValueError('empty parameter name')
609 609 if name[0] not in pycompat.bytestr(string.ascii_letters):
610 610 raise ValueError('non letter first character: %r' % name)
611 611 self._params.append((name, value))
612 612
613 613 def addpart(self, part):
614 614 """add a new part to the bundle2 container
615 615
616 616 Parts contain the actual application payload.
617 617 assert part.id is None
618 618 part.id = len(self._parts) # very cheap counter
619 619 self._parts.append(part)
620 620
621 621 def newpart(self, typeid, *args, **kwargs):
622 622 """create a new part and add it to the containers
623 623
624 624 As the part is directly added to the containers. For now, this means
625 625 that any failure to properly initialize the part after calling
626 626 ``newpart`` should result in a failure of the whole bundling process.
627 627
628 628 You can still fall back to manually create and add if you need better
629 629 control."""
630 630 part = bundlepart(typeid, *args, **kwargs)
631 631 self.addpart(part)
632 632 return part
633 633
634 634 # methods used to generate the bundle2 stream
635 635 def getchunks(self):
636 636 if self.ui.debugflag:
637 637 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
638 638 if self._params:
639 639 msg.append(' (%i params)' % len(self._params))
640 640 msg.append(' %i parts total\n' % len(self._parts))
641 641 self.ui.debug(''.join(msg))
642 642 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
643 643 yield self._magicstring
644 644 param = self._paramchunk()
645 645 outdebug(self.ui, 'bundle parameter: %s' % param)
646 646 yield _pack(_fstreamparamsize, len(param))
647 647 if param:
648 648 yield param
649 649 for chunk in self._compengine.compressstream(self._getcorechunk(),
650 650 self._compopts):
651 651 yield chunk
652 652
653 653 def _paramchunk(self):
654 654 """return a encoded version of all stream parameters"""
655 655 blocks = []
656 656 for par, value in self._params:
657 657 par = urlreq.quote(par)
658 658 if value is not None:
659 659 value = urlreq.quote(value)
660 660 par = '%s=%s' % (par, value)
661 661 blocks.append(par)
662 662 return ' '.join(blocks)
663 663
664 664 def _getcorechunk(self):
665 665 """yield chunk for the core part of the bundle
666 666
667 667 (all but headers and parameters)"""
668 668 outdebug(self.ui, 'start of parts')
669 669 for part in self._parts:
670 670 outdebug(self.ui, 'bundle part: "%s"' % part.type)
671 671 for chunk in part.getchunks(ui=self.ui):
672 672 yield chunk
673 673 outdebug(self.ui, 'end of bundle')
674 674 yield _pack(_fpartheadersize, 0)
675 675
676 676
677 677 def salvageoutput(self):
678 678 """return a list with a copy of all output parts in the bundle
679 679
680 680 This is meant to be used during error handling to make sure we preserve
681 681 server output"""
682 682 salvaged = []
683 683 for part in self._parts:
684 684 if part.type.startswith('output'):
685 685 salvaged.append(part.copy())
686 686 return salvaged
687 687
688 688
689 689 class unpackermixin(object):
690 690 """A mixin to extract bytes and struct data from a stream"""
691 691
692 692 def __init__(self, fp):
693 693 self._fp = fp
694 694
695 695 def _unpack(self, format):
696 696 """unpack this struct format from the stream
697 697
698 698 This method is meant for internal usage by the bundle2 protocol only.
699 699 It directly manipulates the low level stream, including bundle2 level
700 700 instructions.
701 701
702 702 Do not use it to implement higher-level logic or methods."""
703 703 data = self._readexact(struct.calcsize(format))
704 704 return _unpack(format, data)
705 705
706 706 def _readexact(self, size):
707 707 """read exactly <size> bytes from the stream
708 708
709 709 This method is meant for internal usage by the bundle2 protocol only.
710 710 It directly manipulates the low level stream, including bundle2 level
711 711 instructions.
712 712
713 713 Do not use it to implement higher-level logic or methods."""
714 714 return changegroup.readexactly(self._fp, size)
715 715
716 716 def getunbundler(ui, fp, magicstring=None):
717 717 """return a valid unbundler object for a given magicstring"""
718 718 if magicstring is None:
719 719 magicstring = changegroup.readexactly(fp, 4)
720 720 magic, version = magicstring[0:2], magicstring[2:4]
721 721 if magic != 'HG':
722 722 ui.debug(
723 723 "error: invalid magic: %r (version %r), should be 'HG'\n"
724 724 % (magic, version))
725 725 raise error.Abort(_('not a Mercurial bundle'))
726 726 unbundlerclass = formatmap.get(version)
727 727 if unbundlerclass is None:
728 728 raise error.Abort(_('unknown bundle version %s') % version)
729 729 unbundler = unbundlerclass(ui, fp)
730 730 indebug(ui, 'start processing of %s stream' % magicstring)
731 731 return unbundler
732 732
733 733 class unbundle20(unpackermixin):
734 734 """interpret a bundle2 stream
735 735
736 736 This class is fed with a binary stream and yields parts through its
737 737 `iterparts` method."""
738 738
739 739 _magicstring = 'HG20'
740 740
741 741 def __init__(self, ui, fp):
742 742 """If header is specified, we do not read it out of the stream."""
743 743 self.ui = ui
744 744 self._compengine = util.compengines.forbundletype('UN')
745 745 self._compressed = None
746 746 super(unbundle20, self).__init__(fp)
747 747
748 748 @util.propertycache
749 749 def params(self):
750 750 """dictionary of stream level parameters"""
751 751 indebug(self.ui, 'reading bundle2 stream parameters')
752 752 params = {}
753 753 paramssize = self._unpack(_fstreamparamsize)[0]
754 754 if paramssize < 0:
755 755 raise error.BundleValueError('negative bundle param size: %i'
756 756 % paramssize)
757 757 if paramssize:
758 758 params = self._readexact(paramssize)
759 759 params = self._processallparams(params)
760 760 return params
761 761
762 762 def _processallparams(self, paramsblock):
763 763 """"""
764 764 params = util.sortdict()
765 765 for p in paramsblock.split(' '):
766 766 p = p.split('=', 1)
767 767 p = [urlreq.unquote(i) for i in p]
768 768 if len(p) < 2:
769 769 p.append(None)
770 770 self._processparam(*p)
771 771 params[p[0]] = p[1]
772 772 return params
773 773
774 774
775 775 def _processparam(self, name, value):
776 776 """process a parameter, applying its effect if needed
777 777
778 778 Parameters starting with a lower case letter are advisory and will be
779 779 ignored when unknown. Those starting with an upper case letter are
780 780 mandatory, and this function will raise a KeyError when they are unknown.
781 781
782 782 Note: no options are currently supported. Any input will either be
783 783 ignored or trigger a failure.
784 784 """
785 785 if not name:
786 786 raise ValueError('empty parameter name')
787 787 if name[0] not in pycompat.bytestr(string.ascii_letters):
788 788 raise ValueError('non letter first character: %r' % name)
789 789 try:
790 790 handler = b2streamparamsmap[name.lower()]
791 791 except KeyError:
792 792 if name[0].islower():
793 793 indebug(self.ui, "ignoring unknown parameter %r" % name)
794 794 else:
795 795 raise error.BundleUnknownFeatureError(params=(name,))
796 796 else:
797 797 handler(self, name, value)
798 798
799 799 def _forwardchunks(self):
800 800 """utility to transfer a bundle2 as binary
801 801
802 802 This is made necessary by the fact that the 'getbundle' command over 'ssh'
803 803 has no way to know when the reply ends, relying on the bundle being
804 804 interpreted to find its end. This is terrible and we are sorry, but we
805 805 needed to move forward to get general delta enabled.
806 806 """
807 807 yield self._magicstring
808 808 assert 'params' not in vars(self)
809 809 paramssize = self._unpack(_fstreamparamsize)[0]
810 810 if paramssize < 0:
811 811 raise error.BundleValueError('negative bundle param size: %i'
812 812 % paramssize)
813 813 yield _pack(_fstreamparamsize, paramssize)
814 814 if paramssize:
815 815 params = self._readexact(paramssize)
816 816 self._processallparams(params)
817 817 yield params
818 818 assert self._compengine.bundletype == 'UN'
819 819 # From there, payload might need to be decompressed
820 820 self._fp = self._compengine.decompressorreader(self._fp)
821 821 emptycount = 0
822 822 while emptycount < 2:
823 823 # so we can brainlessly loop
824 824 assert _fpartheadersize == _fpayloadsize
825 825 size = self._unpack(_fpartheadersize)[0]
826 826 yield _pack(_fpartheadersize, size)
827 827 if size:
828 828 emptycount = 0
829 829 else:
830 830 emptycount += 1
831 831 continue
832 832 if size == flaginterrupt:
833 833 continue
834 834 elif size < 0:
835 835 raise error.BundleValueError('negative chunk size: %i' % size)
836 836 yield self._readexact(size)
837 837
838 838
839 839 def iterparts(self):
840 840 """yield all parts contained in the stream"""
841 841 # make sure params have been loaded
842 842 self.params
843 843 # From there, the payload needs to be decompressed
844 844 self._fp = self._compengine.decompressorreader(self._fp)
845 845 indebug(self.ui, 'start extraction of bundle2 parts')
846 846 headerblock = self._readpartheader()
847 847 while headerblock is not None:
848 848 part = unbundlepart(self.ui, headerblock, self._fp)
849 849 yield part
850 850 # Seek to the end of the part to force its consumption so the next
851 851 # part can be read. But then seek back to the beginning so the
852 852 # code consuming this generator has a part that starts at 0.
853 853 part.seek(0, 2)
854 854 part.seek(0)
855 855 headerblock = self._readpartheader()
856 856 indebug(self.ui, 'end of bundle2 stream')
857 857
858 858 def _readpartheader(self):
859 859 """reads a part header size and return the bytes blob
860 860
861 861 returns None if empty"""
862 862 headersize = self._unpack(_fpartheadersize)[0]
863 863 if headersize < 0:
864 864 raise error.BundleValueError('negative part header size: %i'
865 865 % headersize)
866 866 indebug(self.ui, 'part header size: %i' % headersize)
867 867 if headersize:
868 868 return self._readexact(headersize)
869 869 return None
870 870
871 871 def compressed(self):
872 872 self.params # load params
873 873 return self._compressed
874 874
875 875 def close(self):
876 876 """close underlying file"""
877 877 if util.safehasattr(self._fp, 'close'):
878 878 return self._fp.close()
879 879
880 880 formatmap = {'20': unbundle20}
881 881
882 882 b2streamparamsmap = {}
883 883
884 884 def b2streamparamhandler(name):
885 885 """register a handler for a stream level parameter"""
886 886 def decorator(func):
887 887 assert name not in b2streamparamsmap
888 888 b2streamparamsmap[name] = func
889 889 return func
890 890 return decorator
891 891
892 892 @b2streamparamhandler('compression')
893 893 def processcompression(unbundler, param, value):
894 894 """read compression parameter and install payload decompression"""
895 895 if value not in util.compengines.supportedbundletypes:
896 896 raise error.BundleUnknownFeatureError(params=(param,),
897 897 values=(value,))
898 898 unbundler._compengine = util.compengines.forbundletype(value)
899 899 if value is not None:
900 900 unbundler._compressed = True
901 901
902 902 class bundlepart(object):
903 903 """A bundle2 part contains application level payload
904 904
905 905 The part `type` is used to route the part to the application level
906 906 handler.
907 907
908 908 The part payload is contained in ``part.data``. It could be raw bytes or a
909 909 generator of byte chunks.
910 910
911 911 You can add parameters to the part using the ``addparam`` method.
912 912 Parameters can be either mandatory (default) or advisory. Remote side
913 913 should be able to safely ignore the advisory ones.
914 914
915 915 Neither the data nor the parameters can be modified once generation has begun.
916 916 """
917 917
918 918 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
919 919 data='', mandatory=True):
920 920 validateparttype(parttype)
921 921 self.id = None
922 922 self.type = parttype
923 923 self._data = data
924 924 self._mandatoryparams = list(mandatoryparams)
925 925 self._advisoryparams = list(advisoryparams)
926 926 # checking for duplicated entries
927 927 self._seenparams = set()
928 928 for pname, __ in self._mandatoryparams + self._advisoryparams:
929 929 if pname in self._seenparams:
930 930 raise error.ProgrammingError('duplicated params: %s' % pname)
931 931 self._seenparams.add(pname)
932 932 # status of the part's generation:
933 933 # - None: not started,
934 934 # - False: currently generated,
935 935 # - True: generation done.
936 936 self._generated = None
937 937 self.mandatory = mandatory
938 938
939 939 def __repr__(self):
940 940 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
941 941 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
942 942 % (cls, id(self), self.id, self.type, self.mandatory))
943 943
944 944 def copy(self):
945 945 """return a copy of the part
946 946
947 947 The new part has the very same content but no part id assigned yet.
948 948 Parts with generated data cannot be copied."""
949 949 assert not util.safehasattr(self.data, 'next')
950 950 return self.__class__(self.type, self._mandatoryparams,
951 951 self._advisoryparams, self._data, self.mandatory)
952 952
953 953 # methods used to define the part content
954 954 @property
955 955 def data(self):
956 956 return self._data
957 957
958 958 @data.setter
959 959 def data(self, data):
960 960 if self._generated is not None:
961 961 raise error.ReadOnlyPartError('part is being generated')
962 962 self._data = data
963 963
964 964 @property
965 965 def mandatoryparams(self):
966 966 # make it an immutable tuple to force people through ``addparam``
967 967 return tuple(self._mandatoryparams)
968 968
969 969 @property
970 970 def advisoryparams(self):
971 971 # make it an immutable tuple to force people through ``addparam``
972 972 return tuple(self._advisoryparams)
973 973
974 974 def addparam(self, name, value='', mandatory=True):
975 975 """add a parameter to the part
976 976
977 977 If 'mandatory' is set to True, the remote handler must claim support
978 978 for this parameter or the unbundling will be aborted.
979 979
980 980 The 'name' and 'value' cannot exceed 255 bytes each.
981 981 """
982 982 if self._generated is not None:
983 983 raise error.ReadOnlyPartError('part is being generated')
984 984 if name in self._seenparams:
985 985 raise ValueError('duplicated params: %s' % name)
986 986 self._seenparams.add(name)
987 987 params = self._advisoryparams
988 988 if mandatory:
989 989 params = self._mandatoryparams
990 990 params.append((name, value))
991 991
992 992 # methods used to generate the bundle2 stream
993 993 def getchunks(self, ui):
994 994 if self._generated is not None:
995 995 raise error.ProgrammingError('part can only be consumed once')
996 996 self._generated = False
997 997
998 998 if ui.debugflag:
999 999 msg = ['bundle2-output-part: "%s"' % self.type]
1000 1000 if not self.mandatory:
1001 1001 msg.append(' (advisory)')
1002 1002 nbmp = len(self.mandatoryparams)
1003 1003 nbap = len(self.advisoryparams)
1004 1004 if nbmp or nbap:
1005 1005 msg.append(' (params:')
1006 1006 if nbmp:
1007 1007 msg.append(' %i mandatory' % nbmp)
1008 1008 if nbap:
1009 1009 msg.append(' %i advisory' % nbap)
1010 1010 msg.append(')')
1011 1011 if not self.data:
1012 1012 msg.append(' empty payload')
1013 1013 elif util.safehasattr(self.data, 'next'):
1014 1014 msg.append(' streamed payload')
1015 1015 else:
1016 1016 msg.append(' %i bytes payload' % len(self.data))
1017 1017 msg.append('\n')
1018 1018 ui.debug(''.join(msg))
1019 1019
1020 1020 #### header
1021 1021 if self.mandatory:
1022 1022 parttype = self.type.upper()
1023 1023 else:
1024 1024 parttype = self.type.lower()
1025 1025 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1026 1026 ## parttype
1027 1027 header = [_pack(_fparttypesize, len(parttype)),
1028 1028 parttype, _pack(_fpartid, self.id),
1029 1029 ]
1030 1030 ## parameters
1031 1031 # count
1032 1032 manpar = self.mandatoryparams
1033 1033 advpar = self.advisoryparams
1034 1034 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1035 1035 # size
1036 1036 parsizes = []
1037 1037 for key, value in manpar:
1038 1038 parsizes.append(len(key))
1039 1039 parsizes.append(len(value))
1040 1040 for key, value in advpar:
1041 1041 parsizes.append(len(key))
1042 1042 parsizes.append(len(value))
1043 1043 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1044 1044 header.append(paramsizes)
1045 1045 # key, value
1046 1046 for key, value in manpar:
1047 1047 header.append(key)
1048 1048 header.append(value)
1049 1049 for key, value in advpar:
1050 1050 header.append(key)
1051 1051 header.append(value)
1052 1052 ## finalize header
1053 try:
1054 headerchunk = ''.join(header)
1055 except TypeError:
1056 raise TypeError(r'Found a non-bytes trying to '
1057 r'build bundle part header: %r' % header)
1054 1058 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1055 1059 yield _pack(_fpartheadersize, len(headerchunk))
1056 1060 yield headerchunk
1057 1061 ## payload
1058 1062 try:
1059 1063 for chunk in self._payloadchunks():
1060 1064 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1061 1065 yield _pack(_fpayloadsize, len(chunk))
1062 1066 yield chunk
1063 1067 except GeneratorExit:
1064 1068 # GeneratorExit means that nobody is listening for our
1065 1069 # results anyway, so just bail quickly rather than trying
1066 1070 # to produce an error part.
1067 1071 ui.debug('bundle2-generatorexit\n')
1068 1072 raise
1069 1073 except BaseException as exc:
1070 1074 bexc = util.forcebytestr(exc)
1071 1075 # backup exception data for later
1072 1076 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1073 1077 % bexc)
1074 1078 tb = sys.exc_info()[2]
1075 1079 msg = 'unexpected error: %s' % bexc
1076 1080 interpart = bundlepart('error:abort', [('message', msg)],
1077 1081 mandatory=False)
1078 1082 interpart.id = 0
1079 1083 yield _pack(_fpayloadsize, -1)
1080 1084 for chunk in interpart.getchunks(ui=ui):
1081 1085 yield chunk
1082 1086 outdebug(ui, 'closing payload chunk')
1083 1087 # abort current part payload
1084 1088 yield _pack(_fpayloadsize, 0)
1085 1089 pycompat.raisewithtb(exc, tb)
1086 1090 # end of payload
1087 1091 outdebug(ui, 'closing payload chunk')
1088 1092 yield _pack(_fpayloadsize, 0)
1089 1093 self._generated = True
1090 1094
1091 1095 def _payloadchunks(self):
1092 1096 """yield chunks of a the part payload
1093 1097
1094 1098 Exists to handle the different methods to provide data to a part."""
1095 1099 # we only support fixed size data now.
1096 1100 # This will be improved in the future.
1097 1101 if (util.safehasattr(self.data, 'next')
1098 1102 or util.safehasattr(self.data, '__next__')):
1099 1103 buff = util.chunkbuffer(self.data)
1100 1104 chunk = buff.read(preferedchunksize)
1101 1105 while chunk:
1102 1106 yield chunk
1103 1107 chunk = buff.read(preferedchunksize)
1104 1108 elif len(self.data):
1105 1109 yield self.data
1106 1110
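The chunk framing that `getchunks` and `_payloadchunks` implement above can be sketched in isolation. This is a minimal illustration, not Mercurial's implementation: it assumes the signed big-endian int32 length prefix (the `'>i'` struct format, consistent with how `_fpayloadsize` is used here) and the zero-length terminator chunk; the function names are hypothetical.

```python
import io
import struct

# Minimal sketch (hypothetical names) of the payload chunk framing:
# every chunk is preceded by a signed big-endian int32 length, and a
# zero-length chunk marks the end of the payload.
def framechunks(chunks):
    for chunk in chunks:
        yield struct.pack('>i', len(chunk))
        yield chunk
    yield struct.pack('>i', 0)  # closing payload chunk

def readchunks(fp):
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        yield fp.read(size)

framed = b''.join(framechunks([b'abc', b'defg']))
assert list(readchunks(io.BytesIO(framed))) == [b'abc', b'defg']
```

Negative sizes (the interrupt flag and error cases handled above) are deliberately left out of this sketch.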
1107 1111
1108 1112 flaginterrupt = -1
1109 1113
1110 1114 class interrupthandler(unpackermixin):
1111 1115 """read one part and process it with restricted capability
1112 1116
1113 1117 This allows transmitting exceptions raised on the producer side during
1114 1118 part iteration while the consumer is reading a part.
1115 1119
1116 1120 Parts processed in this manner only have access to a ui object."""
1117 1121
1118 1122 def __init__(self, ui, fp):
1119 1123 super(interrupthandler, self).__init__(fp)
1120 1124 self.ui = ui
1121 1125
1122 1126 def _readpartheader(self):
1123 1127 """reads a part header size and return the bytes blob
1124 1128
1125 1129 returns None if empty"""
1126 1130 headersize = self._unpack(_fpartheadersize)[0]
1127 1131 if headersize < 0:
1128 1132 raise error.BundleValueError('negative part header size: %i'
1129 1133 % headersize)
1130 1134 indebug(self.ui, 'part header size: %i\n' % headersize)
1131 1135 if headersize:
1132 1136 return self._readexact(headersize)
1133 1137 return None
1134 1138
1135 1139 def __call__(self):
1136 1140
1137 1141 self.ui.debug('bundle2-input-stream-interrupt:'
1138 1142 ' opening out of band context\n')
1139 1143 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1140 1144 headerblock = self._readpartheader()
1141 1145 if headerblock is None:
1142 1146 indebug(self.ui, 'no part found during interruption.')
1143 1147 return
1144 1148 part = unbundlepart(self.ui, headerblock, self._fp)
1145 1149 op = interruptoperation(self.ui)
1146 1150 _processpart(op, part)
1147 1151 self.ui.debug('bundle2-input-stream-interrupt:'
1148 1152 ' closing out of band context\n')
1149 1153
1150 1154 class interruptoperation(object):
1151 1155 """A limited operation to be use by part handler during interruption
1152 1156
1153 1157 It only have access to an ui object.
1154 1158 """
1155 1159
1156 1160 def __init__(self, ui):
1157 1161 self.ui = ui
1158 1162 self.reply = None
1159 1163 self.captureoutput = False
1160 1164
1161 1165 @property
1162 1166 def repo(self):
1163 1167 raise error.ProgrammingError('no repo access from stream interruption')
1164 1168
1165 1169 def gettransaction(self):
1166 1170 raise TransactionUnavailable('no repo access from stream interruption')
1167 1171
1168 1172 class unbundlepart(unpackermixin):
1169 1173 """a bundle part read from a bundle"""
1170 1174
1171 1175 def __init__(self, ui, header, fp):
1172 1176 super(unbundlepart, self).__init__(fp)
1173 1177 self._seekable = (util.safehasattr(fp, 'seek') and
1174 1178 util.safehasattr(fp, 'tell'))
1175 1179 self.ui = ui
1176 1180 # unbundle state attr
1177 1181 self._headerdata = header
1178 1182 self._headeroffset = 0
1179 1183 self._initialized = False
1180 1184 self.consumed = False
1181 1185 # part data
1182 1186 self.id = None
1183 1187 self.type = None
1184 1188 self.mandatoryparams = None
1185 1189 self.advisoryparams = None
1186 1190 self.params = None
1187 1191 self.mandatorykeys = ()
1188 1192 self._payloadstream = None
1189 1193 self._readheader()
1190 1194 self._mandatory = None
1191 1195 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1192 1196 self._pos = 0
1193 1197
1194 1198 def _fromheader(self, size):
1195 1199 """return the next <size> byte from the header"""
1196 1200 offset = self._headeroffset
1197 1201 data = self._headerdata[offset:(offset + size)]
1198 1202 self._headeroffset = offset + size
1199 1203 return data
1200 1204
1201 1205 def _unpackheader(self, format):
1202 1206 """read given format from header
1203 1207
1204 1208 This automatically computes the size of the format to read.
1205 1209 data = self._fromheader(struct.calcsize(format))
1206 1210 return _unpack(format, data)
1207 1211
1208 1212 def _initparams(self, mandatoryparams, advisoryparams):
1209 1213 """internal function to setup all logic related parameters"""
1210 1214 # make it read only to prevent people touching it by mistake.
1211 1215 self.mandatoryparams = tuple(mandatoryparams)
1212 1216 self.advisoryparams = tuple(advisoryparams)
1213 1217 # user friendly UI
1214 1218 self.params = util.sortdict(self.mandatoryparams)
1215 1219 self.params.update(self.advisoryparams)
1216 1220 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1217 1221
1218 1222 def _payloadchunks(self, chunknum=0):
1219 1223 '''seek to specified chunk and start yielding data'''
1220 1224 if len(self._chunkindex) == 0:
1221 1225 assert chunknum == 0, 'Must start with chunk 0'
1222 1226 self._chunkindex.append((0, self._tellfp()))
1223 1227 else:
1224 1228 assert chunknum < len(self._chunkindex), \
1225 1229 'Unknown chunk %d' % chunknum
1226 1230 self._seekfp(self._chunkindex[chunknum][1])
1227 1231
1228 1232 pos = self._chunkindex[chunknum][0]
1229 1233 payloadsize = self._unpack(_fpayloadsize)[0]
1230 1234 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1231 1235 while payloadsize:
1232 1236 if payloadsize == flaginterrupt:
1233 1237 # interruption detection, the handler will now read a
1234 1238 # single part and process it.
1235 1239 interrupthandler(self.ui, self._fp)()
1236 1240 elif payloadsize < 0:
1237 1241 msg = 'negative payload chunk size: %i' % payloadsize
1238 1242 raise error.BundleValueError(msg)
1239 1243 else:
1240 1244 result = self._readexact(payloadsize)
1241 1245 chunknum += 1
1242 1246 pos += payloadsize
1243 1247 if chunknum == len(self._chunkindex):
1244 1248 self._chunkindex.append((pos, self._tellfp()))
1245 1249 yield result
1246 1250 payloadsize = self._unpack(_fpayloadsize)[0]
1247 1251 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1248 1252
1249 1253 def _findchunk(self, pos):
1250 1254 '''for a given payload position, return a chunk number and offset'''
1251 1255 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1252 1256 if ppos == pos:
1253 1257 return chunk, 0
1254 1258 elif ppos > pos:
1255 1259 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1256 1260 raise ValueError('Unknown chunk')
1257 1261
1258 1262 def _readheader(self):
1259 1263 """read the header and setup the object"""
1260 1264 typesize = self._unpackheader(_fparttypesize)[0]
1261 1265 self.type = self._fromheader(typesize)
1262 1266 indebug(self.ui, 'part type: "%s"' % self.type)
1263 1267 self.id = self._unpackheader(_fpartid)[0]
1264 1268 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1265 1269 # extract mandatory bit from type
1266 1270 self.mandatory = (self.type != self.type.lower())
1267 1271 self.type = self.type.lower()
1268 1272 ## reading parameters
1269 1273 # param count
1270 1274 mancount, advcount = self._unpackheader(_fpartparamcount)
1271 1275 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1272 1276 # param size
1273 1277 fparamsizes = _makefpartparamsizes(mancount + advcount)
1274 1278 paramsizes = self._unpackheader(fparamsizes)
1275 1279 # make it a list of couple again
1276 1280 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1277 1281 # split mandatory from advisory
1278 1282 mansizes = paramsizes[:mancount]
1279 1283 advsizes = paramsizes[mancount:]
1280 1284 # retrieve param value
1281 1285 manparams = []
1282 1286 for key, value in mansizes:
1283 1287 manparams.append((self._fromheader(key), self._fromheader(value)))
1284 1288 advparams = []
1285 1289 for key, value in advsizes:
1286 1290 advparams.append((self._fromheader(key), self._fromheader(value)))
1287 1291 self._initparams(manparams, advparams)
1288 1292 ## part payload
1289 1293 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1290 1294 # we read the data, tell it
1291 1295 self._initialized = True
1292 1296
1293 1297 def read(self, size=None):
1294 1298 """read payload data"""
1295 1299 if not self._initialized:
1296 1300 self._readheader()
1297 1301 if size is None:
1298 1302 data = self._payloadstream.read()
1299 1303 else:
1300 1304 data = self._payloadstream.read(size)
1301 1305 self._pos += len(data)
1302 1306 if size is None or len(data) < size:
1303 1307 if not self.consumed and self._pos:
1304 1308 self.ui.debug('bundle2-input-part: total payload size %i\n'
1305 1309 % self._pos)
1306 1310 self.consumed = True
1307 1311 return data
1308 1312
1309 1313 def tell(self):
1310 1314 return self._pos
1311 1315
1312 1316 def seek(self, offset, whence=0):
1313 1317 if whence == 0:
1314 1318 newpos = offset
1315 1319 elif whence == 1:
1316 1320 newpos = self._pos + offset
1317 1321 elif whence == 2:
1318 1322 if not self.consumed:
1319 1323 self.read()
1320 1324 newpos = self._chunkindex[-1][0] - offset
1321 1325 else:
1322 1326 raise ValueError('Unknown whence value: %r' % (whence,))
1323 1327
1324 1328 if newpos > self._chunkindex[-1][0] and not self.consumed:
1325 1329 self.read()
1326 1330 if not 0 <= newpos <= self._chunkindex[-1][0]:
1327 1331 raise ValueError('Offset out of range')
1328 1332
1329 1333 if self._pos != newpos:
1330 1334 chunk, internaloffset = self._findchunk(newpos)
1331 1335 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1332 1336 adjust = self.read(internaloffset)
1333 1337 if len(adjust) != internaloffset:
1334 1338 raise error.Abort(_('Seek failed\n'))
1335 1339 self._pos = newpos
1336 1340
1337 1341 def _seekfp(self, offset, whence=0):
1338 1342 """move the underlying file pointer
1339 1343
1340 1344 This method is meant for internal usage by the bundle2 protocol only.
1341 1345 It directly manipulates the low-level stream, including bundle2-level
1342 1346 instructions.
1343 1347
1344 1348 Do not use it to implement higher-level logic or methods."""
1345 1349 if self._seekable:
1346 1350 return self._fp.seek(offset, whence)
1347 1351 else:
1348 1352 raise NotImplementedError(_('File pointer is not seekable'))
1349 1353
1350 1354 def _tellfp(self):
1351 1355 """return the file offset, or None if file is not seekable
1352 1356
1353 1357 This method is meant for internal usage by the bundle2 protocol only.
1354 1358 It directly manipulates the low-level stream, including bundle2-level
1355 1359 instructions.
1356 1360
1357 1361 Do not use it to implement higher-level logic or methods."""
1358 1362 if self._seekable:
1359 1363 try:
1360 1364 return self._fp.tell()
1361 1365 except IOError as e:
1362 1366 if e.errno == errno.ESPIPE:
1363 1367 self._seekable = False
1364 1368 else:
1365 1369 raise
1366 1370 return None
1367 1371
1368 1372 # These are only the static capabilities.
1369 1373 # Check the 'getrepocaps' function for the rest.
1370 1374 capabilities = {'HG20': (),
1371 1375 'error': ('abort', 'unsupportedcontent', 'pushraced',
1372 1376 'pushkey'),
1373 1377 'listkeys': (),
1374 1378 'pushkey': (),
1375 1379 'digests': tuple(sorted(util.DIGESTS.keys())),
1376 1380 'remote-changegroup': ('http', 'https'),
1377 1381 'hgtagsfnodes': (),
1378 1382 }
1379 1383
1380 1384 def getrepocaps(repo, allowpushback=False):
1381 1385 """return the bundle2 capabilities for a given repo
1382 1386
1383 1387 Exists to allow extensions (like evolution) to mutate the capabilities.
1384 1388 """
1385 1389 caps = capabilities.copy()
1386 1390 caps['changegroup'] = tuple(sorted(
1387 1391 changegroup.supportedincomingversions(repo)))
1388 1392 if obsolete.isenabled(repo, obsolete.exchangeopt):
1389 1393 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1390 1394 caps['obsmarkers'] = supportedformat
1391 1395 if allowpushback:
1392 1396 caps['pushback'] = ()
1393 1397 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1394 1398 if cpmode == 'check-related':
1395 1399 caps['checkheads'] = ('related',)
1396 1400 return caps
1397 1401
1398 1402 def bundle2caps(remote):
1399 1403 """return the bundle capabilities of a peer as dict"""
1400 1404 raw = remote.capable('bundle2')
1401 1405 if not raw and raw != '':
1402 1406 return {}
1403 1407 capsblob = urlreq.unquote(remote.capable('bundle2'))
1404 1408 return decodecaps(capsblob)
1405 1409
1406 1410 def obsmarkersversion(caps):
1407 1411 """extract the list of supported obsmarkers versions from a bundle2caps dict
1408 1412 """
1409 1413 obscaps = caps.get('obsmarkers', ())
1410 1414 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1411 1415
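The capability decoding performed by `obsmarkersversion` can be exercised standalone. This hypothetical `versionsfromcaps` mirrors the list comprehension above: `'V<N>'` entries become integer format versions, anything else is ignored.

```python
# Hypothetical standalone version of the capability decoding above.
def versionsfromcaps(obscaps):
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

assert versionsfromcaps(('V0', 'V1')) == [0, 1]
```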
1412 1416 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1413 1417 vfs=None, compression=None, compopts=None):
1414 1418 if bundletype.startswith('HG10'):
1415 1419 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1416 1420 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1417 1421 compression=compression, compopts=compopts)
1418 1422 elif not bundletype.startswith('HG20'):
1419 1423 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1420 1424
1421 1425 caps = {}
1422 1426 if 'obsolescence' in opts:
1423 1427 caps['obsmarkers'] = ('V1',)
1424 1428 bundle = bundle20(ui, caps)
1425 1429 bundle.setcompression(compression, compopts)
1426 1430 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1427 1431 chunkiter = bundle.getchunks()
1428 1432
1429 1433 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1430 1434
1431 1435 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1432 1436 # We should eventually reconcile this logic with the one behind
1433 1437 # 'exchange.getbundle2partsgenerator'.
1434 1438 #
1435 1439 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1436 1440 # different right now. So we keep them separated for now for the sake of
1437 1441 # simplicity.
1438 1442
1439 1443 # we always want a changegroup in such bundle
1440 1444 cgversion = opts.get('cg.version')
1441 1445 if cgversion is None:
1442 1446 cgversion = changegroup.safeversion(repo)
1443 1447 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1444 1448 part = bundler.newpart('changegroup', data=cg.getchunks())
1445 1449 part.addparam('version', cg.version)
1446 1450 if 'clcount' in cg.extras:
1447 1451 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1448 1452 mandatory=False)
1449 1453 if opts.get('phases') and repo.revs('%ln and secret()',
1450 1454 outgoing.missingheads):
1451 1455 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1452 1456
1453 1457 addparttagsfnodescache(repo, bundler, outgoing)
1454 1458
1455 1459 if opts.get('obsolescence', False):
1456 1460 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1457 1461 buildobsmarkerspart(bundler, obsmarkers)
1458 1462
1459 1463 if opts.get('phases', False):
1460 1464 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1461 1465 phasedata = []
1462 1466 for phase in phases.allphases:
1463 1467 for head in headsbyphase[phase]:
1464 1468 phasedata.append(_pack(_fphasesentry, phase, head))
1465 1469 bundler.newpart('phase-heads', data=''.join(phasedata))
1466 1470
1467 1471 def addparttagsfnodescache(repo, bundler, outgoing):
1468 1472 # we include the tags fnode cache for the bundle changeset
1469 1473 # (as an optional parts)
1470 1474 cache = tags.hgtagsfnodescache(repo.unfiltered())
1471 1475 chunks = []
1472 1476
1473 1477 # .hgtags fnodes are only relevant for head changesets. While we could
1474 1478 # transfer values for all known nodes, there will likely be little to
1475 1479 # no benefit.
1476 1480 #
1477 1481 # We don't bother using a generator to produce output data because
1478 1482 # a) we only have 40 bytes per head and even esoteric numbers of heads
1479 1483 # consume little memory (1M heads is 40MB) b) we don't want to send the
1480 1484 # part if we don't have entries and knowing if we have entries requires
1481 1485 # cache lookups.
1482 1486 for node in outgoing.missingheads:
1483 1487 # Don't compute missing, as this may slow down serving.
1484 1488 fnode = cache.getfnode(node, computemissing=False)
1485 1489 if fnode is not None:
1486 1490 chunks.extend([node, fnode])
1487 1491
1488 1492 if chunks:
1489 1493 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1490 1494
1491 1495 def buildobsmarkerspart(bundler, markers):
1492 1496 """add an obsmarker part to the bundler with <markers>
1493 1497
1494 1498 No part is created if markers is empty.
1495 1499 Raises ValueError if the bundler doesn't support any known obsmarker format.
1496 1500 """
1497 1501 if not markers:
1498 1502 return None
1499 1503
1500 1504 remoteversions = obsmarkersversion(bundler.capabilities)
1501 1505 version = obsolete.commonversion(remoteversions)
1502 1506 if version is None:
1503 1507 raise ValueError('bundler does not support common obsmarker format')
1504 1508 stream = obsolete.encodemarkers(markers, True, version=version)
1505 1509 return bundler.newpart('obsmarkers', data=stream)
1506 1510
1507 1511 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1508 1512 compopts=None):
1509 1513 """Write a bundle file and return its filename.
1510 1514
1511 1515 Existing files will not be overwritten.
1512 1516 If no filename is specified, a temporary file is created.
1513 1517 bz2 compression can be turned off.
1514 1518 The bundle file will be deleted in case of errors.
1515 1519 """
1516 1520
1517 1521 if bundletype == "HG20":
1518 1522 bundle = bundle20(ui)
1519 1523 bundle.setcompression(compression, compopts)
1520 1524 part = bundle.newpart('changegroup', data=cg.getchunks())
1521 1525 part.addparam('version', cg.version)
1522 1526 if 'clcount' in cg.extras:
1523 1527 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1524 1528 mandatory=False)
1525 1529 chunkiter = bundle.getchunks()
1526 1530 else:
1527 1531 # compression argument is only for the bundle2 case
1528 1532 assert compression is None
1529 1533 if cg.version != '01':
1530 1534 raise error.Abort(_('old bundle types only support v1 '
1531 1535 'changegroups'))
1532 1536 header, comp = bundletypes[bundletype]
1533 1537 if comp not in util.compengines.supportedbundletypes:
1534 1538 raise error.Abort(_('unknown stream compression type: %s')
1535 1539 % comp)
1536 1540 compengine = util.compengines.forbundletype(comp)
1537 1541 def chunkiter():
1538 1542 yield header
1539 1543 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1540 1544 yield chunk
1541 1545 chunkiter = chunkiter()
1542 1546
1543 1547 # parse the changegroup data, otherwise we will block
1544 1548 # in case of sshrepo because we don't know the end of the stream
1545 1549 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1546 1550
1547 1551 def combinechangegroupresults(op):
1548 1552 """logic to combine 0 or more addchangegroup results into one"""
1549 1553 results = [r.get('return', 0)
1550 1554 for r in op.records['changegroup']]
1551 1555 changedheads = 0
1552 1556 result = 1
1553 1557 for ret in results:
1554 1558 # If any changegroup result is 0, return 0
1555 1559 if ret == 0:
1556 1560 result = 0
1557 1561 break
1558 1562 if ret < -1:
1559 1563 changedheads += ret + 1
1560 1564 elif ret > 1:
1561 1565 changedheads += ret - 1
1562 1566 if changedheads > 0:
1563 1567 result = 1 + changedheads
1564 1568 elif changedheads < 0:
1565 1569 result = -1 + changedheads
1566 1570 return result
1567 1571
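The combination rule above can be restated as a standalone sketch (the name is illustrative). It assumes the addchangegroup return convention used in this function: 0 means an error, 1 means no change in heads, 1+n means n heads were added, and -1-n means n heads were removed.

```python
# Illustrative sketch of combinechangegroupresults' folding rule.
def combineresults(results):
    changedheads = 0
    for ret in results:
        if ret == 0:  # any failed changegroup makes the whole result 0
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
```

For example, results of `[3, 2]` (two heads added, then one) fold to `4` (three heads added overall).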
1568 1572 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1569 1573 'targetphase'))
1570 1574 def handlechangegroup(op, inpart):
1571 1575 """apply a changegroup part on the repo
1572 1576
1573 1577 This is a very early implementation that will see massive rework before
1574 1578 being inflicted on any end-user.
1575 1579 """
1576 1580 tr = op.gettransaction()
1577 1581 unpackerversion = inpart.params.get('version', '01')
1578 1582 # We should raise an appropriate exception here
1579 1583 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1580 1584 # the source and url passed here are overwritten by the one contained in
1581 1585 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1582 1586 nbchangesets = None
1583 1587 if 'nbchanges' in inpart.params:
1584 1588 nbchangesets = int(inpart.params.get('nbchanges'))
1585 1589 if ('treemanifest' in inpart.params and
1586 1590 'treemanifest' not in op.repo.requirements):
1587 1591 if len(op.repo.changelog) != 0:
1588 1592 raise error.Abort(_(
1589 1593 "bundle contains tree manifests, but local repo is "
1590 1594 "non-empty and does not use tree manifests"))
1591 1595 op.repo.requirements.add('treemanifest')
1592 1596 op.repo._applyopenerreqs()
1593 1597 op.repo._writerequirements()
1594 1598 extrakwargs = {}
1595 1599 targetphase = inpart.params.get('targetphase')
1596 1600 if targetphase is not None:
1597 1601 extrakwargs['targetphase'] = int(targetphase)
1598 1602 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1599 1603 expectedtotal=nbchangesets, **extrakwargs)
1600 1604 if op.reply is not None:
1601 1605 # This is definitely not the final form of this
1602 1606 # return. But one needs to start somewhere.
1603 1607 part = op.reply.newpart('reply:changegroup', mandatory=False)
1604 1608 part.addparam(
1605 1609 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1606 1610 part.addparam('return', '%i' % ret, mandatory=False)
1607 1611 assert not inpart.read()
1608 1612
1609 1613 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1610 1614 ['digest:%s' % k for k in util.DIGESTS.keys()])
1611 1615 @parthandler('remote-changegroup', _remotechangegroupparams)
1612 1616 def handleremotechangegroup(op, inpart):
1613 1617 """apply a bundle10 on the repo, given an url and validation information
1614 1618
1615 1619 All the information about the remote bundle to import is given as
1616 1620 parameters. The parameters include:
1617 1621 - url: the url to the bundle10.
1618 1622 - size: the bundle10 file size. It is used to validate what was
1619 1623 retrieved by the client matches the server knowledge about the bundle.
1620 1624 - digests: a space separated list of the digest types provided as
1621 1625 parameters.
1622 1626 - digest:<digest-type>: the hexadecimal representation of the digest with
1623 1627 that name. Like the size, it is used to validate what was retrieved by
1624 1628 the client matches what the server knows about the bundle.
1625 1629
1626 1630 When multiple digest types are given, all of them are checked.
1627 1631 """
1628 1632 try:
1629 1633 raw_url = inpart.params['url']
1630 1634 except KeyError:
1631 1635 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1632 1636 parsed_url = util.url(raw_url)
1633 1637 if parsed_url.scheme not in capabilities['remote-changegroup']:
1634 1638 raise error.Abort(_('remote-changegroup does not support %s urls') %
1635 1639 parsed_url.scheme)
1636 1640
1637 1641 try:
1638 1642 size = int(inpart.params['size'])
1639 1643 except ValueError:
1640 1644 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1641 1645 % 'size')
1642 1646 except KeyError:
1643 1647 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1644 1648
1645 1649 digests = {}
1646 1650 for typ in inpart.params.get('digests', '').split():
1647 1651 param = 'digest:%s' % typ
1648 1652 try:
1649 1653 value = inpart.params[param]
1650 1654 except KeyError:
1651 1655 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1652 1656 param)
1653 1657 digests[typ] = value
1654 1658
1655 1659 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1656 1660
1657 1661 tr = op.gettransaction()
1658 1662 from . import exchange
1659 1663 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1660 1664 if not isinstance(cg, changegroup.cg1unpacker):
1661 1665 raise error.Abort(_('%s: not a bundle version 1.0') %
1662 1666 util.hidepassword(raw_url))
1663 1667 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1664 1668 if op.reply is not None:
1665 1669 # This is definitely not the final form of this
1666 1670 # return. But one needs to start somewhere.
1667 1671 part = op.reply.newpart('reply:changegroup')
1668 1672 part.addparam(
1669 1673 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1670 1674 part.addparam('return', '%i' % ret, mandatory=False)
1671 1675 try:
1672 1676 real_part.validate()
1673 1677 except error.Abort as e:
1674 1678 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1675 1679 (util.hidepassword(raw_url), str(e)))
1676 1680 assert not inpart.read()
1677 1681
1678 1682 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1679 1683 def handlereplychangegroup(op, inpart):
1680 1684 ret = int(inpart.params['return'])
1681 1685 replyto = int(inpart.params['in-reply-to'])
1682 1686 op.records.add('changegroup', {'return': ret}, replyto)
1683 1687
1684 1688 @parthandler('check:heads')
1685 1689 def handlecheckheads(op, inpart):
1686 1690 """check that head of the repo did not change
1687 1691
1688 1692 This is used to detect a push race when using unbundle.
1689 1693 This replaces the "heads" argument of unbundle."""
1690 1694 h = inpart.read(20)
1691 1695 heads = []
1692 1696 while len(h) == 20:
1693 1697 heads.append(h)
1694 1698 h = inpart.read(20)
1695 1699 assert not h
1696 1700 # Trigger a transaction so that we are guaranteed to have the lock now.
1697 1701 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1698 1702 op.gettransaction()
1699 1703 if sorted(heads) != sorted(op.repo.heads()):
1700 1704 raise error.PushRaced('repository changed while pushing - '
1701 1705 'please try again')
1702 1706
1703 1707 @parthandler('check:updated-heads')
1704 1708 def handlecheckupdatedheads(op, inpart):
1705 1709 """check for race on the heads touched by a push
1706 1710
1707 1711 This is similar to 'check:heads' but focuses on the heads actually updated
1708 1712 during the push. If other activities happen on unrelated heads, it is
1709 1713 ignored.
1710 1714
1711 1715 This allows servers with high traffic to avoid push contention as long as
1712 1716 only unrelated parts of the graph are involved.
1713 1717 h = inpart.read(20)
1714 1718 heads = []
1715 1719 while len(h) == 20:
1716 1720 heads.append(h)
1717 1721 h = inpart.read(20)
1718 1722 assert not h
1719 1723 # Trigger a transaction so that we are guaranteed to have the lock now.
1720 1724 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1721 1725 op.gettransaction()
1722 1726
1723 1727 currentheads = set()
1724 1728 for ls in op.repo.branchmap().itervalues():
1725 1729 currentheads.update(ls)
1726 1730
1727 1731 for h in heads:
1728 1732 if h not in currentheads:
1729 1733 raise error.PushRaced('repository changed while pushing - '
1730 1734 'please try again')
1731 1735
1732 1736 @parthandler('output')
1733 1737 def handleoutput(op, inpart):
1734 1738 """forward output captured on the server to the client"""
1735 1739 for line in inpart.read().splitlines():
1736 1740 op.ui.status(_('remote: %s\n') % line)
1737 1741
1738 1742 @parthandler('replycaps')
1739 1743 def handlereplycaps(op, inpart):
1740 1744 """Notify that a reply bundle should be created
1741 1745
1742 1746 The payload contains the capabilities information for the reply"""
1743 1747 caps = decodecaps(inpart.read())
1744 1748 if op.reply is None:
1745 1749 op.reply = bundle20(op.ui, caps)
1746 1750
1747 1751 class AbortFromPart(error.Abort):
1748 1752 """Sub-class of Abort that denotes an error from a bundle2 part."""
1749 1753
1750 1754 @parthandler('error:abort', ('message', 'hint'))
1751 1755 def handleerrorabort(op, inpart):
1752 1756 """Used to transmit abort error over the wire"""
1753 1757 raise AbortFromPart(inpart.params['message'],
1754 1758 hint=inpart.params.get('hint'))
1755 1759
1756 1760 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1757 1761 'in-reply-to'))
1758 1762 def handleerrorpushkey(op, inpart):
1759 1763 """Used to transmit failure of a mandatory pushkey over the wire"""
1760 1764 kwargs = {}
1761 1765 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1762 1766 value = inpart.params.get(name)
1763 1767 if value is not None:
1764 1768 kwargs[name] = value
1765 1769 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1766 1770
1767 1771 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1768 1772 def handleerrorunsupportedcontent(op, inpart):
1769 1773 """Used to transmit unknown content error over the wire"""
1770 1774 kwargs = {}
1771 1775 parttype = inpart.params.get('parttype')
1772 1776 if parttype is not None:
1773 1777 kwargs['parttype'] = parttype
1774 1778 params = inpart.params.get('params')
1775 1779 if params is not None:
1776 1780 kwargs['params'] = params.split('\0')
1777 1781
1778 1782 raise error.BundleUnknownFeatureError(**kwargs)
1779 1783
1780 1784 @parthandler('error:pushraced', ('message',))
1781 1785 def handleerrorpushraced(op, inpart):
1782 1786 """Used to transmit push race error over the wire"""
1783 1787 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1784 1788
1785 1789 @parthandler('listkeys', ('namespace',))
1786 1790 def handlelistkeys(op, inpart):
1787 1791 """retrieve pushkey namespace content stored in a bundle2"""
1788 1792 namespace = inpart.params['namespace']
1789 1793 r = pushkey.decodekeys(inpart.read())
1790 1794 op.records.add('listkeys', (namespace, r))
1791 1795
1792 1796 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1793 1797 def handlepushkey(op, inpart):
1794 1798 """process a pushkey request"""
1795 1799 dec = pushkey.decode
1796 1800 namespace = dec(inpart.params['namespace'])
1797 1801 key = dec(inpart.params['key'])
1798 1802 old = dec(inpart.params['old'])
1799 1803 new = dec(inpart.params['new'])
1800 1804 # Grab the transaction to ensure that we have the lock before performing the
1801 1805 # pushkey.
1802 1806 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1803 1807 op.gettransaction()
1804 1808 ret = op.repo.pushkey(namespace, key, old, new)
1805 1809 record = {'namespace': namespace,
1806 1810 'key': key,
1807 1811 'old': old,
1808 1812 'new': new}
1809 1813 op.records.add('pushkey', record)
1810 1814 if op.reply is not None:
1811 1815 rpart = op.reply.newpart('reply:pushkey')
1812 1816 rpart.addparam(
1813 1817 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1814 1818 rpart.addparam('return', '%i' % ret, mandatory=False)
1815 1819 if inpart.mandatory and not ret:
1816 1820 kwargs = {}
1817 1821 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1818 1822 if key in inpart.params:
1819 1823 kwargs[key] = inpart.params[key]
1820 1824 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1821 1825
1822 1826 def _readphaseheads(inpart):
1823 1827 headsbyphase = [[] for i in phases.allphases]
1824 1828 entrysize = struct.calcsize(_fphasesentry)
1825 1829 while True:
1826 1830 entry = inpart.read(entrysize)
1827 1831 if len(entry) < entrysize:
1828 1832 if entry:
1829 1833 raise error.Abort(_('bad phase-heads bundle part'))
1830 1834 break
1831 1835 phase, node = struct.unpack(_fphasesentry, entry)
1832 1836 headsbyphase[phase].append(node)
1833 1837 return headsbyphase
1834 1838
1835 1839 @parthandler('phase-heads')
1836 1840 def handlephases(op, inpart):
1837 1841 """apply phases from bundle part to repo"""
1838 1842 headsbyphase = _readphaseheads(inpart)
1839 1843 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1840 1844 op.records.add('phase-heads', {})
1841 1845
1842 1846 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1843 1847 def handlepushkeyreply(op, inpart):
1844 1848 """retrieve the result of a pushkey request"""
1845 1849 ret = int(inpart.params['return'])
1846 1850 partid = int(inpart.params['in-reply-to'])
1847 1851 op.records.add('pushkey', {'return': ret}, partid)
1848 1852
1849 1853 @parthandler('obsmarkers')
1850 1854 def handleobsmarker(op, inpart):
1851 1855 """add a stream of obsmarkers to the repo"""
1852 1856 tr = op.gettransaction()
1853 1857 markerdata = inpart.read()
1854 1858 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1855 1859 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1856 1860 % len(markerdata))
1857 1861 # The mergemarkers call will crash if marker creation is not enabled.
1858 1862 # we want to avoid this if the part is advisory.
1859 1863 if not inpart.mandatory and op.repo.obsstore.readonly:
1860 1864 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1861 1865 return
1862 1866 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1863 1867 op.repo.invalidatevolatilesets()
1864 1868 if new:
1865 1869 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1866 1870 op.records.add('obsmarkers', {'new': new})
1867 1871 if op.reply is not None:
1868 1872 rpart = op.reply.newpart('reply:obsmarkers')
1869 1873 rpart.addparam(
1870 1874 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1871 1875 rpart.addparam('new', '%i' % new, mandatory=False)
1872 1876
1873 1877
1874 1878 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1875 1879 def handleobsmarkerreply(op, inpart):
1876 1880 """retrieve the result of a pushkey request"""
1877 1881 ret = int(inpart.params['new'])
1878 1882 partid = int(inpart.params['in-reply-to'])
1879 1883 op.records.add('obsmarkers', {'new': ret}, partid)
1880 1884
1881 1885 @parthandler('hgtagsfnodes')
1882 1886 def handlehgtagsfnodes(op, inpart):
1883 1887 """Applies .hgtags fnodes cache entries to the local repo.
1884 1888
1885 1889 Payload is pairs of 20 byte changeset nodes and filenodes.
1886 1890 """
1887 1891 # Grab the transaction so we ensure that we have the lock at this point.
1888 1892 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1889 1893 op.gettransaction()
1890 1894 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1891 1895
1892 1896 count = 0
1893 1897 while True:
1894 1898 node = inpart.read(20)
1895 1899 fnode = inpart.read(20)
1896 1900 if len(node) < 20 or len(fnode) < 20:
1897 1901 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1898 1902 break
1899 1903 cache.setfnode(node, fnode)
1900 1904 count += 1
1901 1905
1902 1906 cache.write()
1903 1907 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1904 1908
1905 1909 @parthandler('pushvars')
1906 1910 def bundle2getvars(op, part):
1907 1911 '''unbundle a bundle2 containing shellvars on the server'''
1908 1912 # An option to disable unbundling on server-side for security reasons
1909 1913 if op.ui.configbool('push', 'pushvars.server'):
1910 1914 hookargs = {}
1911 1915 for key, value in part.advisoryparams:
1912 1916 key = key.upper()
1913 1917 # We want pushed variables to have USERVAR_ prepended so we know
1914 1918 # they came from the --pushvar flag.
1915 1919 key = "USERVAR_" + key
1916 1920 hookargs[key] = value
1917 1921 op.addhookargs(hookargs)