phase: introduce a new 'check:phases' part...
Boris Feld -
r34821:a95067b1 default
@@ -1,1923 +1,1945 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If that first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple, and we want to discourage
56 56 overly complex usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level option MUST go into a bundle2 part instead.
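As an illustration only, here is a standalone sketch (modern Python, not Mercurial's own code; the helper names are hypothetical) of how the stream level parameter block described above can be produced and parsed:

```python
import struct
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Serialize a list of (name, value-or-None) pairs as bundle2 does:
    a space separated, urlquoted blob preceded by its int32 size."""
    blocks = []
    for name, value in params:
        item = quote(name, safe='')
        if value is not None:
            item = '%s=%s' % (item, quote(value, safe=''))
        blocks.append(item)
    blob = ' '.join(blocks).encode('ascii')
    return struct.pack('>i', len(blob)) + blob

def decode_stream_params(data):
    """Parse the size-prefixed blob back into a dict of parameters."""
    size = struct.unpack('>i', data[:4])[0]
    blob = data[4:4 + size].decode('ascii')
    params = {}
    for item in blob.split(' '):
        if not item:
            continue
        if '=' in item:
            name, value = item.split('=', 1)
            params[unquote(name)] = unquote(value)
        else:
            params[unquote(item)] = None
    return params
```

A mandatory parameter such as `Compression=BZ` and an advisory one like `obsmarkers` roundtrip through these two helpers unchanged.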
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
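A hedged, standalone sketch of that parameter layout (plain Python mirroring the `<mandatory-count><advisory-count><param-sizes><param-data>` structure; not the module's own implementation, and the function names are illustrative):

```python
import struct

def pack_part_params(mandatory, advisory):
    """Pack lists of (key, value) byte-string pairs into the binary layout:
    one count byte per class, then (key size, value size) couples, then data."""
    allparams = list(mandatory) + list(advisory)
    counts = struct.pack('>BB', len(mandatory), len(advisory))
    sizes = b''.join(struct.pack('>BB', len(k), len(v)) for k, v in allparams)
    data = b''.join(k + v for k, v in allparams)
    return counts + sizes + data

def unpack_part_params(blob):
    """Reverse of pack_part_params; return (mandatory, advisory) lists."""
    nbm, nba = struct.unpack('>BB', blob[:2])
    total = nbm + nba
    # build the size format dynamically, like _makefpartparamsizes below
    sizes = struct.unpack('>' + 'BB' * total, blob[2:2 + 2 * total])
    params, offset = [], 2 + 2 * total
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        params.append((blob[offset:offset + ksize],
                       blob[offset + ksize:offset + ksize + vsize]))
        offset += ksize + vsize
    return params[:nbm], params[nbm:]
```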
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
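A minimal standalone sketch of consuming such a payload (illustrative only; the real reading logic lives in `unbundlepart` further down):

```python
import io
import struct

def iter_payload_chunks(fp):
    """Yield each chunkdata until the zero size chunk ending the payload."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return  # end of payload marker
        if size < 0:
            raise ValueError('special chunk sizes are not handled here')
        yield fp.read(size)

# two chunks followed by the closing zero size chunk
payload = (struct.pack('>i', 5) + b'hello' +
           struct.pack('>i', 6) + b' world' +
           struct.pack('>i', 0))
chunks = list(iter_payload_chunks(io.BytesIO(payload)))
# chunks == [b'hello', b' world']
```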
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
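The dispatch rule above can be sketched as follows (hypothetical registry and names, for illustration only):

```python
# hypothetical handler registry, keyed by lower cased part type
handlers = {'changegroup': lambda part: None}

def dispatch(parttype):
    """Apply the case rule: any uppercase char makes the part mandatory."""
    mandatory = parttype.lower() != parttype
    handler = handlers.get(parttype.lower())
    if handler is None:
        if mandatory:
            raise LookupError('missing handler for mandatory part: %s'
                              % parttype)
        return 'ignored'  # advisory part with no handler is skipped
    return 'handled'
```

With this rule, 'CHANGEGROUP' and 'changegroup' reach the same handler, while an unknown 'OBSMARKERS' aborts and an unknown 'obsmarkers' is silently skipped.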
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 node as nodemod,
160 161 obsolete,
161 162 phases,
162 163 pushkey,
163 164 pycompat,
164 165 tags,
165 166 url,
166 167 util,
167 168 )
168 169
169 170 urlerr = util.urlerr
170 171 urlreq = util.urlreq
171 172
172 173 _pack = struct.pack
173 174 _unpack = struct.unpack
174 175
175 176 _fstreamparamsize = '>i'
176 177 _fpartheadersize = '>i'
177 178 _fparttypesize = '>B'
178 179 _fpartid = '>I'
179 180 _fpayloadsize = '>i'
180 181 _fpartparamcount = '>BB'
181 182
182 183 preferedchunksize = 4096
183 184
184 185 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
185 186
186 187 def outdebug(ui, message):
187 188 """debug regarding output stream (bundling)"""
188 189 if ui.configbool('devel', 'bundle2.debug'):
189 190 ui.debug('bundle2-output: %s\n' % message)
190 191
191 192 def indebug(ui, message):
192 193 """debug on input stream (unbundling)"""
193 194 if ui.configbool('devel', 'bundle2.debug'):
194 195 ui.debug('bundle2-input: %s\n' % message)
195 196
196 197 def validateparttype(parttype):
197 198 """raise ValueError if a parttype contains invalid character"""
198 199 if _parttypeforbidden.search(parttype):
199 200 raise ValueError(parttype)
200 201
201 202 def _makefpartparamsizes(nbparams):
202 203 """return a struct format to read part parameter sizes
203 204
204 205 The number of parameters is variable, so we need to build that format
205 206 dynamically.
206 207 """
207 208 return '>'+('BB'*nbparams)
208 209
209 210 parthandlermapping = {}
210 211
211 212 def parthandler(parttype, params=()):
212 213 """decorator that register a function as a bundle2 part handler
213 214
214 215 eg::
215 216
216 217 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
217 218 def myparttypehandler(...):
218 219 '''process a part of type "my part".'''
219 220 ...
220 221 """
221 222 validateparttype(parttype)
222 223 def _decorator(func):
223 224 lparttype = parttype.lower() # enforce lower case matching.
224 225 assert lparttype not in parthandlermapping
225 226 parthandlermapping[lparttype] = func
226 227 func.params = frozenset(params)
227 228 return func
228 229 return _decorator
229 230
230 231 class unbundlerecords(object):
231 232 """keep record of what happens during and unbundle
232 233
233 234 New records are added using `records.add('cat', obj)`. Where 'cat' is a
234 235 category of record and obj is an arbitrary object.
235 236
236 237 `records['cat']` will return all entries of this category 'cat'.
237 238
238 239 Iterating on the object itself will yield `('category', obj)` tuples
239 240 for all entries.
240 241
241 242 All iterations happen in chronological order.
242 243 """
243 244
244 245 def __init__(self):
245 246 self._categories = {}
246 247 self._sequences = []
247 248 self._replies = {}
248 249
249 250 def add(self, category, entry, inreplyto=None):
250 251 """add a new record of a given category.
251 252
252 253 The entry can then be retrieved in the list returned by
253 254 self['category']."""
254 255 self._categories.setdefault(category, []).append(entry)
255 256 self._sequences.append((category, entry))
256 257 if inreplyto is not None:
257 258 self.getreplies(inreplyto).add(category, entry)
258 259
259 260 def getreplies(self, partid):
260 261 """get the records that are replies to a specific part"""
261 262 return self._replies.setdefault(partid, unbundlerecords())
262 263
263 264 def __getitem__(self, cat):
264 265 return tuple(self._categories.get(cat, ()))
265 266
266 267 def __iter__(self):
267 268 return iter(self._sequences)
268 269
269 270 def __len__(self):
270 271 return len(self._sequences)
271 272
272 273 def __nonzero__(self):
273 274 return bool(self._sequences)
274 275
275 276 __bool__ = __nonzero__
276 277
277 278 class bundleoperation(object):
278 279 """an object that represents a single bundling process
279 280
280 281 Its purpose is to carry unbundle-related objects and states.
281 282
282 283 A new object should be created at the beginning of each bundle processing.
283 284 The object is to be returned by the processing function.
284 285
285 286 The object has very little content now; it will ultimately contain:
286 287 * an access to the repo the bundle is applied to,
287 288 * a ui object,
288 289 * a way to retrieve a transaction to add changes to the repo,
289 290 * a way to record the result of processing each part,
290 291 * a way to construct a bundle response when applicable.
291 292 """
292 293
293 294 def __init__(self, repo, transactiongetter, captureoutput=True):
294 295 self.repo = repo
295 296 self.ui = repo.ui
296 297 self.records = unbundlerecords()
297 298 self.reply = None
298 299 self.captureoutput = captureoutput
299 300 self.hookargs = {}
300 301 self._gettransaction = transactiongetter
301 302
302 303 def gettransaction(self):
303 304 transaction = self._gettransaction()
304 305
305 306 if self.hookargs:
306 307 # the ones added to the transaction supersede those added
307 308 # to the operation.
308 309 self.hookargs.update(transaction.hookargs)
309 310 transaction.hookargs = self.hookargs
310 311
311 312 # mark the hookargs as flushed. further attempts to add to
312 313 # hookargs will result in an abort.
313 314 self.hookargs = None
314 315
315 316 return transaction
316 317
317 318 def addhookargs(self, hookargs):
318 319 if self.hookargs is None:
319 320 raise error.ProgrammingError('attempted to add hookargs to '
320 321 'operation after transaction started')
321 322 self.hookargs.update(hookargs)
322 323
323 324 class TransactionUnavailable(RuntimeError):
324 325 pass
325 326
326 327 def _notransaction():
327 328 """default method to get a transaction while processing a bundle
328 329
329 330 Raise an exception to highlight the fact that no transaction was expected
330 331 to be created"""
331 332 raise TransactionUnavailable()
332 333
333 334 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
334 335 # transform me into unbundler.apply() as soon as the freeze is lifted
335 336 if isinstance(unbundler, unbundle20):
336 337 tr.hookargs['bundle2'] = '1'
337 338 if source is not None and 'source' not in tr.hookargs:
338 339 tr.hookargs['source'] = source
339 340 if url is not None and 'url' not in tr.hookargs:
340 341 tr.hookargs['url'] = url
341 342 return processbundle(repo, unbundler, lambda: tr)
342 343 else:
343 344 # the transactiongetter won't be used, but we might as well set it
344 345 op = bundleoperation(repo, lambda: tr)
345 346 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
346 347 return op
347 348
348 349 class partiterator(object):
349 350 def __init__(self, repo, op, unbundler):
350 351 self.repo = repo
351 352 self.op = op
352 353 self.unbundler = unbundler
353 354 self.iterator = None
354 355 self.count = 0
355 356 self.current = None
356 357
357 358 def __enter__(self):
358 359 def func():
359 360 itr = enumerate(self.unbundler.iterparts())
360 361 for count, p in itr:
361 362 self.count = count
362 363 self.current = p
363 364 yield p
364 365 p.seek(0, 2)
365 366 self.current = None
366 367 self.iterator = func()
367 368 return self.iterator
368 369
369 370 def __exit__(self, type, exc, tb):
370 371 if not self.iterator:
371 372 return
372 373
373 374 # Only gracefully abort in a normal exception situation. User aborts
374 375 # like Ctrl+C raise a KeyboardInterrupt, which is not a subclass of
375 376 # Exception, and should not trigger graceful cleanup.
376 377 if isinstance(exc, Exception):
377 378 # Any exceptions seeking to the end of the bundle at this point are
378 379 # almost certainly related to the underlying stream being bad.
379 380 # And, chances are that the exception we're handling is related to
380 381 # getting in that bad state. So, we swallow the seeking error and
381 382 # re-raise the original error.
382 383 seekerror = False
383 384 try:
384 385 if self.current:
385 386 # consume the part content to not corrupt the stream.
386 387 self.current.seek(0, 2)
387 388
388 389 for part in self.iterator:
389 390 # consume the bundle content
390 391 part.seek(0, 2)
391 392 except Exception:
392 393 seekerror = True
393 394
394 395 # Small hack to let caller code distinguish exceptions from bundle2
395 396 # processing from processing the old format. This is mostly needed
396 397 # to handle different return codes to unbundle according to the type
397 398 # of bundle. We should probably clean up or drop this return code
398 399 # craziness in a future version.
399 400 exc.duringunbundle2 = True
400 401 salvaged = []
401 402 replycaps = None
402 403 if self.op.reply is not None:
403 404 salvaged = self.op.reply.salvageoutput()
404 405 replycaps = self.op.reply.capabilities
405 406 exc._replycaps = replycaps
406 407 exc._bundle2salvagedoutput = salvaged
407 408
408 409 # Re-raising from a variable loses the original stack. So only use
409 410 # that form if we need to.
410 411 if seekerror:
411 412 raise exc
412 413
413 414 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
414 415 self.count)
415 416
416 417 def processbundle(repo, unbundler, transactiongetter=None, op=None):
417 418 """This function process a bundle, apply effect to/from a repo
418 419
419 420 It iterates over each part then searches for and uses the proper handling
420 421 code to process the part. Parts are processed in order.
421 422
422 423 An unknown mandatory part will abort the process.
423 424
424 425 It is temporarily possible to provide a prebuilt bundleoperation to the
425 426 function. This is used to ensure output is properly propagated in case of
426 427 an error during the unbundling. This output capturing part will likely be
427 428 reworked and this ability will probably go away in the process.
428 429 """
429 430 if op is None:
430 431 if transactiongetter is None:
431 432 transactiongetter = _notransaction
432 433 op = bundleoperation(repo, transactiongetter)
433 434 # todo:
434 435 # - replace this with an init function soon.
435 436 # - exception catching
436 437 unbundler.params # force loading of the stream level parameters
437 438 if repo.ui.debugflag:
438 439 msg = ['bundle2-input-bundle:']
439 440 if unbundler.params:
440 441 msg.append(' %i params' % len(unbundler.params))
441 442 if op._gettransaction is None or op._gettransaction is _notransaction:
442 443 msg.append(' no-transaction')
443 444 else:
444 445 msg.append(' with-transaction')
445 446 msg.append('\n')
446 447 repo.ui.debug(''.join(msg))
447 448
448 449 processparts(repo, op, unbundler)
449 450
450 451 return op
451 452
452 453 def processparts(repo, op, unbundler):
453 454 with partiterator(repo, op, unbundler) as parts:
454 455 for part in parts:
455 456 _processpart(op, part)
456 457
457 458 def _processchangegroup(op, cg, tr, source, url, **kwargs):
458 459 ret = cg.apply(op.repo, tr, source, url, **kwargs)
459 460 op.records.add('changegroup', {
460 461 'return': ret,
461 462 })
462 463 return ret
463 464
464 465 def _gethandler(op, part):
465 466 status = 'unknown' # used by debug output
466 467 try:
467 468 handler = parthandlermapping.get(part.type)
468 469 if handler is None:
469 470 status = 'unsupported-type'
470 471 raise error.BundleUnknownFeatureError(parttype=part.type)
471 472 indebug(op.ui, 'found a handler for part %s' % part.type)
472 473 unknownparams = part.mandatorykeys - handler.params
473 474 if unknownparams:
474 475 unknownparams = list(unknownparams)
475 476 unknownparams.sort()
476 477 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
477 478 raise error.BundleUnknownFeatureError(parttype=part.type,
478 479 params=unknownparams)
479 480 status = 'supported'
480 481 except error.BundleUnknownFeatureError as exc:
481 482 if part.mandatory: # mandatory parts
482 483 raise
483 484 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
484 485 return # skip to part processing
485 486 finally:
486 487 if op.ui.debugflag:
487 488 msg = ['bundle2-input-part: "%s"' % part.type]
488 489 if not part.mandatory:
489 490 msg.append(' (advisory)')
490 491 nbmp = len(part.mandatorykeys)
491 492 nbap = len(part.params) - nbmp
492 493 if nbmp or nbap:
493 494 msg.append(' (params:')
494 495 if nbmp:
495 496 msg.append(' %i mandatory' % nbmp)
496 497 if nbap:
497 498 msg.append(' %i advisory' % nbap)
498 499 msg.append(')')
499 500 msg.append(' %s\n' % status)
500 501 op.ui.debug(''.join(msg))
501 502
502 503 return handler
503 504
504 505 def _processpart(op, part):
505 506 """process a single part from a bundle
506 507
507 508 The part is guaranteed to have been fully consumed when the function exits
508 509 (even if an exception is raised)."""
509 510 handler = _gethandler(op, part)
510 511 if handler is None:
511 512 return
512 513
513 514 # handler is called outside the above try block so that we don't
514 515 # risk catching KeyErrors from anything other than the
515 516 # parthandlermapping lookup (any KeyError raised by handler()
516 517 # itself represents a defect of a different variety).
517 518 output = None
518 519 if op.captureoutput and op.reply is not None:
519 520 op.ui.pushbuffer(error=True, subproc=True)
520 521 output = ''
521 522 try:
522 523 handler(op, part)
523 524 finally:
524 525 if output is not None:
525 526 output = op.ui.popbuffer()
526 527 if output:
527 528 outpart = op.reply.newpart('output', data=output,
528 529 mandatory=False)
529 530 outpart.addparam(
530 531 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
531 532
532 533 def decodecaps(blob):
533 534 """decode a bundle2 caps bytes blob into a dictionary
534 535
535 536 The blob is a list of capabilities (one per line)
536 537 Capabilities may have values using a line of the form::
537 538
538 539 capability=value1,value2,value3
539 540
540 541 The values are always a list."""
541 542 caps = {}
542 543 for line in blob.splitlines():
543 544 if not line:
544 545 continue
545 546 if '=' not in line:
546 547 key, vals = line, ()
547 548 else:
548 549 key, vals = line.split('=', 1)
549 550 vals = vals.split(',')
550 551 key = urlreq.unquote(key)
551 552 vals = [urlreq.unquote(v) for v in vals]
552 553 caps[key] = vals
553 554 return caps
554 555
555 556 def encodecaps(caps):
556 557 """encode a bundle2 caps dictionary into a bytes blob"""
557 558 chunks = []
558 559 for ca in sorted(caps):
559 560 vals = caps[ca]
560 561 ca = urlreq.quote(ca)
561 562 vals = [urlreq.quote(v) for v in vals]
562 563 if vals:
563 564 ca = "%s=%s" % (ca, ','.join(vals))
564 565 chunks.append(ca)
565 566 return '\n'.join(chunks)
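For illustration, here is a self-contained roundtrip of this capabilities encoding (standalone mirrors of encodecaps/decodecaps above, using Python 3's urllib so the sketch can run outside Mercurial):

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    """One capability per line; values, if any, follow '=' comma separated."""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v, safe='') for v in caps[ca]]
        ca = quote(ca, safe='')
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decode_caps(blob):
    """Reverse of encode_caps; values are always returned as a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, rest = line.split('=', 1)
            vals = [unquote(v) for v in rest.split(',')]
        caps[unquote(key)] = vals
    return caps
```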
566 567
567 568 bundletypes = {
568 569 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
569 570 # since the unification ssh accepts a header but there
570 571 # is no capability signaling it.
571 572 "HG20": (), # special-cased below
572 573 "HG10UN": ("HG10UN", 'UN'),
573 574 "HG10BZ": ("HG10", 'BZ'),
574 575 "HG10GZ": ("HG10GZ", 'GZ'),
575 576 }
576 577
577 578 # hgweb uses this list to communicate its preferred type
578 579 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
579 580
580 581 class bundle20(object):
581 582 """represent an outgoing bundle2 container
582 583
583 584 Use the `addparam` method to add a stream level parameter, and `newpart` to
584 585 populate it. Then call `getchunks` to retrieve all the binary chunks of
585 586 data that compose the bundle2 container."""
586 587
587 588 _magicstring = 'HG20'
588 589
589 590 def __init__(self, ui, capabilities=()):
590 591 self.ui = ui
591 592 self._params = []
592 593 self._parts = []
593 594 self.capabilities = dict(capabilities)
594 595 self._compengine = util.compengines.forbundletype('UN')
595 596 self._compopts = None
596 597
597 598 def setcompression(self, alg, compopts=None):
598 599 """setup core part compression to <alg>"""
599 600 if alg in (None, 'UN'):
600 601 return
601 602 assert not any(n.lower() == 'compression' for n, v in self._params)
602 603 self.addparam('Compression', alg)
603 604 self._compengine = util.compengines.forbundletype(alg)
604 605 self._compopts = compopts
605 606
606 607 @property
607 608 def nbparts(self):
608 609 """total number of parts added to the bundler"""
609 610 return len(self._parts)
610 611
611 612 # methods used to define the bundle2 content
612 613 def addparam(self, name, value=None):
613 614 """add a stream level parameter"""
614 615 if not name:
615 616 raise ValueError(r'empty parameter name')
616 617 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
617 618 raise ValueError(r'non letter first character: %s' % name)
618 619 self._params.append((name, value))
619 620
620 621 def addpart(self, part):
621 622 """add a new part to the bundle2 container
622 623
623 624 Parts contains the actual applicative payload."""
624 625 assert part.id is None
625 626 part.id = len(self._parts) # very cheap counter
626 627 self._parts.append(part)
627 628
628 629 def newpart(self, typeid, *args, **kwargs):
629 630 """create a new part and add it to the containers
630 631
631 632 The part is directly added to the container. For now, this means
632 633 that any failure to properly initialize the part after calling
633 634 ``newpart`` should result in a failure of the whole bundling process.
634 635
635 636 You can still fall back to manually creating and adding one if you need better
636 637 control."""
637 638 part = bundlepart(typeid, *args, **kwargs)
638 639 self.addpart(part)
639 640 return part
640 641
641 642 # methods used to generate the bundle2 stream
642 643 def getchunks(self):
643 644 if self.ui.debugflag:
644 645 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
645 646 if self._params:
646 647 msg.append(' (%i params)' % len(self._params))
647 648 msg.append(' %i parts total\n' % len(self._parts))
648 649 self.ui.debug(''.join(msg))
649 650 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
650 651 yield self._magicstring
651 652 param = self._paramchunk()
652 653 outdebug(self.ui, 'bundle parameter: %s' % param)
653 654 yield _pack(_fstreamparamsize, len(param))
654 655 if param:
655 656 yield param
656 657 for chunk in self._compengine.compressstream(self._getcorechunk(),
657 658 self._compopts):
658 659 yield chunk
659 660
660 661 def _paramchunk(self):
661 662 """return a encoded version of all stream parameters"""
662 663 blocks = []
663 664 for par, value in self._params:
664 665 par = urlreq.quote(par)
665 666 if value is not None:
666 667 value = urlreq.quote(value)
667 668 par = '%s=%s' % (par, value)
668 669 blocks.append(par)
669 670 return ' '.join(blocks)
670 671
671 672 def _getcorechunk(self):
672 673 """yield chunk for the core part of the bundle
673 674
674 675 (all but headers and parameters)"""
675 676 outdebug(self.ui, 'start of parts')
676 677 for part in self._parts:
677 678 outdebug(self.ui, 'bundle part: "%s"' % part.type)
678 679 for chunk in part.getchunks(ui=self.ui):
679 680 yield chunk
680 681 outdebug(self.ui, 'end of bundle')
681 682 yield _pack(_fpartheadersize, 0)
682 683
683 684
684 685 def salvageoutput(self):
685 686 """return a list with a copy of all output parts in the bundle
686 687
687 688 This is meant to be used during error handling to make sure we preserve
688 689 server output"""
689 690 salvaged = []
690 691 for part in self._parts:
691 692 if part.type.startswith('output'):
692 693 salvaged.append(part.copy())
693 694 return salvaged
694 695
695 696
696 697 class unpackermixin(object):
697 698 """A mixin to extract bytes and struct data from a stream"""
698 699
699 700 def __init__(self, fp):
700 701 self._fp = fp
701 702
702 703 def _unpack(self, format):
703 704 """unpack this struct format from the stream
704 705
705 706 This method is meant for internal usage by the bundle2 protocol only.
706 707 It directly manipulates the low level stream, including bundle2 level
707 708 instructions.
708 709
709 710 Do not use it to implement higher-level logic or methods."""
710 711 data = self._readexact(struct.calcsize(format))
711 712 return _unpack(format, data)
712 713
713 714 def _readexact(self, size):
714 715 """read exactly <size> bytes from the stream
715 716
716 717 This method is meant for internal usage by the bundle2 protocol only.
717 718 It directly manipulates the low level stream, including bundle2 level
718 719 instructions.
719 720
720 721 Do not use it to implement higher-level logic or methods."""
721 722 return changegroup.readexactly(self._fp, size)
722 723
723 724 def getunbundler(ui, fp, magicstring=None):
724 725 """return a valid unbundler object for a given magicstring"""
725 726 if magicstring is None:
726 727 magicstring = changegroup.readexactly(fp, 4)
727 728 magic, version = magicstring[0:2], magicstring[2:4]
728 729 if magic != 'HG':
729 730 ui.debug(
730 731 "error: invalid magic: %r (version %r), should be 'HG'\n"
731 732 % (magic, version))
732 733 raise error.Abort(_('not a Mercurial bundle'))
733 734 unbundlerclass = formatmap.get(version)
734 735 if unbundlerclass is None:
735 736 raise error.Abort(_('unknown bundle version %s') % version)
736 737 unbundler = unbundlerclass(ui, fp)
737 738 indebug(ui, 'start processing of %s stream' % magicstring)
738 739 return unbundler
739 740
740 741 class unbundle20(unpackermixin):
741 742 """interpret a bundle2 stream
742 743
743 744 This class is fed with a binary stream and yields parts through its
744 745 `iterparts` method."""
745 746
746 747 _magicstring = 'HG20'
747 748
748 749 def __init__(self, ui, fp):
749 750 """If header is specified, we do not read it out of the stream."""
750 751 self.ui = ui
751 752 self._compengine = util.compengines.forbundletype('UN')
752 753 self._compressed = None
753 754 super(unbundle20, self).__init__(fp)
754 755
755 756 @util.propertycache
756 757 def params(self):
757 758 """dictionary of stream level parameters"""
758 759 indebug(self.ui, 'reading bundle2 stream parameters')
759 760 params = {}
760 761 paramssize = self._unpack(_fstreamparamsize)[0]
761 762 if paramssize < 0:
762 763 raise error.BundleValueError('negative bundle param size: %i'
763 764 % paramssize)
764 765 if paramssize:
765 766 params = self._readexact(paramssize)
766 767 params = self._processallparams(params)
767 768 return params
768 769
769 770 def _processallparams(self, paramsblock):
770 771 """"""
771 772 params = util.sortdict()
772 773 for p in paramsblock.split(' '):
773 774 p = p.split('=', 1)
774 775 p = [urlreq.unquote(i) for i in p]
775 776 if len(p) < 2:
776 777 p.append(None)
777 778 self._processparam(*p)
778 779 params[p[0]] = p[1]
779 780 return params
780 781
781 782
782 783 def _processparam(self, name, value):
783 784 """process a parameter, applying its effect if needed
784 785
785 786 Parameters starting with a lower case letter are advisory and will be
786 787 ignored when unknown. Those starting with an upper case letter are
787 788 mandatory, and this function will raise an error when they are unknown.
788 789 
789 790 Note: no options are currently supported. Any input will either be
790 791 ignored or fail.
791 792 """
792 793 if not name:
793 794 raise ValueError(r'empty parameter name')
794 795 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
795 796 raise ValueError(r'non letter first character: %s' % name)
796 797 try:
797 798 handler = b2streamparamsmap[name.lower()]
798 799 except KeyError:
799 800 if name[0:1].islower():
800 801 indebug(self.ui, "ignoring unknown parameter %s" % name)
801 802 else:
802 803 raise error.BundleUnknownFeatureError(params=(name,))
803 804 else:
804 805 handler(self, name, value)
805 806
806 807 def _forwardchunks(self):
807 808 """utility to transfer a bundle2 as binary
808 809
809 810 This is made necessary by the fact that the 'getbundle' command over 'ssh'
810 811 has no way to know when the reply ends, relying on the bundle to be
811 812 interpreted to know its end. This is terrible and we are sorry, but we
812 813 needed to move forward to get general delta enabled.
813 814 """
814 815 yield self._magicstring
815 816 assert 'params' not in vars(self)
816 817 paramssize = self._unpack(_fstreamparamsize)[0]
817 818 if paramssize < 0:
818 819 raise error.BundleValueError('negative bundle param size: %i'
819 820 % paramssize)
820 821 yield _pack(_fstreamparamsize, paramssize)
821 822 if paramssize:
822 823 params = self._readexact(paramssize)
823 824 self._processallparams(params)
824 825 yield params
825 826 assert self._compengine.bundletype == 'UN'
826 827 # From there, the payload might need to be decompressed
827 828 self._fp = self._compengine.decompressorreader(self._fp)
828 829 emptycount = 0
829 830 while emptycount < 2:
830 831 # so we can brainlessly loop
831 832 assert _fpartheadersize == _fpayloadsize
832 833 size = self._unpack(_fpartheadersize)[0]
833 834 yield _pack(_fpartheadersize, size)
834 835 if size:
835 836 emptycount = 0
836 837 else:
837 838 emptycount += 1
838 839 continue
839 840 if size == flaginterrupt:
840 841 continue
841 842 elif size < 0:
842 843 raise error.BundleValueError('negative chunk size: %i' % size)
843 844 yield self._readexact(size)
844 845
845 846
846 847 def iterparts(self):
847 848 """yield all parts contained in the stream"""
849 850 # make sure params have been loaded
849 850 self.params
851 852 # From there, the payload needs to be decompressed
851 852 self._fp = self._compengine.decompressorreader(self._fp)
852 853 indebug(self.ui, 'start extraction of bundle2 parts')
853 854 headerblock = self._readpartheader()
854 855 while headerblock is not None:
855 856 part = unbundlepart(self.ui, headerblock, self._fp)
856 857 yield part
857 858 # Seek to the end of the part to force its consumption so the next
858 859 # part can be read. But then seek back to the beginning so the
859 860 # code consuming this generator has a part that starts at 0.
860 861 part.seek(0, 2)
861 862 part.seek(0)
862 863 headerblock = self._readpartheader()
863 864 indebug(self.ui, 'end of bundle2 stream')
864 865
865 866 def _readpartheader(self):
866 867 """reads a part header size and return the bytes blob
867 868
868 869 returns None if empty"""
869 870 headersize = self._unpack(_fpartheadersize)[0]
870 871 if headersize < 0:
871 872 raise error.BundleValueError('negative part header size: %i'
872 873 % headersize)
873 874 indebug(self.ui, 'part header size: %i' % headersize)
874 875 if headersize:
875 876 return self._readexact(headersize)
876 877 return None
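A minimal, hedged sketch of the size-prefixed framing `_readpartheader` implements: an int32 header size followed by that many bytes, with a size of 0 acting as the "no part" marker. Plain `struct` calls stand in for `unpackermixin`; the names and sample stream below are illustrative, not part of the real module.

```python
import io
import struct

def readpartheader(fp):
    # read the signed 32-bit big-endian header size
    headersize = struct.unpack('>i', fp.read(4))[0]
    if headersize < 0:
        raise ValueError('negative part header size: %i' % headersize)
    if headersize:
        return fp.read(headersize)
    return None  # size 0: no part header follows

# a stream carrying one 3-byte header block followed by the 0 marker
stream = io.BytesIO(struct.pack('>i', 3) + b'abc' + struct.pack('>i', 0))
```

Reading the stream twice yields the header blob, then `None` when the 0 marker is hit.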
877 878
878 879 def compressed(self):
879 880 self.params # load params
880 881 return self._compressed
881 882
882 883 def close(self):
883 884 """close underlying file"""
884 885 if util.safehasattr(self._fp, 'close'):
885 886 return self._fp.close()
886 887
887 888 formatmap = {'20': unbundle20}
888 889
889 890 b2streamparamsmap = {}
890 891
891 892 def b2streamparamhandler(name):
892 893 """register a handler for a stream level parameter"""
893 894 def decorator(func):
894 895 assert name not in b2streamparamsmap
895 896 b2streamparamsmap[name] = func
896 897 return func
897 898 return decorator
898 899
899 900 @b2streamparamhandler('compression')
900 901 def processcompression(unbundler, param, value):
901 902 """read compression parameter and install payload decompression"""
902 903 if value not in util.compengines.supportedbundletypes:
903 904 raise error.BundleUnknownFeatureError(params=(param,),
904 905 values=(value,))
905 906 unbundler._compengine = util.compengines.forbundletype(value)
906 907 if value is not None:
907 908 unbundler._compressed = True
908 909
909 910 class bundlepart(object):
910 911 """A bundle2 part contains application level payload
911 912
912 913 The part `type` is used to route the part to the application level
913 914 handler.
914 915
915 916 The part payload is contained in ``part.data``. It could be raw bytes or a
916 917 generator of byte chunks.
917 918
918 919 You can add parameters to the part using the ``addparam`` method.
919 920 Parameters can be either mandatory (default) or advisory. Remote side
920 921 should be able to safely ignore the advisory ones.
921 922
922 923 Neither data nor parameters can be modified after generation has begun.
923 924 """
924 925
925 926 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
926 927 data='', mandatory=True):
927 928 validateparttype(parttype)
928 929 self.id = None
929 930 self.type = parttype
930 931 self._data = data
931 932 self._mandatoryparams = list(mandatoryparams)
932 933 self._advisoryparams = list(advisoryparams)
933 934 # checking for duplicated entries
934 935 self._seenparams = set()
935 936 for pname, __ in self._mandatoryparams + self._advisoryparams:
936 937 if pname in self._seenparams:
937 938 raise error.ProgrammingError('duplicated params: %s' % pname)
938 939 self._seenparams.add(pname)
939 940 # status of the part's generation:
940 941 # - None: not started,
941 942 # - False: currently being generated,
942 943 # - True: generation done.
943 944 self._generated = None
944 945 self.mandatory = mandatory
945 946
946 947 def __repr__(self):
947 948 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
948 949 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
949 950 % (cls, id(self), self.id, self.type, self.mandatory))
950 951
951 952 def copy(self):
952 953 """return a copy of the part
953 954
954 955 The new part has the very same content but no part id assigned yet.
955 956 Parts with generated data cannot be copied."""
956 957 assert not util.safehasattr(self.data, 'next')
957 958 return self.__class__(self.type, self._mandatoryparams,
958 959 self._advisoryparams, self._data, self.mandatory)
959 960
960 961 # methods used to define the part content
961 962 @property
962 963 def data(self):
963 964 return self._data
964 965
965 966 @data.setter
966 967 def data(self, data):
967 968 if self._generated is not None:
968 969 raise error.ReadOnlyPartError('part is being generated')
969 970 self._data = data
970 971
971 972 @property
972 973 def mandatoryparams(self):
973 974 # make it an immutable tuple to force people through ``addparam``
974 975 return tuple(self._mandatoryparams)
975 976
976 977 @property
977 978 def advisoryparams(self):
978 979 # make it an immutable tuple to force people through ``addparam``
979 980 return tuple(self._advisoryparams)
980 981
981 982 def addparam(self, name, value='', mandatory=True):
982 983 """add a parameter to the part
983 984
984 985 If 'mandatory' is set to True, the remote handler must claim support
985 986 for this parameter or the unbundling will be aborted.
986 987
987 988 The 'name' and 'value' cannot exceed 255 bytes each.
988 989 """
989 990 if self._generated is not None:
990 991 raise error.ReadOnlyPartError('part is being generated')
991 992 if name in self._seenparams:
992 993 raise ValueError('duplicated params: %s' % name)
993 994 self._seenparams.add(name)
994 995 params = self._advisoryparams
995 996 if mandatory:
996 997 params = self._mandatoryparams
997 998 params.append((name, value))
998 999
999 1000 # methods used to generate the bundle2 stream
1000 1001 def getchunks(self, ui):
1001 1002 if self._generated is not None:
1002 1003 raise error.ProgrammingError('part can only be consumed once')
1003 1004 self._generated = False
1004 1005
1005 1006 if ui.debugflag:
1006 1007 msg = ['bundle2-output-part: "%s"' % self.type]
1007 1008 if not self.mandatory:
1008 1009 msg.append(' (advisory)')
1009 1010 nbmp = len(self.mandatoryparams)
1010 1011 nbap = len(self.advisoryparams)
1011 1012 if nbmp or nbap:
1012 1013 msg.append(' (params:')
1013 1014 if nbmp:
1014 1015 msg.append(' %i mandatory' % nbmp)
1015 1016 if nbap:
1016 1017 msg.append(' %i advisory' % nbap)
1017 1018 msg.append(')')
1018 1019 if not self.data:
1019 1020 msg.append(' empty payload')
1020 1021 elif (util.safehasattr(self.data, 'next')
1021 1022 or util.safehasattr(self.data, '__next__')):
1022 1023 msg.append(' streamed payload')
1023 1024 else:
1024 1025 msg.append(' %i bytes payload' % len(self.data))
1025 1026 msg.append('\n')
1026 1027 ui.debug(''.join(msg))
1027 1028
1028 1029 #### header
1029 1030 if self.mandatory:
1030 1031 parttype = self.type.upper()
1031 1032 else:
1032 1033 parttype = self.type.lower()
1033 1034 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1034 1035 ## parttype
1035 1036 header = [_pack(_fparttypesize, len(parttype)),
1036 1037 parttype, _pack(_fpartid, self.id),
1037 1038 ]
1038 1039 ## parameters
1039 1040 # count
1040 1041 manpar = self.mandatoryparams
1041 1042 advpar = self.advisoryparams
1042 1043 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1043 1044 # size
1044 1045 parsizes = []
1045 1046 for key, value in manpar:
1046 1047 parsizes.append(len(key))
1047 1048 parsizes.append(len(value))
1048 1049 for key, value in advpar:
1049 1050 parsizes.append(len(key))
1050 1051 parsizes.append(len(value))
1051 1052 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1052 1053 header.append(paramsizes)
1053 1054 # key, value
1054 1055 for key, value in manpar:
1055 1056 header.append(key)
1056 1057 header.append(value)
1057 1058 for key, value in advpar:
1058 1059 header.append(key)
1059 1060 header.append(value)
1060 1061 ## finalize header
1061 1062 try:
1062 1063 headerchunk = ''.join(header)
1063 1064 except TypeError:
1064 1065 raise TypeError(r'Found a non-bytes trying to '
1065 1066 r'build bundle part header: %r' % header)
1066 1067 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1067 1068 yield _pack(_fpartheadersize, len(headerchunk))
1068 1069 yield headerchunk
1069 1070 ## payload
1070 1071 try:
1071 1072 for chunk in self._payloadchunks():
1072 1073 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1073 1074 yield _pack(_fpayloadsize, len(chunk))
1074 1075 yield chunk
1075 1076 except GeneratorExit:
1076 1077 # GeneratorExit means that nobody is listening for our
1077 1078 # results anyway, so just bail quickly rather than trying
1078 1079 # to produce an error part.
1079 1080 ui.debug('bundle2-generatorexit\n')
1080 1081 raise
1081 1082 except BaseException as exc:
1082 1083 bexc = util.forcebytestr(exc)
1083 1084 # backup exception data for later
1084 1085 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1085 1086 % bexc)
1086 1087 tb = sys.exc_info()[2]
1087 1088 msg = 'unexpected error: %s' % bexc
1088 1089 interpart = bundlepart('error:abort', [('message', msg)],
1089 1090 mandatory=False)
1090 1091 interpart.id = 0
1091 1092 yield _pack(_fpayloadsize, -1)
1092 1093 for chunk in interpart.getchunks(ui=ui):
1093 1094 yield chunk
1094 1095 outdebug(ui, 'closing payload chunk')
1095 1096 # abort current part payload
1096 1097 yield _pack(_fpayloadsize, 0)
1097 1098 pycompat.raisewithtb(exc, tb)
1098 1099 # end of payload
1099 1100 outdebug(ui, 'closing payload chunk')
1100 1101 yield _pack(_fpayloadsize, 0)
1101 1102 self._generated = True
1102 1103
1103 1104 def _payloadchunks(self):
1104 1105 """yield chunks of a the part payload
1105 1106
1106 1107 Exists to handle the different ways of providing data to a part."""
1107 1108 # we only support fixed size data now.
1108 1109 # This will be improved in the future.
1109 1110 if (util.safehasattr(self.data, 'next')
1110 1111 or util.safehasattr(self.data, '__next__')):
1111 1112 buff = util.chunkbuffer(self.data)
1112 1113 chunk = buff.read(preferedchunksize)
1113 1114 while chunk:
1114 1115 yield chunk
1115 1116 chunk = buff.read(preferedchunksize)
1116 1117 elif len(self.data):
1117 1118 yield self.data
1118 1119
1119 1120
1120 1121 flaginterrupt = -1
1121 1122
1122 1123 class interrupthandler(unpackermixin):
1123 1124 """read one part and process it with restricted capability
1124 1125
1125 1126 This allows transmitting exceptions raised on the producer side during part
1126 1127 iteration while the consumer is reading a part.
1127 1128
1128 1129 Parts processed in this manner only have access to a ui object."""
1129 1130
1130 1131 def __init__(self, ui, fp):
1131 1132 super(interrupthandler, self).__init__(fp)
1132 1133 self.ui = ui
1133 1134
1134 1135 def _readpartheader(self):
1135 1136 """reads a part header size and return the bytes blob
1136 1137
1137 1138 returns None if empty"""
1138 1139 headersize = self._unpack(_fpartheadersize)[0]
1139 1140 if headersize < 0:
1140 1141 raise error.BundleValueError('negative part header size: %i'
1141 1142 % headersize)
1142 1143 indebug(self.ui, 'part header size: %i\n' % headersize)
1143 1144 if headersize:
1144 1145 return self._readexact(headersize)
1145 1146 return None
1146 1147
1147 1148 def __call__(self):
1148 1149
1149 1150 self.ui.debug('bundle2-input-stream-interrupt:'
1150 1151 ' opening out of band context\n')
1151 1152 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1152 1153 headerblock = self._readpartheader()
1153 1154 if headerblock is None:
1154 1155 indebug(self.ui, 'no part found during interruption.')
1155 1156 return
1156 1157 part = unbundlepart(self.ui, headerblock, self._fp)
1157 1158 op = interruptoperation(self.ui)
1158 1159 hardabort = False
1159 1160 try:
1160 1161 _processpart(op, part)
1161 1162 except (SystemExit, KeyboardInterrupt):
1162 1163 hardabort = True
1163 1164 raise
1164 1165 finally:
1165 1166 if not hardabort:
1166 1167 part.seek(0, 2)
1167 1168 self.ui.debug('bundle2-input-stream-interrupt:'
1168 1169 ' closing out of band context\n')
1169 1170
1170 1171 class interruptoperation(object):
1171 1172 """A limited operation to be use by part handler during interruption
1172 1173
1173 1174 It only have access to an ui object.
1174 1175 """
1175 1176
1176 1177 def __init__(self, ui):
1177 1178 self.ui = ui
1178 1179 self.reply = None
1179 1180 self.captureoutput = False
1180 1181
1181 1182 @property
1182 1183 def repo(self):
1183 1184 raise error.ProgrammingError('no repo access from stream interruption')
1184 1185
1185 1186 def gettransaction(self):
1186 1187 raise TransactionUnavailable('no repo access from stream interruption')
1187 1188
1188 1189 class unbundlepart(unpackermixin):
1189 1190 """a bundle part read from a bundle"""
1190 1191
1191 1192 def __init__(self, ui, header, fp):
1192 1193 super(unbundlepart, self).__init__(fp)
1193 1194 self._seekable = (util.safehasattr(fp, 'seek') and
1194 1195 util.safehasattr(fp, 'tell'))
1195 1196 self.ui = ui
1196 1197 # unbundle state attr
1197 1198 self._headerdata = header
1198 1199 self._headeroffset = 0
1199 1200 self._initialized = False
1200 1201 self.consumed = False
1201 1202 # part data
1202 1203 self.id = None
1203 1204 self.type = None
1204 1205 self.mandatoryparams = None
1205 1206 self.advisoryparams = None
1206 1207 self.params = None
1207 1208 self.mandatorykeys = ()
1208 1209 self._payloadstream = None
1209 1210 self._readheader()
1210 1211 self._mandatory = None
1211 1212 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1212 1213 self._pos = 0
1213 1214
1214 1215 def _fromheader(self, size):
1215 1216 """return the next <size> byte from the header"""
1216 1217 offset = self._headeroffset
1217 1218 data = self._headerdata[offset:(offset + size)]
1218 1219 self._headeroffset = offset + size
1219 1220 return data
1220 1221
1221 1222 def _unpackheader(self, format):
1222 1223 """read given format from header
1223 1224
1224 1225 This automatically computes the size of the format to read."""
1225 1226 data = self._fromheader(struct.calcsize(format))
1226 1227 return _unpack(format, data)
1227 1228
1228 1229 def _initparams(self, mandatoryparams, advisoryparams):
1229 1230 """internal function to setup all logic related parameters"""
1230 1231 # make it read only to prevent people touching it by mistake.
1231 1232 self.mandatoryparams = tuple(mandatoryparams)
1232 1233 self.advisoryparams = tuple(advisoryparams)
1233 1234 # user friendly UI
1234 1235 self.params = util.sortdict(self.mandatoryparams)
1235 1236 self.params.update(self.advisoryparams)
1236 1237 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1237 1238
1238 1239 def _payloadchunks(self, chunknum=0):
1239 1240 '''seek to specified chunk and start yielding data'''
1240 1241 if len(self._chunkindex) == 0:
1241 1242 assert chunknum == 0, 'Must start with chunk 0'
1242 1243 self._chunkindex.append((0, self._tellfp()))
1243 1244 else:
1244 1245 assert chunknum < len(self._chunkindex), \
1245 1246 'Unknown chunk %d' % chunknum
1246 1247 self._seekfp(self._chunkindex[chunknum][1])
1247 1248
1248 1249 pos = self._chunkindex[chunknum][0]
1249 1250 payloadsize = self._unpack(_fpayloadsize)[0]
1250 1251 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1251 1252 while payloadsize:
1252 1253 if payloadsize == flaginterrupt:
1253 1254 # interruption detection, the handler will now read a
1254 1255 # single part and process it.
1255 1256 interrupthandler(self.ui, self._fp)()
1256 1257 elif payloadsize < 0:
1257 1258 msg = 'negative payload chunk size: %i' % payloadsize
1258 1259 raise error.BundleValueError(msg)
1259 1260 else:
1260 1261 result = self._readexact(payloadsize)
1261 1262 chunknum += 1
1262 1263 pos += payloadsize
1263 1264 if chunknum == len(self._chunkindex):
1264 1265 self._chunkindex.append((pos, self._tellfp()))
1265 1266 yield result
1266 1267 payloadsize = self._unpack(_fpayloadsize)[0]
1267 1268 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1268 1269
1269 1270 def _findchunk(self, pos):
1270 1271 '''for a given payload position, return a chunk number and offset'''
1271 1272 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1272 1273 if ppos == pos:
1273 1274 return chunk, 0
1274 1275 elif ppos > pos:
1275 1276 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1276 1277 raise ValueError('Unknown chunk')
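A standalone sketch of the lookup `_findchunk` performs: the chunk index is a sorted list of (payload-offset, file-offset) pairs marking chunk starts, and an absolute payload position maps to a (chunk number, offset-inside-chunk) pair. The function body mirrors the method above; the sample index is made up for illustration.

```python
def findchunk(chunkindex, pos):
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0            # exactly on a chunk boundary
        elif ppos > pos:
            # position lies inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# two chunks of payload sizes 10 and 20, at arbitrary file offsets
index = [(0, 100), (10, 114), (30, 138)]
```

Position 15 resolves to chunk 1 at offset 5; a position past the last entry raises `ValueError`.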
1277 1278
1278 1279 def _readheader(self):
1279 1280 """read the header and setup the object"""
1280 1281 typesize = self._unpackheader(_fparttypesize)[0]
1281 1282 self.type = self._fromheader(typesize)
1282 1283 indebug(self.ui, 'part type: "%s"' % self.type)
1283 1284 self.id = self._unpackheader(_fpartid)[0]
1284 1285 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1285 1286 # extract mandatory bit from type
1286 1287 self.mandatory = (self.type != self.type.lower())
1287 1288 self.type = self.type.lower()
1288 1289 ## reading parameters
1289 1290 # param count
1290 1291 mancount, advcount = self._unpackheader(_fpartparamcount)
1291 1292 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1292 1293 # param size
1293 1294 fparamsizes = _makefpartparamsizes(mancount + advcount)
1294 1295 paramsizes = self._unpackheader(fparamsizes)
1295 1296 # make it a list of couple again
1296 1297 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1297 1298 # split mandatory from advisory
1298 1299 mansizes = paramsizes[:mancount]
1299 1300 advsizes = paramsizes[mancount:]
1300 1301 # retrieve param value
1301 1302 manparams = []
1302 1303 for key, value in mansizes:
1303 1304 manparams.append((self._fromheader(key), self._fromheader(value)))
1304 1305 advparams = []
1305 1306 for key, value in advsizes:
1306 1307 advparams.append((self._fromheader(key), self._fromheader(value)))
1307 1308 self._initparams(manparams, advparams)
1308 1309 ## part payload
1309 1310 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1310 1311 # the header has been fully read; mark the part as initialized
1311 1312 self._initialized = True
1312 1313
1313 1314 def read(self, size=None):
1314 1315 """read payload data"""
1315 1316 if not self._initialized:
1316 1317 self._readheader()
1317 1318 if size is None:
1318 1319 data = self._payloadstream.read()
1319 1320 else:
1320 1321 data = self._payloadstream.read(size)
1321 1322 self._pos += len(data)
1322 1323 if size is None or len(data) < size:
1323 1324 if not self.consumed and self._pos:
1324 1325 self.ui.debug('bundle2-input-part: total payload size %i\n'
1325 1326 % self._pos)
1326 1327 self.consumed = True
1327 1328 return data
1328 1329
1329 1330 def tell(self):
1330 1331 return self._pos
1331 1332
1332 1333 def seek(self, offset, whence=0):
1333 1334 if whence == 0:
1334 1335 newpos = offset
1335 1336 elif whence == 1:
1336 1337 newpos = self._pos + offset
1337 1338 elif whence == 2:
1338 1339 if not self.consumed:
1339 1340 self.read()
1340 1341 newpos = self._chunkindex[-1][0] - offset
1341 1342 else:
1342 1343 raise ValueError('Unknown whence value: %r' % (whence,))
1343 1344
1344 1345 if newpos > self._chunkindex[-1][0] and not self.consumed:
1345 1346 self.read()
1346 1347 if not 0 <= newpos <= self._chunkindex[-1][0]:
1347 1348 raise ValueError('Offset out of range')
1348 1349
1349 1350 if self._pos != newpos:
1350 1351 chunk, internaloffset = self._findchunk(newpos)
1351 1352 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1352 1353 adjust = self.read(internaloffset)
1353 1354 if len(adjust) != internaloffset:
1354 1355 raise error.Abort(_('Seek failed\n'))
1355 1356 self._pos = newpos
1356 1357
1357 1358 def _seekfp(self, offset, whence=0):
1358 1359 """move the underlying file pointer
1359 1360
1360 1361 This method is meant for internal usage by the bundle2 protocol only.
1361 1362 It directly manipulates the low level stream, including bundle2 level
1362 1363 instructions.
1363 1364
1364 1365 Do not use it to implement higher-level logic or methods."""
1365 1366 if self._seekable:
1366 1367 return self._fp.seek(offset, whence)
1367 1368 else:
1368 1369 raise NotImplementedError(_('File pointer is not seekable'))
1369 1370
1370 1371 def _tellfp(self):
1371 1372 """return the file offset, or None if file is not seekable
1372 1373
1373 1374 This method is meant for internal usage by the bundle2 protocol only.
1374 1375 It directly manipulates the low level stream, including bundle2 level
1375 1376 instructions.
1376 1377
1377 1378 Do not use it to implement higher-level logic or methods."""
1378 1379 if self._seekable:
1379 1380 try:
1380 1381 return self._fp.tell()
1381 1382 except IOError as e:
1382 1383 if e.errno == errno.ESPIPE:
1383 1384 self._seekable = False
1384 1385 else:
1385 1386 raise
1386 1387 return None
1387 1388
1388 1389 # These are only the static capabilities.
1389 1390 # Check the 'getrepocaps' function for the rest.
1390 1391 capabilities = {'HG20': (),
1391 1392 'error': ('abort', 'unsupportedcontent', 'pushraced',
1392 1393 'pushkey'),
1393 1394 'listkeys': (),
1394 1395 'pushkey': (),
1395 1396 'digests': tuple(sorted(util.DIGESTS.keys())),
1396 1397 'remote-changegroup': ('http', 'https'),
1397 1398 'hgtagsfnodes': (),
1398 1399 'phases': ('heads',),
1399 1400 }
1400 1401
1401 1402 def getrepocaps(repo, allowpushback=False):
1402 1403 """return the bundle2 capabilities for a given repo
1403 1404
1404 1405 Exists to allow extensions (like evolution) to mutate the capabilities.
1405 1406 """
1406 1407 caps = capabilities.copy()
1407 1408 caps['changegroup'] = tuple(sorted(
1408 1409 changegroup.supportedincomingversions(repo)))
1409 1410 if obsolete.isenabled(repo, obsolete.exchangeopt):
1410 1411 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1411 1412 caps['obsmarkers'] = supportedformat
1412 1413 if allowpushback:
1413 1414 caps['pushback'] = ()
1414 1415 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1415 1416 if cpmode == 'check-related':
1416 1417 caps['checkheads'] = ('related',)
1417 1418 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1418 1419 caps.pop('phases')
1419 1420 return caps
1420 1421
1421 1422 def bundle2caps(remote):
1422 1423 """return the bundle capabilities of a peer as dict"""
1423 1424 raw = remote.capable('bundle2')
1424 1425 if not raw and raw != '':
1425 1426 return {}
1426 1427 capsblob = urlreq.unquote(remote.capable('bundle2'))
1427 1428 return decodecaps(capsblob)
1428 1429
1429 1430 def obsmarkersversion(caps):
1430 1431 """extract the list of supported obsmarkers versions from a bundle2caps dict
1431 1432 """
1432 1433 obscaps = caps.get('obsmarkers', ())
1433 1434 return [int(c[1:]) for c in obscaps if c.startswith('V')]
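A minimal illustration of the parsing done by `obsmarkersversion` above: supported obsmarkers formats are advertised as `'V<n>'` strings, and entries in any other form are silently ignored. The function is a self-contained copy working on a plain dict.

```python
def parseobsmarkersversions(caps):
    # caps maps capability names to tuples of advertised values
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
```

So `('V0', 'V1')` yields `[0, 1]`, and a caps dict without an `obsmarkers` entry yields an empty list.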
1434 1435
1435 1436 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1436 1437 vfs=None, compression=None, compopts=None):
1437 1438 if bundletype.startswith('HG10'):
1438 1439 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1439 1440 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1440 1441 compression=compression, compopts=compopts)
1441 1442 elif not bundletype.startswith('HG20'):
1442 1443 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1443 1444
1444 1445 caps = {}
1445 1446 if 'obsolescence' in opts:
1446 1447 caps['obsmarkers'] = ('V1',)
1447 1448 bundle = bundle20(ui, caps)
1448 1449 bundle.setcompression(compression, compopts)
1449 1450 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1450 1451 chunkiter = bundle.getchunks()
1451 1452
1452 1453 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1453 1454
1454 1455 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1455 1456 # We should eventually reconcile this logic with the one behind
1456 1457 # 'exchange.getbundle2partsgenerator'.
1457 1458 #
1458 1459 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1459 1460 # different right now. So we keep them separated for now for the sake of
1460 1461 # simplicity.
1461 1462
1462 1463 # we always want a changegroup in such bundle
1463 1464 cgversion = opts.get('cg.version')
1464 1465 if cgversion is None:
1465 1466 cgversion = changegroup.safeversion(repo)
1466 1467 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1467 1468 part = bundler.newpart('changegroup', data=cg.getchunks())
1468 1469 part.addparam('version', cg.version)
1469 1470 if 'clcount' in cg.extras:
1470 1471 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1471 1472 mandatory=False)
1472 1473 if opts.get('phases') and repo.revs('%ln and secret()',
1473 1474 outgoing.missingheads):
1474 1475 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1475 1476
1476 1477 addparttagsfnodescache(repo, bundler, outgoing)
1477 1478
1478 1479 if opts.get('obsolescence', False):
1479 1480 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1480 1481 buildobsmarkerspart(bundler, obsmarkers)
1481 1482
1482 1483 if opts.get('phases', False):
1483 1484 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1484 1485 phasedata = phases.binaryencode(headsbyphase)
1485 1486 bundler.newpart('phase-heads', data=phasedata)
1486 1487
1487 1488 def addparttagsfnodescache(repo, bundler, outgoing):
1488 1489 # we include the tags fnode cache for the bundle changeset
1489 1490 # (as an optional part)
1490 1491 cache = tags.hgtagsfnodescache(repo.unfiltered())
1491 1492 chunks = []
1492 1493
1493 1494 # .hgtags fnodes are only relevant for head changesets. While we could
1494 1495 # transfer values for all known nodes, there will likely be little to
1495 1496 # no benefit.
1496 1497 #
1497 1498 # We don't bother using a generator to produce output data because
1498 1499 # a) we only have 40 bytes per head and even esoteric numbers of heads
1499 1500 # consume little memory (1M heads is 40MB) b) we don't want to send the
1500 1501 # part if we don't have entries and knowing if we have entries requires
1501 1502 # cache lookups.
1502 1503 for node in outgoing.missingheads:
1503 1504 # Don't compute missing, as this may slow down serving.
1504 1505 fnode = cache.getfnode(node, computemissing=False)
1505 1506 if fnode is not None:
1506 1507 chunks.extend([node, fnode])
1507 1508
1508 1509 if chunks:
1509 1510 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1510 1511
1511 1512 def buildobsmarkerspart(bundler, markers):
1512 1513 """add an obsmarker part to the bundler with <markers>
1513 1514
1514 1515 No part is created if markers is empty.
1515 1516 Raises ValueError if the bundler doesn't support any known obsmarker format.
1516 1517 """
1517 1518 if not markers:
1518 1519 return None
1519 1520
1520 1521 remoteversions = obsmarkersversion(bundler.capabilities)
1521 1522 version = obsolete.commonversion(remoteversions)
1522 1523 if version is None:
1523 1524 raise ValueError('bundler does not support common obsmarker format')
1524 1525 stream = obsolete.encodemarkers(markers, True, version=version)
1525 1526 return bundler.newpart('obsmarkers', data=stream)
1526 1527
1527 1528 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1528 1529 compopts=None):
1529 1530 """Write a bundle file and return its filename.
1530 1531
1531 1532 Existing files will not be overwritten.
1532 1533 If no filename is specified, a temporary file is created.
1533 1534 bz2 compression can be turned off.
1534 1535 The bundle file will be deleted in case of errors.
1535 1536 """
1536 1537
1537 1538 if bundletype == "HG20":
1538 1539 bundle = bundle20(ui)
1539 1540 bundle.setcompression(compression, compopts)
1540 1541 part = bundle.newpart('changegroup', data=cg.getchunks())
1541 1542 part.addparam('version', cg.version)
1542 1543 if 'clcount' in cg.extras:
1543 1544 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1544 1545 mandatory=False)
1545 1546 chunkiter = bundle.getchunks()
1546 1547 else:
1547 1548 # compression argument is only for the bundle2 case
1548 1549 assert compression is None
1549 1550 if cg.version != '01':
1550 1551 raise error.Abort(_('old bundle types only support v1 '
1551 1552 'changegroups'))
1552 1553 header, comp = bundletypes[bundletype]
1553 1554 if comp not in util.compengines.supportedbundletypes:
1554 1555 raise error.Abort(_('unknown stream compression type: %s')
1555 1556 % comp)
1556 1557 compengine = util.compengines.forbundletype(comp)
1557 1558 def chunkiter():
1558 1559 yield header
1559 1560 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1560 1561 yield chunk
1561 1562 chunkiter = chunkiter()
1562 1563
1563 1564 # parse the changegroup data, otherwise we will block
1564 1565 # in case of sshrepo because we don't know the end of the stream
1565 1566 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1566 1567
1567 1568 def combinechangegroupresults(op):
1568 1569 """logic to combine 0 or more addchangegroup results into one"""
1569 1570 results = [r.get('return', 0)
1570 1571 for r in op.records['changegroup']]
1571 1572 changedheads = 0
1572 1573 result = 1
1573 1574 for ret in results:
1574 1575 # If any changegroup result is 0, return 0
1575 1576 if ret == 0:
1576 1577 return 0
1578 1579 if ret < -1:
1579 1580 changedheads += ret + 1
1580 1581 elif ret > 1:
1581 1582 changedheads += ret - 1
1582 1583 if changedheads > 0:
1583 1584 result = 1 + changedheads
1584 1585 elif changedheads < 0:
1585 1586 result = -1 + changedheads
1586 1587 return result
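The return-value convention combined above can be restated in a self-contained sketch: `0` means some changegroup failed, `ret > 1` encodes `ret - 1` added heads, `ret < -1` encodes `ret + 1` removed heads, and head deltas from several changegroups are summed. This is a plain-python restatement for illustration, not the real `combinechangegroupresults`.

```python
def combineresults(results):
    changedheads = 0
    for ret in results:
        if ret == 0:
            return 0                   # any failure wins
        if ret < -1:
            changedheads += ret + 1    # heads removed
        elif ret > 1:
            changedheads += ret - 1    # heads added
    if changedheads > 0:
        return 1 + changedheads
    elif changedheads < 0:
        return -1 + changedheads
    return 1                           # head count unchanged
```

For example, one changegroup adding two heads and another removing two cancel out to a plain success of `1`.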
1587 1588
1588 1589 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1589 1590 'targetphase'))
1590 1591 def handlechangegroup(op, inpart):
1591 1592 """apply a changegroup part on the repo
1592 1593
1593 1594 This is a very early implementation that will see massive rework before
1594 1595 being inflicted on any end-user.
1595 1596 """
1596 1597 tr = op.gettransaction()
1597 1598 unpackerversion = inpart.params.get('version', '01')
1598 1599 # We should raise an appropriate exception here
1599 1600 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1600 1601 # the source and url passed here are overwritten by the one contained in
1601 1602 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1602 1603 nbchangesets = None
1603 1604 if 'nbchanges' in inpart.params:
1604 1605 nbchangesets = int(inpart.params.get('nbchanges'))
1605 1606 if ('treemanifest' in inpart.params and
1606 1607 'treemanifest' not in op.repo.requirements):
1607 1608 if len(op.repo.changelog) != 0:
1608 1609 raise error.Abort(_(
1609 1610 "bundle contains tree manifests, but local repo is "
1610 1611 "non-empty and does not use tree manifests"))
1611 1612 op.repo.requirements.add('treemanifest')
1612 1613 op.repo._applyopenerreqs()
1613 1614 op.repo._writerequirements()
1614 1615 extrakwargs = {}
1615 1616 targetphase = inpart.params.get('targetphase')
1616 1617 if targetphase is not None:
1617 1618 extrakwargs['targetphase'] = int(targetphase)
1618 1619 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1619 1620 expectedtotal=nbchangesets, **extrakwargs)
1620 1621 if op.reply is not None:
1621 1622 # This is definitely not the final form of this
1622 1623 # return. But one needs to start somewhere.
1623 1624 part = op.reply.newpart('reply:changegroup', mandatory=False)
1624 1625 part.addparam(
1625 1626 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1626 1627 part.addparam('return', '%i' % ret, mandatory=False)
1627 1628 assert not inpart.read()
1628 1629
1629 1630 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1630 1631 ['digest:%s' % k for k in util.DIGESTS.keys()])
1631 1632 @parthandler('remote-changegroup', _remotechangegroupparams)
1632 1633 def handleremotechangegroup(op, inpart):
1633 1634 """apply a bundle10 on the repo, given an url and validation information
1634 1635
1635 1636 All the information about the remote bundle to import is given as
1636 1637 parameters. The parameters include:
1637 1638 - url: the url to the bundle10.
1638 1639 - size: the bundle10 file size. It is used to validate what was
1639 1640 retrieved by the client matches the server knowledge about the bundle.
1640 1641 - digests: a space separated list of the digest types provided as
1641 1642 parameters.
1642 1643 - digest:<digest-type>: the hexadecimal representation of the digest with
1643 1644 that name. Like the size, it is used to validate what was retrieved by
1644 1645 the client matches what the server knows about the bundle.
1645 1646
1646 1647 When multiple digest types are given, all of them are checked.
1647 1648 """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return, but one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

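The `util.digestchecker` wrapper above lets the handler stream the remote bundle while accumulating size and digest information, then call `validate()` once everything has been read. A minimal standalone equivalent built on `hashlib` (the real `digestchecker` interface is richer; this is a sketch under that assumption, with `io.BytesIO` standing in for the remote stream):

```python
import hashlib
import io

class DigestChecker(object):
    """Wrap a file-like object; validate() checks total size and digests."""
    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size
        self._read = 0
        self._hashes = {typ: hashlib.new(typ) for typ in digests}
        self._expected = digests

    def read(self, n=-1):
        data = self._fh.read(n)
        self._read += len(data)
        for h in self._hashes.values():
            h.update(data)
        return data

    def validate(self):
        if self._read != self._size:
            raise ValueError('size mismatch: read %d, expected %d'
                             % (self._read, self._size))
        for typ, h in self._hashes.items():
            if h.hexdigest() != self._expected[typ]:
                raise ValueError('%s mismatch' % typ)

payload = b'bundle-bytes'
checker = DigestChecker(io.BytesIO(payload), len(payload),
                        {'sha1': hashlib.sha1(payload).hexdigest()})
checker.read()
checker.validate()  # passes: size and sha1 both match
```

As in the handler, validation only happens after the payload has been fully consumed, so a corrupted download is detected before the transaction is committed.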
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

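The payload of 'check:heads' has no framing beyond its fixed record width: it is simply a concatenation of 20-byte binary node ids, read until the stream is exhausted. A standalone sketch of that parsing loop (using `io.BytesIO` in place of a bundle2 part, an assumption for illustration):

```python
import io

def read_nodes(part):
    """Read a stream of fixed-width 20-byte node ids until exhaustion."""
    heads = []
    h = part.read(20)
    while len(h) == 20:
        heads.append(h)
        h = part.read(20)
    # A non-empty short read means the payload was truncated mid-record.
    assert not h
    return heads

# Two fake 20-byte nodes concatenated, as a stand-in for a part payload.
payload = io.BytesIO(b'\x11' * 20 + b'\x22' * 20)
nodes = read_nodes(payload)
print(len(nodes))  # 2
```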
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for a race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activity happens on unrelated heads, it is
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

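The difference between the two head checks can be sketched with plain sets: 'check:heads' demands an exact match of the full head set, while 'check:updated-heads' only requires that each pushed head still exists, so concurrent activity on unrelated heads is tolerated (function names and node values here are illustrative, not from the module):

```python
def exact_heads_ok(expected, current):
    # 'check:heads' semantics: any change to the head set is a race.
    return sorted(expected) == sorted(current)

def updated_heads_ok(pushed, current):
    # 'check:updated-heads' semantics: unrelated new heads are tolerated.
    currentset = set(current)
    return all(h in currentset for h in pushed)

# 'c' appeared on the server from unrelated activity during the push.
current = [b'a' * 20, b'b' * 20, b'c' * 20]
print(exact_heads_ok([b'a' * 20, b'b' * 20], current))  # False: would race
print(updated_heads_ok([b'a' * 20], current))           # True: still fine
```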
@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that the phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s, expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

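The new 'check:phases' part mirrors the head checks above: the client ships the phase it believes each node is in, and the server compares that against its live phase cache, aborting on any mismatch. A toy version with a dict standing in for the phase cache (all names and node values here are illustrative):

```python
# Phase indices as used by Mercurial: 0=public, 1=draft, 2=secret.
PHASENAMES = ['public', 'draft', 'secret']

def check_phases(phasetonodes, actual_phase_of):
    """phasetonodes: list indexed by expected phase, each entry a list of
    nodes; actual_phase_of: mapping node -> current phase on the server."""
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actual = actual_phase_of[n]
            if actual != expectedphase:
                raise RuntimeError('push raced: %s is %s, expected %s'
                                   % (n, PHASENAMES[actual],
                                      PHASENAMES[expectedphase]))

server_phases = {'n1': 0, 'n2': 1}
check_phases([['n1'], ['n2'], []], server_phases)  # matches: no error
try:
    check_phases([['n2'], [], []], server_phases)  # n2 is draft, not public
except RuntimeError as e:
    print(e)
```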
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit an abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit the failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit an unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit a push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from a bundle part to the repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

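The 'hgtagsfnodes' payload is a flat sequence of fixed-width records, each a 20-byte changeset node followed by its 20-byte .hgtags filenode. A standalone sketch of that record parsing, with the same tolerance for a truncated trailing record as the handler (`io.BytesIO` stands in for the part, an assumption for illustration):

```python
import io

def read_fnode_pairs(part):
    """Collect (node, fnode) pairs from a stream of 40-byte records,
    stopping at the first incomplete record."""
    pairs = []
    while True:
        node = part.read(20)
        fnode = part.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break  # an incomplete trailing record is ignored, not an error
        pairs.append((node, fnode))
    return pairs

# One complete pair plus 20 trailing bytes that do not form a full record.
payload = io.BytesIO(b'\x01' * 20 + b'\x02' * 20 + b'\x03' * 20)
print(len(read_fnode_pairs(payload)))  # 1
```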
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # Unbundling can be disabled on the server side for security reasons.
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
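The transformation of advisory params into hook arguments is a plain uppercase-and-prefix step; sketched standalone below (the parameter values are made up for illustration):

```python
def pushvars_to_hookargs(advisoryparams):
    """Uppercase each key and prefix it with USERVAR_ so hooks can tell
    variables pushed via --pushvar apart from regular hook arguments."""
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

print(pushvars_to_hookargs([('debug', '1'), ('Reason', 'hotfix')]))
# {'USERVAR_DEBUG': '1', 'USERVAR_REASON': 'hotfix'}
```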