bundle2: use os.SEEK_* constants...
Gregory Szorc -
r35037:241d9cac default
@@ -1,1945 +1,1946
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
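The stream-level parameter encoding described above can be sketched as follows. This is a minimal standalone illustration, not the module's actual helpers; the names `encodeparams` and `decodeparams` are hypothetical, and `urllib.parse` stands in for Mercurial's `urlreq` wrapper:

```python
from urllib.parse import quote, unquote

def encodeparams(params):
    """Encode (name, value) pairs into the space-separated blob."""
    blocks = []
    for name, value in params:
        block = quote(name, safe='')
        if value is not None:
            # parameters with a value use the <name>=<value> form
            block = '%s=%s' % (block, quote(value, safe=''))
        blocks.append(block)
    return ' '.join(blocks)

def decodeparams(blob):
    """Decode the space-separated blob back into (name, value) pairs."""
    params = []
    for block in blob.split(' '):
        if '=' in block:
            name, value = block.split('=', 1)
            params.append((unquote(name), unquote(value)))
        else:
            # value-less parameters decode with value None
            params.append((unquote(block), None))
    return params
```

Because both name and value are urlquoted, spaces and `=` inside values survive the round-trip.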
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
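The header layout above (typesize, parttype, partid, parameter counts, size pairs, then parameter data) can be packed with `struct` as a sketch. `packpartheader` is a hypothetical name for illustration only; the real module builds headers inside `bundlepart`:

```python
import struct

def packpartheader(parttype, partid, mandatory, advisory):
    """Pack a part header per the documented layout (illustrative sketch).

    mandatory/advisory are lists of (key, value) byte-string pairs.
    """
    params = mandatory + advisory          # mandatory parameters come first
    header = struct.pack('>B', len(parttype)) + parttype   # typesize + parttype
    header += struct.pack('>I', partid)                    # partid, 32-bit
    header += struct.pack('>BB', len(mandatory), len(advisory))  # counts
    for key, value in params:              # one (size-of-key, size-of-value)
        header += struct.pack('>BB', len(key), len(value))  # pair per param
    for key, value in params:              # param-data blob: keys and values
        header += key + value              # back to back, sliced via the sizes
    return header
```

Unpacking walks the same fields in order, using the size pairs to slice keys and values out of the trailing blob.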
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part
139 139 type contains any uppercase char it is considered mandatory. When no handler
140 140 is known for a mandatory part, the process is aborted and an exception is
141 141 raised. If the part is advisory and no handler is known, the part is ignored.
142 142 When the process is aborted, the full bundle is still read from the stream to
143 143 keep the channel usable. But none of the parts read after an abort are
144 144 processed. In the future, dropping the stream may become an option for
145 145 channels we do not care to preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 import os
151 152 import re
152 153 import string
153 154 import struct
154 155 import sys
155 156
156 157 from .i18n import _
157 158 from . import (
158 159 changegroup,
159 160 error,
160 161 node as nodemod,
161 162 obsolete,
162 163 phases,
163 164 pushkey,
164 165 pycompat,
165 166 tags,
166 167 url,
167 168 util,
168 169 )
169 170
170 171 urlerr = util.urlerr
171 172 urlreq = util.urlreq
172 173
173 174 _pack = struct.pack
174 175 _unpack = struct.unpack
175 176
176 177 _fstreamparamsize = '>i'
177 178 _fpartheadersize = '>i'
178 179 _fparttypesize = '>B'
179 180 _fpartid = '>I'
180 181 _fpayloadsize = '>i'
181 182 _fpartparamcount = '>BB'
182 183
183 184 preferedchunksize = 4096
184 185
185 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
186 187
187 188 def outdebug(ui, message):
188 189 """debug regarding output stream (bundling)"""
189 190 if ui.configbool('devel', 'bundle2.debug'):
190 191 ui.debug('bundle2-output: %s\n' % message)
191 192
192 193 def indebug(ui, message):
193 194 """debug on input stream (unbundling)"""
194 195 if ui.configbool('devel', 'bundle2.debug'):
195 196 ui.debug('bundle2-input: %s\n' % message)
196 197
197 198 def validateparttype(parttype):
198 199 """raise ValueError if a parttype contains invalid character"""
199 200 if _parttypeforbidden.search(parttype):
200 201 raise ValueError(parttype)
201 202
202 203 def _makefpartparamsizes(nbparams):
203 204 """return a struct format to read part parameter sizes
204 205
205 206 The number parameters is variable so we need to build that format
206 207 dynamically.
207 208 """
208 209 return '>'+('BB'*nbparams)
209 210
210 211 parthandlermapping = {}
211 212
212 213 def parthandler(parttype, params=()):
213 214 """decorator that registers a function as a bundle2 part handler
214 215
215 216 eg::
216 217
217 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
218 219 def myparttypehandler(...):
219 220 '''process a part of type "my part".'''
220 221 ...
221 222 """
222 223 validateparttype(parttype)
223 224 def _decorator(func):
224 225 lparttype = parttype.lower() # enforce lower case matching.
225 226 assert lparttype not in parthandlermapping
226 227 parthandlermapping[lparttype] = func
227 228 func.params = frozenset(params)
228 229 return func
229 230 return _decorator
230 231
231 232 class unbundlerecords(object):
232 233 """keep record of what happens during an unbundle
233 234
234 235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
235 236 category of record and obj is an arbitrary object.
236 237
237 238 `records['cat']` will return all entries of this category 'cat'.
238 239
239 240 Iterating on the object itself will yield `('category', obj)` tuples
240 241 for all entries.
241 242
242 243 All iterations happen in chronological order.
243 244 """
244 245
245 246 def __init__(self):
246 247 self._categories = {}
247 248 self._sequences = []
248 249 self._replies = {}
249 250
250 251 def add(self, category, entry, inreplyto=None):
251 252 """add a new record of a given category.
252 253
253 254 The entry can then be retrieved in the list returned by
254 255 self['category']."""
255 256 self._categories.setdefault(category, []).append(entry)
256 257 self._sequences.append((category, entry))
257 258 if inreplyto is not None:
258 259 self.getreplies(inreplyto).add(category, entry)
259 260
260 261 def getreplies(self, partid):
261 262 """get the records that are replies to a specific part"""
262 263 return self._replies.setdefault(partid, unbundlerecords())
263 264
264 265 def __getitem__(self, cat):
265 266 return tuple(self._categories.get(cat, ()))
266 267
267 268 def __iter__(self):
268 269 return iter(self._sequences)
269 270
270 271 def __len__(self):
271 272 return len(self._sequences)
272 273
273 274 def __nonzero__(self):
274 275 return bool(self._sequences)
275 276
276 277 __bool__ = __nonzero__
277 278
278 279 class bundleoperation(object):
279 280 """an object that represents a single bundling process
280 281
281 282 Its purpose is to carry unbundle-related objects and states.
282 283
283 284 A new object should be created at the beginning of each bundle processing.
284 285 The object is to be returned by the processing function.
285 286
286 287 The object has very little content now; it will ultimately contain:
287 288 * an access to the repo the bundle is applied to,
288 289 * a ui object,
289 290 * a way to retrieve a transaction to add changes to the repo,
290 291 * a way to record the result of processing each part,
291 292 * a way to construct a bundle response when applicable.
292 293 """
293 294
294 295 def __init__(self, repo, transactiongetter, captureoutput=True):
295 296 self.repo = repo
296 297 self.ui = repo.ui
297 298 self.records = unbundlerecords()
298 299 self.reply = None
299 300 self.captureoutput = captureoutput
300 301 self.hookargs = {}
301 302 self._gettransaction = transactiongetter
302 303
303 304 def gettransaction(self):
304 305 transaction = self._gettransaction()
305 306
306 307 if self.hookargs:
307 308 # the ones added to the transaction supersede those added
308 309 # to the operation.
309 310 self.hookargs.update(transaction.hookargs)
310 311 transaction.hookargs = self.hookargs
311 312
312 313 # mark the hookargs as flushed. further attempts to add to
313 314 # hookargs will result in an abort.
314 315 self.hookargs = None
315 316
316 317 return transaction
317 318
318 319 def addhookargs(self, hookargs):
319 320 if self.hookargs is None:
320 321 raise error.ProgrammingError('attempted to add hookargs to '
321 322 'operation after transaction started')
322 323 self.hookargs.update(hookargs)
323 324
324 325 class TransactionUnavailable(RuntimeError):
325 326 pass
326 327
327 328 def _notransaction():
328 329 """default method to get a transaction while processing a bundle
329 330
330 331 Raise an exception to highlight the fact that no transaction was expected
331 332 to be created"""
332 333 raise TransactionUnavailable()
333 334
334 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
335 336 # transform me into unbundler.apply() as soon as the freeze is lifted
336 337 if isinstance(unbundler, unbundle20):
337 338 tr.hookargs['bundle2'] = '1'
338 339 if source is not None and 'source' not in tr.hookargs:
339 340 tr.hookargs['source'] = source
340 341 if url is not None and 'url' not in tr.hookargs:
341 342 tr.hookargs['url'] = url
342 343 return processbundle(repo, unbundler, lambda: tr)
343 344 else:
344 345 # the transactiongetter won't be used, but we might as well set it
345 346 op = bundleoperation(repo, lambda: tr)
346 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
347 348 return op
348 349
349 350 class partiterator(object):
350 351 def __init__(self, repo, op, unbundler):
351 352 self.repo = repo
352 353 self.op = op
353 354 self.unbundler = unbundler
354 355 self.iterator = None
355 356 self.count = 0
356 357 self.current = None
357 358
358 359 def __enter__(self):
359 360 def func():
360 361 itr = enumerate(self.unbundler.iterparts())
361 362 for count, p in itr:
362 363 self.count = count
363 364 self.current = p
364 365 yield p
365 p.seek(0, 2)
366 p.seek(0, os.SEEK_END)
366 367 self.current = None
367 368 self.iterator = func()
368 369 return self.iterator
369 370
370 371 def __exit__(self, type, exc, tb):
371 372 if not self.iterator:
372 373 return
373 374
374 375 # Only gracefully abort in a normal exception situation. User aborts
375 376 # like Ctrl+C throw a KeyboardInterrupt, which does not derive from
376 377 # Exception, and should not be gracefully cleaned up.
377 378 if isinstance(exc, Exception):
378 379 # Any exceptions seeking to the end of the bundle at this point are
379 380 # almost certainly related to the underlying stream being bad.
380 381 # And, chances are that the exception we're handling is related to
381 382 # getting in that bad state. So, we swallow the seeking error and
382 383 # re-raise the original error.
383 384 seekerror = False
384 385 try:
385 386 if self.current:
386 387 # consume the part content to not corrupt the stream.
387 self.current.seek(0, 2)
388 self.current.seek(0, os.SEEK_END)
388 389
389 390 for part in self.iterator:
390 391 # consume the bundle content
391 part.seek(0, 2)
392 part.seek(0, os.SEEK_END)
392 393 except Exception:
393 394 seekerror = True
394 395
395 396 # Small hack to let caller code distinguish exceptions from bundle2
396 397 # processing from processing the old format. This is mostly needed
397 398 # to handle different return codes to unbundle according to the type
398 399 # of bundle. We should probably clean up or drop this return code
399 400 # craziness in a future version.
400 401 exc.duringunbundle2 = True
401 402 salvaged = []
402 403 replycaps = None
403 404 if self.op.reply is not None:
404 405 salvaged = self.op.reply.salvageoutput()
405 406 replycaps = self.op.reply.capabilities
406 407 exc._replycaps = replycaps
407 408 exc._bundle2salvagedoutput = salvaged
408 409
409 410 # Re-raising from a variable loses the original stack. So only use
410 411 # that form if we need to.
411 412 if seekerror:
412 413 raise exc
413 414
414 415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
415 416 self.count)
416 417
417 418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
418 419 """This function processes a bundle, applying effects to/from a repo
419 420
420 421 It iterates over each part then searches for and uses the proper handling
421 422 code to process the part. Parts are processed in order.
422 423
423 424 An unknown mandatory part will abort the process.
424 425
425 426 It is temporarily possible to provide a prebuilt bundleoperation to the
426 427 function. This is used to ensure output is properly propagated in case of
427 428 an error during the unbundling. This output capturing part will likely be
428 429 reworked and this ability will probably go away in the process.
429 430 """
430 431 if op is None:
431 432 if transactiongetter is None:
432 433 transactiongetter = _notransaction
433 434 op = bundleoperation(repo, transactiongetter)
434 435 # todo:
435 436 # - replace this with an init function soon.
436 437 # - exception catching
437 438 unbundler.params
438 439 if repo.ui.debugflag:
439 440 msg = ['bundle2-input-bundle:']
440 441 if unbundler.params:
441 442 msg.append(' %i params' % len(unbundler.params))
442 443 if op._gettransaction is None or op._gettransaction is _notransaction:
443 444 msg.append(' no-transaction')
444 445 else:
445 446 msg.append(' with-transaction')
446 447 msg.append('\n')
447 448 repo.ui.debug(''.join(msg))
448 449
449 450 processparts(repo, op, unbundler)
450 451
451 452 return op
452 453
453 454 def processparts(repo, op, unbundler):
454 455 with partiterator(repo, op, unbundler) as parts:
455 456 for part in parts:
456 457 _processpart(op, part)
457 458
458 459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
459 460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
460 461 op.records.add('changegroup', {
461 462 'return': ret,
462 463 })
463 464 return ret
464 465
465 466 def _gethandler(op, part):
466 467 status = 'unknown' # used by debug output
467 468 try:
468 469 handler = parthandlermapping.get(part.type)
469 470 if handler is None:
470 471 status = 'unsupported-type'
471 472 raise error.BundleUnknownFeatureError(parttype=part.type)
472 473 indebug(op.ui, 'found a handler for part %s' % part.type)
473 474 unknownparams = part.mandatorykeys - handler.params
474 475 if unknownparams:
475 476 unknownparams = list(unknownparams)
476 477 unknownparams.sort()
477 478 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
478 479 raise error.BundleUnknownFeatureError(parttype=part.type,
479 480 params=unknownparams)
480 481 status = 'supported'
481 482 except error.BundleUnknownFeatureError as exc:
482 483 if part.mandatory: # mandatory parts
483 484 raise
484 485 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
485 486 return # skip to part processing
486 487 finally:
487 488 if op.ui.debugflag:
488 489 msg = ['bundle2-input-part: "%s"' % part.type]
489 490 if not part.mandatory:
490 491 msg.append(' (advisory)')
491 492 nbmp = len(part.mandatorykeys)
492 493 nbap = len(part.params) - nbmp
493 494 if nbmp or nbap:
494 495 msg.append(' (params:')
495 496 if nbmp:
496 497 msg.append(' %i mandatory' % nbmp)
497 498 if nbap:
498 499 msg.append(' %i advisory' % nbap)
499 500 msg.append(')')
500 501 msg.append(' %s\n' % status)
501 502 op.ui.debug(''.join(msg))
502 503
503 504 return handler
504 505
505 506 def _processpart(op, part):
506 507 """process a single part from a bundle
507 508
508 509 The part is guaranteed to have been fully consumed when the function exits
509 510 (even if an exception is raised)."""
510 511 handler = _gethandler(op, part)
511 512 if handler is None:
512 513 return
513 514
514 515 # handler is called outside the above try block so that we don't
515 516 # risk catching KeyErrors from anything other than the
516 517 # parthandlermapping lookup (any KeyError raised by handler()
517 518 # itself represents a defect of a different variety).
518 519 output = None
519 520 if op.captureoutput and op.reply is not None:
520 521 op.ui.pushbuffer(error=True, subproc=True)
521 522 output = ''
522 523 try:
523 524 handler(op, part)
524 525 finally:
525 526 if output is not None:
526 527 output = op.ui.popbuffer()
527 528 if output:
528 529 outpart = op.reply.newpart('output', data=output,
529 530 mandatory=False)
530 531 outpart.addparam(
531 532 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
532 533
533 534 def decodecaps(blob):
534 535 """decode a bundle2 caps bytes blob into a dictionary
535 536
536 537 The blob is a list of capabilities (one per line)
537 538 Capabilities may have values using a line of the form::
538 539
539 540 capability=value1,value2,value3
540 541
541 542 The values are always a list."""
542 543 caps = {}
543 544 for line in blob.splitlines():
544 545 if not line:
545 546 continue
546 547 if '=' not in line:
547 548 key, vals = line, ()
548 549 else:
549 550 key, vals = line.split('=', 1)
550 551 vals = vals.split(',')
551 552 key = urlreq.unquote(key)
552 553 vals = [urlreq.unquote(v) for v in vals]
553 554 caps[key] = vals
554 555 return caps
555 556
556 557 def encodecaps(caps):
557 558 """encode a bundle2 caps dictionary into a bytes blob"""
558 559 chunks = []
559 560 for ca in sorted(caps):
560 561 vals = caps[ca]
561 562 ca = urlreq.quote(ca)
562 563 vals = [urlreq.quote(v) for v in vals]
563 564 if vals:
564 565 ca = "%s=%s" % (ca, ','.join(vals))
565 566 chunks.append(ca)
566 567 return '\n'.join(chunks)
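As a usage illustration, the caps blob format round-trips like this. The sketch below mirrors the `encodecaps`/`decodecaps` pair above but is standalone, using `urllib.parse` in place of Mercurial's `urlreq` wrapper and plain lists for the value tuples:

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """encode a caps dictionary into the one-capability-per-line blob"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v, safe='') for v in caps[ca]]
        ca = quote(ca, safe='')
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))  # capability=value1,value2
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode the blob back into a dictionary of value lists"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = [unquote(v) for v in vals.split(',')]
        caps[unquote(key)] = vals
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps(caps)
assert blob == 'HG20\nchangegroup=01,02'
assert decodecaps(blob) == caps
```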
567 568
568 569 bundletypes = {
569 570 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
570 571 # since the unification ssh accepts a header but there
571 572 # is no capability signaling it.
572 573 "HG20": (), # special-cased below
573 574 "HG10UN": ("HG10UN", 'UN'),
574 575 "HG10BZ": ("HG10", 'BZ'),
575 576 "HG10GZ": ("HG10GZ", 'GZ'),
576 577 }
577 578
578 579 # hgweb uses this list to communicate its preferred type
579 580 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
580 581
581 582 class bundle20(object):
582 583 """represent an outgoing bundle2 container
583 584
584 585 Use the `addparam` method to add stream level parameters and `newpart` to
585 586 populate it. Then call `getchunks` to retrieve all the binary chunks of
586 587 data that compose the bundle2 container."""
587 588
588 589 _magicstring = 'HG20'
589 590
590 591 def __init__(self, ui, capabilities=()):
591 592 self.ui = ui
592 593 self._params = []
593 594 self._parts = []
594 595 self.capabilities = dict(capabilities)
595 596 self._compengine = util.compengines.forbundletype('UN')
596 597 self._compopts = None
597 598
598 599 def setcompression(self, alg, compopts=None):
599 600 """setup core part compression to <alg>"""
600 601 if alg in (None, 'UN'):
601 602 return
602 603 assert not any(n.lower() == 'compression' for n, v in self._params)
603 604 self.addparam('Compression', alg)
604 605 self._compengine = util.compengines.forbundletype(alg)
605 606 self._compopts = compopts
606 607
607 608 @property
608 609 def nbparts(self):
609 610 """total number of parts added to the bundler"""
610 611 return len(self._parts)
611 612
612 613 # methods used to define the bundle2 content
613 614 def addparam(self, name, value=None):
614 615 """add a stream level parameter"""
615 616 if not name:
616 617 raise ValueError(r'empty parameter name')
617 618 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
618 619 raise ValueError(r'non letter first character: %s' % name)
619 620 self._params.append((name, value))
620 621
621 622 def addpart(self, part):
622 623 """add a new part to the bundle2 container
623 624
624 625 Parts contain the actual applicative payload.
625 626 assert part.id is None
626 627 part.id = len(self._parts) # very cheap counter
627 628 self._parts.append(part)
628 629
629 630 def newpart(self, typeid, *args, **kwargs):
630 631 """create a new part and add it to the container
631 632
632 633 The part is directly added to the container. For now, this means that
633 634 any failure to properly initialize the part after calling
634 635 ``newpart`` should result in a failure of the whole bundling process.
635 636
636 637 You can still fall back to manually creating and adding one if you need
637 638 better control."""
638 639 part = bundlepart(typeid, *args, **kwargs)
639 640 self.addpart(part)
640 641 return part
641 642
642 643 # methods used to generate the bundle2 stream
643 644 def getchunks(self):
644 645 if self.ui.debugflag:
645 646 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
646 647 if self._params:
647 648 msg.append(' (%i params)' % len(self._params))
648 649 msg.append(' %i parts total\n' % len(self._parts))
649 650 self.ui.debug(''.join(msg))
650 651 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
651 652 yield self._magicstring
652 653 param = self._paramchunk()
653 654 outdebug(self.ui, 'bundle parameter: %s' % param)
654 655 yield _pack(_fstreamparamsize, len(param))
655 656 if param:
656 657 yield param
657 658 for chunk in self._compengine.compressstream(self._getcorechunk(),
658 659 self._compopts):
659 660 yield chunk
660 661
661 662 def _paramchunk(self):
662 663 """return an encoded version of all stream parameters"""
663 664 blocks = []
664 665 for par, value in self._params:
665 666 par = urlreq.quote(par)
666 667 if value is not None:
667 668 value = urlreq.quote(value)
668 669 par = '%s=%s' % (par, value)
669 670 blocks.append(par)
670 671 return ' '.join(blocks)
671 672
672 673 def _getcorechunk(self):
673 674 """yield chunk for the core part of the bundle
674 675
675 676 (all but headers and parameters)"""
676 677 outdebug(self.ui, 'start of parts')
677 678 for part in self._parts:
678 679 outdebug(self.ui, 'bundle part: "%s"' % part.type)
679 680 for chunk in part.getchunks(ui=self.ui):
680 681 yield chunk
681 682 outdebug(self.ui, 'end of bundle')
682 683 yield _pack(_fpartheadersize, 0)
683 684
684 685
685 686 def salvageoutput(self):
686 687 """return a list with a copy of all output parts in the bundle
687 688
688 689 This is meant to be used during error handling to make sure we preserve
689 690 server output"""
690 691 salvaged = []
691 692 for part in self._parts:
692 693 if part.type.startswith('output'):
693 694 salvaged.append(part.copy())
694 695 return salvaged
695 696
696 697
697 698 class unpackermixin(object):
698 699 """A mixin to extract bytes and struct data from a stream"""
699 700
700 701 def __init__(self, fp):
701 702 self._fp = fp
702 703
703 704 def _unpack(self, format):
704 705 """unpack this struct format from the stream
705 706
706 707 This method is meant for internal usage by the bundle2 protocol only.
707 708 It directly manipulates the low level stream, including bundle2 level
708 709 instructions.
709 710
710 711 Do not use it to implement higher-level logic or methods."""
711 712 data = self._readexact(struct.calcsize(format))
712 713 return _unpack(format, data)
713 714
714 715 def _readexact(self, size):
715 716 """read exactly <size> bytes from the stream
716 717
717 718 This method is meant for internal usage by the bundle2 protocol only.
718 719 It directly manipulates the low level stream, including bundle2 level
719 720 instructions.
720 721
721 722 Do not use it to implement higher-level logic or methods."""
722 723 return changegroup.readexactly(self._fp, size)
723 724
724 725 def getunbundler(ui, fp, magicstring=None):
725 726 """return a valid unbundler object for a given magicstring"""
726 727 if magicstring is None:
727 728 magicstring = changegroup.readexactly(fp, 4)
728 729 magic, version = magicstring[0:2], magicstring[2:4]
729 730 if magic != 'HG':
730 731 ui.debug(
731 732 "error: invalid magic: %r (version %r), should be 'HG'\n"
732 733 % (magic, version))
733 734 raise error.Abort(_('not a Mercurial bundle'))
734 735 unbundlerclass = formatmap.get(version)
735 736 if unbundlerclass is None:
736 737 raise error.Abort(_('unknown bundle version %s') % version)
737 738 unbundler = unbundlerclass(ui, fp)
738 739 indebug(ui, 'start processing of %s stream' % magicstring)
739 740 return unbundler
740 741
741 742 class unbundle20(unpackermixin):
742 743 """interpret a bundle2 stream
743 744
744 745 This class is fed with a binary stream and yields parts through its
745 746 `iterparts` method."""
746 747
747 748 _magicstring = 'HG20'
748 749
749 750 def __init__(self, ui, fp):
750 751 """If header is specified, we do not read it out of the stream."""
751 752 self.ui = ui
752 753 self._compengine = util.compengines.forbundletype('UN')
753 754 self._compressed = None
754 755 super(unbundle20, self).__init__(fp)
755 756
756 757 @util.propertycache
757 758 def params(self):
758 759 """dictionary of stream level parameters"""
759 760 indebug(self.ui, 'reading bundle2 stream parameters')
760 761 params = {}
761 762 paramssize = self._unpack(_fstreamparamsize)[0]
762 763 if paramssize < 0:
763 764 raise error.BundleValueError('negative bundle param size: %i'
764 765 % paramssize)
765 766 if paramssize:
766 767 params = self._readexact(paramssize)
767 768 params = self._processallparams(params)
768 769 return params
769 770
770 771 def _processallparams(self, paramsblock):
771 772 """process the blob of stream-level parameters and return them"""
772 773 params = util.sortdict()
773 774 for p in paramsblock.split(' '):
774 775 p = p.split('=', 1)
775 776 p = [urlreq.unquote(i) for i in p]
776 777 if len(p) < 2:
777 778 p.append(None)
778 779 self._processparam(*p)
779 780 params[p[0]] = p[1]
780 781 return params
781 782
782 783
783 784 def _processparam(self, name, value):
784 785 """process a parameter, applying its effect if needed
785 786
786 787 Parameters starting with a lower case letter are advisory and will be
787 788 ignored when unknown. Those starting with an upper case letter are
788 789 mandatory and this function will raise a KeyError when unknown.
789 790
790 791 Note: no options are currently supported. Any input will either be
791 792 ignored or fail.
792 793 """
793 794 if not name:
794 795 raise ValueError(r'empty parameter name')
795 796 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
796 797 raise ValueError(r'non letter first character: %s' % name)
797 798 try:
798 799 handler = b2streamparamsmap[name.lower()]
799 800 except KeyError:
800 801 if name[0:1].islower():
801 802 indebug(self.ui, "ignoring unknown parameter %s" % name)
802 803 else:
803 804 raise error.BundleUnknownFeatureError(params=(name,))
804 805 else:
805 806 handler(self, name, value)
806 807
807 808 def _forwardchunks(self):
808 809 """utility to transfer a bundle2 as binary
809 810
810 811 This is made necessary by the fact that the 'getbundle' command over 'ssh'
811 812 has no way to know when the reply ends, relying on the bundle being
812 813 interpreted to know its end. This is terrible and we are sorry, but we
813 814 needed to move forward to get general delta enabled.
814 815 """
815 816 yield self._magicstring
816 817 assert 'params' not in vars(self)
817 818 paramssize = self._unpack(_fstreamparamsize)[0]
818 819 if paramssize < 0:
819 820 raise error.BundleValueError('negative bundle param size: %i'
820 821 % paramssize)
821 822 yield _pack(_fstreamparamsize, paramssize)
822 823 if paramssize:
823 824 params = self._readexact(paramssize)
824 825 self._processallparams(params)
825 826 yield params
826 827 assert self._compengine.bundletype == 'UN'
827 828 # From there, payload might need to be decompressed
828 829 self._fp = self._compengine.decompressorreader(self._fp)
829 830 emptycount = 0
830 831 while emptycount < 2:
831 832 # so we can brainlessly loop
832 833 assert _fpartheadersize == _fpayloadsize
833 834 size = self._unpack(_fpartheadersize)[0]
834 835 yield _pack(_fpartheadersize, size)
835 836 if size:
836 837 emptycount = 0
837 838 else:
838 839 emptycount += 1
839 840 continue
840 841 if size == flaginterrupt:
841 842 continue
842 843 elif size < 0:
843 844 raise error.BundleValueError('negative chunk size: %i' % size)
844 845 yield self._readexact(size)
845 846
846 847
847 848 def iterparts(self):
848 849 """yield all parts contained in the stream"""
849 850 # make sure params have been loaded
850 851 self.params
851 852 # From there, the payload needs to be decompressed
852 853 self._fp = self._compengine.decompressorreader(self._fp)
853 854 indebug(self.ui, 'start extraction of bundle2 parts')
854 855 headerblock = self._readpartheader()
855 856 while headerblock is not None:
856 857 part = unbundlepart(self.ui, headerblock, self._fp)
857 858 yield part
858 859 # Seek to the end of the part to force its consumption so the next
859 860 # part can be read. But then seek back to the beginning so the
860 861 # code consuming this generator has a part that starts at 0.
861 part.seek(0, 2)
862 part.seek(0)
862 part.seek(0, os.SEEK_END)
863 part.seek(0, os.SEEK_SET)
863 864 headerblock = self._readpartheader()
864 865 indebug(self.ui, 'end of bundle2 stream')
865 866
866 867 def _readpartheader(self):
867 868 """reads a part header size and return the bytes blob
868 869
869 870 returns None if empty"""
870 871 headersize = self._unpack(_fpartheadersize)[0]
871 872 if headersize < 0:
872 873 raise error.BundleValueError('negative part header size: %i'
873 874 % headersize)
874 875 indebug(self.ui, 'part header size: %i' % headersize)
875 876 if headersize:
876 877 return self._readexact(headersize)
877 878 return None
878 879
879 880 def compressed(self):
880 881 self.params # load params
881 882 return self._compressed
882 883
883 884 def close(self):
884 885 """close underlying file"""
885 886 if util.safehasattr(self._fp, 'close'):
886 887 return self._fp.close()
887 888
888 889 formatmap = {'20': unbundle20}
889 890
890 891 b2streamparamsmap = {}
891 892
892 893 def b2streamparamhandler(name):
893 894 """register a handler for a stream level parameter"""
894 895 def decorator(func):
895 896 assert name not in b2streamparamsmap
896 897 b2streamparamsmap[name] = func
897 898 return func
898 899 return decorator
899 900
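`b2streamparamhandler` is a registry decorator: it maps a stream-level parameter name to the function that processes it. The same pattern in isolation (the `handlers` dict and the names here are illustrative, not the real `b2streamparamsmap`):

```python
handlers = {}

def streamparamhandler(name):
    """Register a handler for a stream-level parameter."""
    def decorator(func):
        # refuse double registration of the same parameter name
        assert name not in handlers
        handlers[name] = func
        return func
    return decorator

@streamparamhandler('compression')
def processcompression(unbundler, param, value):
    # a stand-in body; the real handler installs a decompressor
    return (param, value)
```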
900 901 @b2streamparamhandler('compression')
901 902 def processcompression(unbundler, param, value):
902 903 """read compression parameter and install payload decompression"""
903 904 if value not in util.compengines.supportedbundletypes:
904 905 raise error.BundleUnknownFeatureError(params=(param,),
905 906 values=(value,))
906 907 unbundler._compengine = util.compengines.forbundletype(value)
907 908 if value is not None:
908 909 unbundler._compressed = True
909 910
910 911 class bundlepart(object):
911 912 """A bundle2 part contains application level payload
912 913
913 914 The part `type` is used to route the part to the application level
914 915 handler.
915 916
916 917 The part payload is contained in ``part.data``. It could be raw bytes or a
917 918 generator of byte chunks.
918 919
919 920 You can add parameters to the part using the ``addparam`` method.
920 921 Parameters can be either mandatory (default) or advisory. Remote side
921 922 should be able to safely ignore the advisory ones.
922 923
923 924 Neither data nor parameters can be modified after generation has begun.
924 925 """
925 926
926 927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
927 928 data='', mandatory=True):
928 929 validateparttype(parttype)
929 930 self.id = None
930 931 self.type = parttype
931 932 self._data = data
932 933 self._mandatoryparams = list(mandatoryparams)
933 934 self._advisoryparams = list(advisoryparams)
934 935 # checking for duplicated entries
935 936 self._seenparams = set()
936 937 for pname, __ in self._mandatoryparams + self._advisoryparams:
937 938 if pname in self._seenparams:
938 939 raise error.ProgrammingError('duplicated params: %s' % pname)
939 940 self._seenparams.add(pname)
940 941 # status of the part's generation:
941 942 # - None: not started,
942 943 # - False: currently generated,
943 944 # - True: generation done.
944 945 self._generated = None
945 946 self.mandatory = mandatory
946 947
947 948 def __repr__(self):
948 949 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
949 950 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
950 951 % (cls, id(self), self.id, self.type, self.mandatory))
951 952
952 953 def copy(self):
953 954 """return a copy of the part
954 955
955 956 The new part has the very same content but no part id assigned yet.
956 957 Parts with generated data cannot be copied."""
957 958 assert not util.safehasattr(self.data, 'next')
958 959 return self.__class__(self.type, self._mandatoryparams,
959 960 self._advisoryparams, self._data, self.mandatory)
960 961
961 962 # methods used to define the part content
962 963 @property
963 964 def data(self):
964 965 return self._data
965 966
966 967 @data.setter
967 968 def data(self, data):
968 969 if self._generated is not None:
969 970 raise error.ReadOnlyPartError('part is being generated')
970 971 self._data = data
971 972
972 973 @property
973 974 def mandatoryparams(self):
974 975 # make it an immutable tuple to force people through ``addparam``
975 976 return tuple(self._mandatoryparams)
976 977
977 978 @property
978 979 def advisoryparams(self):
979 980 # make it an immutable tuple to force people through ``addparam``
980 981 return tuple(self._advisoryparams)
981 982
982 983 def addparam(self, name, value='', mandatory=True):
983 984 """add a parameter to the part
984 985
985 986 If 'mandatory' is set to True, the remote handler must claim support
986 987 for this parameter or the unbundling will be aborted.
987 988
988 989 The 'name' and 'value' cannot exceed 255 bytes each.
989 990 """
990 991 if self._generated is not None:
991 992 raise error.ReadOnlyPartError('part is being generated')
992 993 if name in self._seenparams:
993 994 raise ValueError('duplicated params: %s' % name)
994 995 self._seenparams.add(name)
995 996 params = self._advisoryparams
996 997 if mandatory:
997 998 params = self._mandatoryparams
998 999 params.append((name, value))
999 1000
1000 1001 # methods used to generate the bundle2 stream
1001 1002 def getchunks(self, ui):
1002 1003 if self._generated is not None:
1003 1004 raise error.ProgrammingError('part can only be consumed once')
1004 1005 self._generated = False
1005 1006
1006 1007 if ui.debugflag:
1007 1008 msg = ['bundle2-output-part: "%s"' % self.type]
1008 1009 if not self.mandatory:
1009 1010 msg.append(' (advisory)')
1010 1011 nbmp = len(self.mandatoryparams)
1011 1012 nbap = len(self.advisoryparams)
1012 1013 if nbmp or nbap:
1013 1014 msg.append(' (params:')
1014 1015 if nbmp:
1015 1016 msg.append(' %i mandatory' % nbmp)
1016 1017 if nbap:
1017 1018 msg.append(' %i advisory' % nbap)
1018 1019 msg.append(')')
1019 1020 if not self.data:
1020 1021 msg.append(' empty payload')
1021 1022 elif (util.safehasattr(self.data, 'next')
1022 1023 or util.safehasattr(self.data, '__next__')):
1023 1024 msg.append(' streamed payload')
1024 1025 else:
1025 1026 msg.append(' %i bytes payload' % len(self.data))
1026 1027 msg.append('\n')
1027 1028 ui.debug(''.join(msg))
1028 1029
1029 1030 #### header
1030 1031 if self.mandatory:
1031 1032 parttype = self.type.upper()
1032 1033 else:
1033 1034 parttype = self.type.lower()
1034 1035 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1035 1036 ## parttype
1036 1037 header = [_pack(_fparttypesize, len(parttype)),
1037 1038 parttype, _pack(_fpartid, self.id),
1038 1039 ]
1039 1040 ## parameters
1040 1041 # count
1041 1042 manpar = self.mandatoryparams
1042 1043 advpar = self.advisoryparams
1043 1044 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1044 1045 # size
1045 1046 parsizes = []
1046 1047 for key, value in manpar:
1047 1048 parsizes.append(len(key))
1048 1049 parsizes.append(len(value))
1049 1050 for key, value in advpar:
1050 1051 parsizes.append(len(key))
1051 1052 parsizes.append(len(value))
1052 1053 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1053 1054 header.append(paramsizes)
1054 1055 # key, value
1055 1056 for key, value in manpar:
1056 1057 header.append(key)
1057 1058 header.append(value)
1058 1059 for key, value in advpar:
1059 1060 header.append(key)
1060 1061 header.append(value)
1061 1062 ## finalize header
1062 1063 try:
1063 1064 headerchunk = ''.join(header)
1064 1065 except TypeError:
1065 1066 raise TypeError(r'Found a non-bytes trying to '
1066 1067 r'build bundle part header: %r' % header)
1067 1068 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1068 1069 yield _pack(_fpartheadersize, len(headerchunk))
1069 1070 yield headerchunk
1070 1071 ## payload
1071 1072 try:
1072 1073 for chunk in self._payloadchunks():
1073 1074 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1074 1075 yield _pack(_fpayloadsize, len(chunk))
1075 1076 yield chunk
1076 1077 except GeneratorExit:
1077 1078 # GeneratorExit means that nobody is listening for our
1078 1079 # results anyway, so just bail quickly rather than trying
1079 1080 # to produce an error part.
1080 1081 ui.debug('bundle2-generatorexit\n')
1081 1082 raise
1082 1083 except BaseException as exc:
1083 1084 bexc = util.forcebytestr(exc)
1084 1085 # backup exception data for later
1085 1086 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1086 1087 % bexc)
1087 1088 tb = sys.exc_info()[2]
1088 1089 msg = 'unexpected error: %s' % bexc
1089 1090 interpart = bundlepart('error:abort', [('message', msg)],
1090 1091 mandatory=False)
1091 1092 interpart.id = 0
1092 1093 yield _pack(_fpayloadsize, -1)
1093 1094 for chunk in interpart.getchunks(ui=ui):
1094 1095 yield chunk
1095 1096 outdebug(ui, 'closing payload chunk')
1096 1097 # abort current part payload
1097 1098 yield _pack(_fpayloadsize, 0)
1098 1099 pycompat.raisewithtb(exc, tb)
1099 1100 # end of payload
1100 1101 outdebug(ui, 'closing payload chunk')
1101 1102 yield _pack(_fpayloadsize, 0)
1102 1103 self._generated = True
1103 1104
1104 1105 def _payloadchunks(self):
1105 1106 """yield chunks of the part payload
1106 1107
1107 1108 Exists to handle the different methods to provide data to a part."""
1108 1109 # we only support fixed size data now.
1109 1110 # This will be improved in the future.
1110 1111 if (util.safehasattr(self.data, 'next')
1111 1112 or util.safehasattr(self.data, '__next__')):
1112 1113 buff = util.chunkbuffer(self.data)
1113 1114 chunk = buff.read(preferedchunksize)
1114 1115 while chunk:
1115 1116 yield chunk
1116 1117 chunk = buff.read(preferedchunksize)
1117 1118 elif len(self.data):
1118 1119 yield self.data
1119 1120
1120 1121
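`_payloadchunks` normalizes whatever the part's data generator yields into reads of `preferedchunksize` via `util.chunkbuffer`. A rough standalone equivalent (`rechunk` is an illustrative name, and it buffers everything in memory, which the real chunkbuffer avoids):

```python
import io

def rechunk(chunks, size=4096):
    """Re-slice an iterable of byte chunks into fixed-size chunks."""
    buf = io.BytesIO(b''.join(chunks))  # the real code streams instead of joining
    while True:
        chunk = buf.read(size)
        if not chunk:
            break
        yield chunk
```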
1121 1122 flaginterrupt = -1
1122 1123
1123 1124 class interrupthandler(unpackermixin):
1124 1125 """read one part and process it with restricted capability
1125 1126
1126 1127 This allows transmitting exceptions raised on the producer side during
1127 1128 part iteration while the consumer is reading a part.
1128 1129
1129 1130 Parts processed in this manner only have access to a ui object."""
1130 1131
1131 1132 def __init__(self, ui, fp):
1132 1133 super(interrupthandler, self).__init__(fp)
1133 1134 self.ui = ui
1134 1135
1135 1136 def _readpartheader(self):
1136 1137 """reads a part header size and return the bytes blob
1137 1138
1138 1139 returns None if empty"""
1139 1140 headersize = self._unpack(_fpartheadersize)[0]
1140 1141 if headersize < 0:
1141 1142 raise error.BundleValueError('negative part header size: %i'
1142 1143 % headersize)
1143 1144 indebug(self.ui, 'part header size: %i' % headersize)
1144 1145 if headersize:
1145 1146 return self._readexact(headersize)
1146 1147 return None
1147 1148
1148 1149 def __call__(self):
1149 1150
1150 1151 self.ui.debug('bundle2-input-stream-interrupt:'
1151 1152 ' opening out of band context\n')
1152 1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1153 1154 headerblock = self._readpartheader()
1154 1155 if headerblock is None:
1155 1156 indebug(self.ui, 'no part found during interruption.')
1156 1157 return
1157 1158 part = unbundlepart(self.ui, headerblock, self._fp)
1158 1159 op = interruptoperation(self.ui)
1159 1160 hardabort = False
1160 1161 try:
1161 1162 _processpart(op, part)
1162 1163 except (SystemExit, KeyboardInterrupt):
1163 1164 hardabort = True
1164 1165 raise
1165 1166 finally:
1166 1167 if not hardabort:
1167 part.seek(0, 2)
1168 part.seek(0, os.SEEK_END)
1168 1169 self.ui.debug('bundle2-input-stream-interrupt:'
1169 1170 ' closing out of band context\n')
1170 1171
1171 1172 class interruptoperation(object):
1172 1173 """A limited operation to be used by part handlers during interruption
1173 1174
1174 1175 It only has access to a ui object.
1175 1176 """
1176 1177
1177 1178 def __init__(self, ui):
1178 1179 self.ui = ui
1179 1180 self.reply = None
1180 1181 self.captureoutput = False
1181 1182
1182 1183 @property
1183 1184 def repo(self):
1184 1185 raise error.ProgrammingError('no repo access from stream interruption')
1185 1186
1186 1187 def gettransaction(self):
1187 1188 raise TransactionUnavailable('no repo access from stream interruption')
1188 1189
1189 1190 class unbundlepart(unpackermixin):
1190 1191 """a bundle part read from a bundle"""
1191 1192
1192 1193 def __init__(self, ui, header, fp):
1193 1194 super(unbundlepart, self).__init__(fp)
1194 1195 self._seekable = (util.safehasattr(fp, 'seek') and
1195 1196 util.safehasattr(fp, 'tell'))
1196 1197 self.ui = ui
1197 1198 # unbundle state attr
1198 1199 self._headerdata = header
1199 1200 self._headeroffset = 0
1200 1201 self._initialized = False
1201 1202 self.consumed = False
1202 1203 # part data
1203 1204 self.id = None
1204 1205 self.type = None
1205 1206 self.mandatoryparams = None
1206 1207 self.advisoryparams = None
1207 1208 self.params = None
1208 1209 self.mandatorykeys = ()
1209 1210 self._payloadstream = None
1210 1211 self._readheader()
1211 1212 self._mandatory = None
1212 1213 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1213 1214 self._pos = 0
1214 1215
1215 1216 def _fromheader(self, size):
1216 1217 """return the next <size> byte from the header"""
1217 1218 offset = self._headeroffset
1218 1219 data = self._headerdata[offset:(offset + size)]
1219 1220 self._headeroffset = offset + size
1220 1221 return data
1221 1222
1222 1223 def _unpackheader(self, format):
1223 1224 """read given format from header
1224 1225
1225 1226 This automatically computes the size of the format to read."""
1226 1227 data = self._fromheader(struct.calcsize(format))
1227 1228 return _unpack(format, data)
1228 1229
1229 1230 def _initparams(self, mandatoryparams, advisoryparams):
1230 1231 """internal function to set up all logic-related parameters"""
1231 1232 # make it read only to prevent people touching it by mistake.
1232 1233 self.mandatoryparams = tuple(mandatoryparams)
1233 1234 self.advisoryparams = tuple(advisoryparams)
1234 1235 # user friendly UI
1235 1236 self.params = util.sortdict(self.mandatoryparams)
1236 1237 self.params.update(self.advisoryparams)
1237 1238 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1238 1239
1239 1240 def _payloadchunks(self, chunknum=0):
1240 1241 '''seek to specified chunk and start yielding data'''
1241 1242 if len(self._chunkindex) == 0:
1242 1243 assert chunknum == 0, 'Must start with chunk 0'
1243 1244 self._chunkindex.append((0, self._tellfp()))
1244 1245 else:
1245 1246 assert chunknum < len(self._chunkindex), \
1246 1247 'Unknown chunk %d' % chunknum
1247 1248 self._seekfp(self._chunkindex[chunknum][1])
1248 1249
1249 1250 pos = self._chunkindex[chunknum][0]
1250 1251 payloadsize = self._unpack(_fpayloadsize)[0]
1251 1252 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1252 1253 while payloadsize:
1253 1254 if payloadsize == flaginterrupt:
1254 1255 # interruption detection, the handler will now read a
1255 1256 # single part and process it.
1256 1257 interrupthandler(self.ui, self._fp)()
1257 1258 elif payloadsize < 0:
1258 1259 msg = 'negative payload chunk size: %i' % payloadsize
1259 1260 raise error.BundleValueError(msg)
1260 1261 else:
1261 1262 result = self._readexact(payloadsize)
1262 1263 chunknum += 1
1263 1264 pos += payloadsize
1264 1265 if chunknum == len(self._chunkindex):
1265 1266 self._chunkindex.append((pos, self._tellfp()))
1266 1267 yield result
1267 1268 payloadsize = self._unpack(_fpayloadsize)[0]
1268 1269 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1269 1270
1270 1271 def _findchunk(self, pos):
1271 1272 '''for a given payload position, return a chunk number and offset'''
1272 1273 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1273 1274 if ppos == pos:
1274 1275 return chunk, 0
1275 1276 elif ppos > pos:
1276 1277 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1277 1278 raise ValueError('Unknown chunk')
1278 1279
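`_findchunk` walks the `(payload position, file position)` index that `_payloadchunks` builds lazily, translating a payload offset into a chunk number plus an offset inside that chunk. The same scan, runnable on a made-up index:

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset within that chunk) for a payload position."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# hypothetical (payload position, file position) pairs for three chunks
index = [(0, 100), (4096, 4204), (8192, 8308)]
```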
1279 1280 def _readheader(self):
1280 1281 """read the header and setup the object"""
1281 1282 typesize = self._unpackheader(_fparttypesize)[0]
1282 1283 self.type = self._fromheader(typesize)
1283 1284 indebug(self.ui, 'part type: "%s"' % self.type)
1284 1285 self.id = self._unpackheader(_fpartid)[0]
1285 1286 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1286 1287 # extract mandatory bit from type
1287 1288 self.mandatory = (self.type != self.type.lower())
1288 1289 self.type = self.type.lower()
1289 1290 ## reading parameters
1290 1291 # param count
1291 1292 mancount, advcount = self._unpackheader(_fpartparamcount)
1292 1293 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1293 1294 # param size
1294 1295 fparamsizes = _makefpartparamsizes(mancount + advcount)
1295 1296 paramsizes = self._unpackheader(fparamsizes)
1296 1297 # make it a list of pairs again
1297 1298 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1298 1299 # split mandatory from advisory
1299 1300 mansizes = paramsizes[:mancount]
1300 1301 advsizes = paramsizes[mancount:]
1301 1302 # retrieve param value
1302 1303 manparams = []
1303 1304 for key, value in mansizes:
1304 1305 manparams.append((self._fromheader(key), self._fromheader(value)))
1305 1306 advparams = []
1306 1307 for key, value in advsizes:
1307 1308 advparams.append((self._fromheader(key), self._fromheader(value)))
1308 1309 self._initparams(manparams, advparams)
1309 1310 ## part payload
1310 1311 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1311 1312 # the header has been read; mark the part as initialized
1312 1313 self._initialized = True
1313 1314
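Note how the mandatory flag travels only in the case of the part type: `bundlepart.getchunks` upper-cases mandatory part types and lower-cases advisory ones, and `_readheader` above recovers the flag by comparing against `lower()`. A tiny round-trip sketch of that convention:

```python
def encodeparttype(parttype, mandatory):
    # mandatory parts go on the wire upper-cased, advisory ones lower-cased
    return parttype.upper() if mandatory else parttype.lower()

def decodeparttype(wiretype):
    # any upper-case letter means the sender marked the part mandatory
    mandatory = wiretype != wiretype.lower()
    return wiretype.lower(), mandatory
```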
1314 1315 def read(self, size=None):
1315 1316 """read payload data"""
1316 1317 if not self._initialized:
1317 1318 self._readheader()
1318 1319 if size is None:
1319 1320 data = self._payloadstream.read()
1320 1321 else:
1321 1322 data = self._payloadstream.read(size)
1322 1323 self._pos += len(data)
1323 1324 if size is None or len(data) < size:
1324 1325 if not self.consumed and self._pos:
1325 1326 self.ui.debug('bundle2-input-part: total payload size %i\n'
1326 1327 % self._pos)
1327 1328 self.consumed = True
1328 1329 return data
1329 1330
1330 1331 def tell(self):
1331 1332 return self._pos
1332 1333
1333 def seek(self, offset, whence=0):
1334 if whence == 0:
1334 def seek(self, offset, whence=os.SEEK_SET):
1335 if whence == os.SEEK_SET:
1335 1336 newpos = offset
1336 elif whence == 1:
1337 elif whence == os.SEEK_CUR:
1337 1338 newpos = self._pos + offset
1338 elif whence == 2:
1339 elif whence == os.SEEK_END:
1339 1340 if not self.consumed:
1340 1341 self.read()
1341 1342 newpos = self._chunkindex[-1][0] - offset
1342 1343 else:
1343 1344 raise ValueError('Unknown whence value: %r' % (whence,))
1344 1345
1345 1346 if newpos > self._chunkindex[-1][0] and not self.consumed:
1346 1347 self.read()
1347 1348 if not 0 <= newpos <= self._chunkindex[-1][0]:
1348 1349 raise ValueError('Offset out of range')
1349 1350
1350 1351 if self._pos != newpos:
1351 1352 chunk, internaloffset = self._findchunk(newpos)
1352 1353 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1353 1354 adjust = self.read(internaloffset)
1354 1355 if len(adjust) != internaloffset:
1355 1356 raise error.Abort(_('Seek failed'))
1356 1357 self._pos = newpos
1357 1358
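The `os.SEEK_*` constants this changeset introduces are just the named values 0/1/2 from the standard file protocol; on a plain file object they behave as follows:

```python
import io
import os

f = io.BytesIO(b'0123456789')
f.seek(0, os.SEEK_END)    # jump to the end, as part.seek(0, 2) did before
end = f.tell()
f.seek(-3, os.SEEK_END)   # three bytes before the end
tail = f.read()
f.seek(0, os.SEEK_SET)    # back to the beginning
f.seek(4, os.SEEK_CUR)    # forward relative to the current position
pos = f.tell()
```

One caveat: `unbundlepart.seek` computes `newpos = end - offset` for `SEEK_END`, so a positive offset moves backward from the end, the opposite sign convention from plain files.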
1358 1359 def _seekfp(self, offset, whence=0):
1359 1360 """move the underlying file pointer
1360 1361
1361 1362 This method is meant for internal usage by the bundle2 protocol only.
1362 1363 It directly manipulates the low-level stream, including bundle2-level
1363 1364 instructions.
1364 1365
1365 1366 Do not use it to implement higher-level logic or methods."""
1366 1367 if self._seekable:
1367 1368 return self._fp.seek(offset, whence)
1368 1369 else:
1369 1370 raise NotImplementedError(_('File pointer is not seekable'))
1370 1371
1371 1372 def _tellfp(self):
1372 1373 """return the file offset, or None if file is not seekable
1373 1374
1374 1375 This method is meant for internal usage by the bundle2 protocol only.
1375 1376 It directly manipulates the low-level stream, including bundle2-level
1376 1377 instructions.
1377 1378
1378 1379 Do not use it to implement higher-level logic or methods."""
1379 1380 if self._seekable:
1380 1381 try:
1381 1382 return self._fp.tell()
1382 1383 except IOError as e:
1383 1384 if e.errno == errno.ESPIPE:
1384 1385 self._seekable = False
1385 1386 else:
1386 1387 raise
1387 1388 return None
1388 1389
1389 1390 # These are only the static capabilities.
1390 1391 # Check the 'getrepocaps' function for the rest.
1391 1392 capabilities = {'HG20': (),
1392 1393 'error': ('abort', 'unsupportedcontent', 'pushraced',
1393 1394 'pushkey'),
1394 1395 'listkeys': (),
1395 1396 'pushkey': (),
1396 1397 'digests': tuple(sorted(util.DIGESTS.keys())),
1397 1398 'remote-changegroup': ('http', 'https'),
1398 1399 'hgtagsfnodes': (),
1399 1400 'phases': ('heads',),
1400 1401 }
1401 1402
1402 1403 def getrepocaps(repo, allowpushback=False):
1403 1404 """return the bundle2 capabilities for a given repo
1404 1405
1405 1406 Exists to allow extensions (like evolution) to mutate the capabilities.
1406 1407 """
1407 1408 caps = capabilities.copy()
1408 1409 caps['changegroup'] = tuple(sorted(
1409 1410 changegroup.supportedincomingversions(repo)))
1410 1411 if obsolete.isenabled(repo, obsolete.exchangeopt):
1411 1412 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1412 1413 caps['obsmarkers'] = supportedformat
1413 1414 if allowpushback:
1414 1415 caps['pushback'] = ()
1415 1416 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1416 1417 if cpmode == 'check-related':
1417 1418 caps['checkheads'] = ('related',)
1418 1419 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1419 1420 caps.pop('phases')
1420 1421 return caps
1421 1422
1422 1423 def bundle2caps(remote):
1423 1424 """return the bundle capabilities of a peer as dict"""
1424 1425 raw = remote.capable('bundle2')
1425 1426 if not raw and raw != '':
1426 1427 return {}
1427 1428 capsblob = urlreq.unquote(remote.capable('bundle2'))
1428 1429 return decodecaps(capsblob)
1429 1430
1430 1431 def obsmarkersversion(caps):
1431 1432 """extract the list of supported obsmarkers versions from a bundle2caps dict
1432 1433 """
1433 1434 obscaps = caps.get('obsmarkers', ())
1434 1435 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1435 1436
1436 1437 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1437 1438 vfs=None, compression=None, compopts=None):
1438 1439 if bundletype.startswith('HG10'):
1439 1440 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1440 1441 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1441 1442 compression=compression, compopts=compopts)
1442 1443 elif not bundletype.startswith('HG20'):
1443 1444 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1444 1445
1445 1446 caps = {}
1446 1447 if 'obsolescence' in opts:
1447 1448 caps['obsmarkers'] = ('V1',)
1448 1449 bundle = bundle20(ui, caps)
1449 1450 bundle.setcompression(compression, compopts)
1450 1451 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1451 1452 chunkiter = bundle.getchunks()
1452 1453
1453 1454 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1454 1455
1455 1456 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1456 1457 # We should eventually reconcile this logic with the one behind
1457 1458 # 'exchange.getbundle2partsgenerator'.
1458 1459 #
1459 1460 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1460 1461 # different right now. So we keep them separated for now for the sake of
1461 1462 # simplicity.
1462 1463
1463 1464 # we always want a changegroup in such bundle
1464 1465 cgversion = opts.get('cg.version')
1465 1466 if cgversion is None:
1466 1467 cgversion = changegroup.safeversion(repo)
1467 1468 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1468 1469 part = bundler.newpart('changegroup', data=cg.getchunks())
1469 1470 part.addparam('version', cg.version)
1470 1471 if 'clcount' in cg.extras:
1471 1472 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1472 1473 mandatory=False)
1473 1474 if opts.get('phases') and repo.revs('%ln and secret()',
1474 1475 outgoing.missingheads):
1475 1476 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1476 1477
1477 1478 addparttagsfnodescache(repo, bundler, outgoing)
1478 1479
1479 1480 if opts.get('obsolescence', False):
1480 1481 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1481 1482 buildobsmarkerspart(bundler, obsmarkers)
1482 1483
1483 1484 if opts.get('phases', False):
1484 1485 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1485 1486 phasedata = phases.binaryencode(headsbyphase)
1486 1487 bundler.newpart('phase-heads', data=phasedata)
1487 1488
1488 1489 def addparttagsfnodescache(repo, bundler, outgoing):
1489 1490 # we include the tags fnode cache for the bundle changeset
1490 1491 # (as an optional parts)
1491 1492 cache = tags.hgtagsfnodescache(repo.unfiltered())
1492 1493 chunks = []
1493 1494
1494 1495 # .hgtags fnodes are only relevant for head changesets. While we could
1495 1496 # transfer values for all known nodes, there will likely be little to
1496 1497 # no benefit.
1497 1498 #
1498 1499 # We don't bother using a generator to produce output data because
1499 1500 # a) we only have 40 bytes per head and even esoteric numbers of heads
1500 1501 # consume little memory (1M heads is 40MB) b) we don't want to send the
1501 1502 # part if we don't have entries and knowing if we have entries requires
1502 1503 # cache lookups.
1503 1504 for node in outgoing.missingheads:
1504 1505 # Don't compute missing, as this may slow down serving.
1505 1506 fnode = cache.getfnode(node, computemissing=False)
1506 1507 if fnode is not None:
1507 1508 chunks.extend([node, fnode])
1508 1509
1509 1510 if chunks:
1510 1511 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1511 1512
1512 1513 def buildobsmarkerspart(bundler, markers):
1513 1514 """add an obsmarker part to the bundler with <markers>
1514 1515
1515 1516 No part is created if markers is empty.
1516 1517 Raises ValueError if the bundler doesn't support any known obsmarker format.
1517 1518 """
1518 1519 if not markers:
1519 1520 return None
1520 1521
1521 1522 remoteversions = obsmarkersversion(bundler.capabilities)
1522 1523 version = obsolete.commonversion(remoteversions)
1523 1524 if version is None:
1524 1525 raise ValueError('bundler does not support common obsmarker format')
1525 1526 stream = obsolete.encodemarkers(markers, True, version=version)
1526 1527 return bundler.newpart('obsmarkers', data=stream)
1527 1528
1528 1529 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1529 1530 compopts=None):
1530 1531 """Write a bundle file and return its filename.
1531 1532
1532 1533 Existing files will not be overwritten.
1533 1534 If no filename is specified, a temporary file is created.
1534 1535 bz2 compression can be turned off.
1535 1536 The bundle file will be deleted in case of errors.
1536 1537 """
1537 1538
1538 1539 if bundletype == "HG20":
1539 1540 bundle = bundle20(ui)
1540 1541 bundle.setcompression(compression, compopts)
1541 1542 part = bundle.newpart('changegroup', data=cg.getchunks())
1542 1543 part.addparam('version', cg.version)
1543 1544 if 'clcount' in cg.extras:
1544 1545 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1545 1546 mandatory=False)
1546 1547 chunkiter = bundle.getchunks()
1547 1548 else:
1548 1549 # compression argument is only for the bundle2 case
1549 1550 assert compression is None
1550 1551 if cg.version != '01':
1551 1552 raise error.Abort(_('old bundle types only support v1 '
1552 1553 'changegroups'))
1553 1554 header, comp = bundletypes[bundletype]
1554 1555 if comp not in util.compengines.supportedbundletypes:
1555 1556 raise error.Abort(_('unknown stream compression type: %s')
1556 1557 % comp)
1557 1558 compengine = util.compengines.forbundletype(comp)
1558 1559 def chunkiter():
1559 1560 yield header
1560 1561 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1561 1562 yield chunk
1562 1563 chunkiter = chunkiter()
1563 1564
1564 1565 # parse the changegroup data, otherwise we will block
1565 1566 # in case of sshrepo because we don't know the end of the stream
1566 1567 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1567 1568
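In the legacy (non-HG20) branch above, `chunkiter` lazily wraps the changegroup chunks in a streaming compressor. A self-contained sketch of the same shape, with zlib standing in for Mercurial's compression engines:

```python
import zlib

def compresschunks(chunks):
    """Compress an iterator of byte chunks without materializing the stream."""
    comp = zlib.compressobj()
    for chunk in chunks:
        data = comp.compress(chunk)
        if data:
            yield data
    tail = comp.flush()  # emit whatever the compressor still buffered
    if tail:
        yield tail
```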
1568 1569 def combinechangegroupresults(op):
1569 1570 """logic to combine 0 or more addchangegroup results into one"""
1570 1571 results = [r.get('return', 0)
1571 1572 for r in op.records['changegroup']]
1572 1573 changedheads = 0
1573 1574 result = 1
1574 1575 for ret in results:
1575 1576 # If any changegroup result is 0, return 0
1576 1577 if ret == 0:
1577 1578 result = 0
1578 1579 break
1579 1580 if ret < -1:
1580 1581 changedheads += ret + 1
1581 1582 elif ret > 1:
1582 1583 changedheads += ret - 1
1583 1584 if changedheads > 0:
1584 1585 result = 1 + changedheads
1585 1586 elif changedheads < 0:
1586 1587 result = -1 + changedheads
1587 1588 return result
1588 1589
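`combinechangegroupresults` relies on the addchangegroup return convention: 0 means error, 1 means no head change, 1+n means n heads added, -1-n means n heads removed. A runnable sketch of the combination logic (same arithmetic as above, made-up inputs):

```python
def combine(results):
    """Fold several addchangegroup-style return codes into one."""
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:              # any failed changegroup fails the whole run
            return 0
        if ret < -1:              # -1 - n: n heads removed
            changedheads += ret + 1
        elif ret > 1:             # 1 + n: n heads added
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
```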
1589 1590 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1590 1591 'targetphase'))
1591 1592 def handlechangegroup(op, inpart):
1592 1593 """apply a changegroup part on the repo
1593 1594
1594 1595 This is a very early implementation that will see massive rework before
1595 1596 being inflicted on any end user.
1596 1597 """
1597 1598 tr = op.gettransaction()
1598 1599 unpackerversion = inpart.params.get('version', '01')
1599 1600 # We should raise an appropriate exception here
1600 1601 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1601 1602 # the source and url passed here are overwritten by the one contained in
1602 1603 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1603 1604 nbchangesets = None
1604 1605 if 'nbchanges' in inpart.params:
1605 1606 nbchangesets = int(inpart.params.get('nbchanges'))
1606 1607 if ('treemanifest' in inpart.params and
1607 1608 'treemanifest' not in op.repo.requirements):
1608 1609 if len(op.repo.changelog) != 0:
1609 1610 raise error.Abort(_(
1610 1611 "bundle contains tree manifests, but local repo is "
1611 1612 "non-empty and does not use tree manifests"))
1612 1613 op.repo.requirements.add('treemanifest')
1613 1614 op.repo._applyopenerreqs()
1614 1615 op.repo._writerequirements()
1615 1616 extrakwargs = {}
1616 1617 targetphase = inpart.params.get('targetphase')
1617 1618 if targetphase is not None:
1618 1619 extrakwargs['targetphase'] = int(targetphase)
1619 1620 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1620 1621 expectedtotal=nbchangesets, **extrakwargs)
1621 1622 if op.reply is not None:
1622 1623 # This is definitely not the final form of this
1623 1624 # return. But one needs to start somewhere.
1624 1625 part = op.reply.newpart('reply:changegroup', mandatory=False)
1625 1626 part.addparam(
1626 1627 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1627 1628 part.addparam('return', '%i' % ret, mandatory=False)
1628 1629 assert not inpart.read()
1629 1630
1630 1631 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1631 1632 ['digest:%s' % k for k in util.DIGESTS.keys()])
1632 1633 @parthandler('remote-changegroup', _remotechangegroupparams)
1633 1634 def handleremotechangegroup(op, inpart):
1634 1635 """apply a bundle10 on the repo, given an url and validation information
1635 1636
1636 1637 All the information about the remote bundle to import is given as
1637 1638 parameters. The parameters include:
1638 1639 - url: the url to the bundle10.
1639 1640 - size: the bundle10 file size. It is used to validate what was
1640 1641 retrieved by the client matches the server knowledge about the bundle.
1641 1642 - digests: a space separated list of the digest types provided as
1642 1643 parameters.
1643 1644 - digest:<digest-type>: the hexadecimal representation of the digest with
1644 1645 that name. Like the size, it is used to validate what was retrieved by
1645 1646 the client matches what the server knows about the bundle.
1646 1647
1647 1648 When multiple digest types are given, all of them are checked.
1648 1649 """
1649 1650 try:
1650 1651 raw_url = inpart.params['url']
1651 1652 except KeyError:
1652 1653 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1653 1654 parsed_url = util.url(raw_url)
1654 1655 if parsed_url.scheme not in capabilities['remote-changegroup']:
1655 1656 raise error.Abort(_('remote-changegroup does not support %s urls') %
1656 1657 parsed_url.scheme)
1657 1658
1658 1659 try:
1659 1660 size = int(inpart.params['size'])
1660 1661 except ValueError:
1661 1662 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1662 1663 % 'size')
1663 1664 except KeyError:
1664 1665 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1665 1666
1666 1667 digests = {}
1667 1668 for typ in inpart.params.get('digests', '').split():
1668 1669 param = 'digest:%s' % typ
1669 1670 try:
1670 1671 value = inpart.params[param]
1671 1672 except KeyError:
1672 1673 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1673 1674 param)
1674 1675 digests[typ] = value
1675 1676
1676 1677 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1677 1678
1678 1679 tr = op.gettransaction()
1679 1680 from . import exchange
1680 1681 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1681 1682 if not isinstance(cg, changegroup.cg1unpacker):
1682 1683 raise error.Abort(_('%s: not a bundle version 1.0') %
1683 1684 util.hidepassword(raw_url))
1684 1685 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1685 1686 if op.reply is not None:
1686 1687 # This is definitely not the final form of this
1687 1688 # return. But one need to start somewhere.
1688 1689 part = op.reply.newpart('reply:changegroup')
1689 1690 part.addparam(
1690 1691 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1691 1692 part.addparam('return', '%i' % ret, mandatory=False)
1692 1693 try:
1693 1694 real_part.validate()
1694 1695 except error.Abort as e:
1695 1696 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1696 1697 (util.hidepassword(raw_url), str(e)))
1697 1698 assert not inpart.read()
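The size/digest validation that `util.digestchecker` performs on the downloaded bundle10 stream can be illustrated standalone. The sketch below is hypothetical (the class name and API are not Mercurial's): it wraps a file-like object, hashes bytes as they are read, and compares the byte count and hex digests when `validate()` is called, mirroring the check made above after the changegroup is applied.

```python
import hashlib
import io

class digestedreader(object):
    """Minimal sketch of a size- and digest-validating stream reader.

    Illustrative only; Mercurial's real implementation is
    util.digestchecker. digests maps a hashlib algorithm name to the
    expected hex digest, e.g. {'sha1': '...'}.
    """
    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size      # expected total byte count
        self._seen = 0
        self._expected = digests
        self._hashers = {typ: hashlib.new(typ) for typ in digests}

    def read(self, amt=-1):
        data = self._fh.read(amt)
        self._seen += len(data)
        for h in self._hashers.values():
            h.update(data)
        return data

    def validate(self):
        if self._seen != self._size:
            raise ValueError('size mismatch: got %d, expected %d'
                             % (self._seen, self._size))
        for typ, expected in self._expected.items():
            if self._hashers[typ].hexdigest() != expected:
                raise ValueError('%s digest mismatch' % typ)

payload = b'hello bundle'
good = hashlib.sha1(payload).hexdigest()
r = digestedreader(io.BytesIO(payload), len(payload), {'sha1': good})
while r.read(4):
    pass
r.validate()  # silent when both size and digest match
```

Validating only after full consumption is deliberate: the bundle is applied while streaming, and a mismatch afterwards aborts the whole transaction, as the handler above does.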

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
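The 'check:heads' part payload is simply a flat concatenation of 20-byte binary changeset nodes, read until the stream is exhausted. A standalone sketch of that framing (function name hypothetical, not Mercurial's API) — note that a trailing partial record means the part is malformed, which the handler above guards with `assert not h`:

```python
import io

def readnodes(fh, nodelen=20):
    """Read a flat stream of fixed-width binary nodes, as used by the
    'check:heads' part payload. Illustrative sketch only."""
    nodes = []
    chunk = fh.read(nodelen)
    while len(chunk) == nodelen:
        nodes.append(chunk)
        chunk = fh.read(nodelen)
    if chunk:
        # Anything shorter than a full node means truncated data.
        raise ValueError('truncated node in payload')
    return nodes

heads = [bytes([i]) * 20 for i in range(3)]
payload = b''.join(heads)
assert readnodes(io.BytesIO(payload)) == heads
```

Fixed-width framing keeps the parser trivial: no length prefixes or separators are needed because every record is exactly `nodelen` bytes.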

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activity happens on unrelated heads,
    it is ignored.

    This allows servers with high traffic to avoid push contention as long
    as unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)
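The phase-race check above decodes the pusher's expectation into a list indexed by phase number and compares it node-by-node against the repository's current phases. The core comparison can be sketched in isolation (function and parameter names are hypothetical; the real code consults the repo's phasecache):

```python
def checkphaserace(phasetonodes, actualphaseof, phasenames):
    """Raise if any node's current phase differs from the phase the
    pusher expected, mirroring handlecheckphases. phasetonodes is a
    list indexed by phase number; actualphaseof maps node -> phase.
    Illustrative sketch, not Mercurial's API."""
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actual = actualphaseof[n]
            if actual != expectedphase:
                raise RuntimeError(
                    'repository changed while pushing - please try again '
                    '(%r is %s expected %s)'
                    % (n, phasenames[actual], phasenames[expectedphase]))

# Phase numbers follow Mercurial's convention: 0 public, 1 draft, 2 secret.
names = ['public', 'draft', 'secret']
# n1 expected public and is public; n2 expected draft and is draft: no race.
checkphaserace([[b'n1'], [b'n2']], {b'n1': 0, b'n2': 1}, names)
```

Any single mismatch aborts with a push-race error, since a changed phase boundary means the server state the client computed against is stale.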

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)

@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
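The 'hgtagsfnodes' payload is a sequence of 40-byte records: a 20-byte changeset node immediately followed by the 20-byte node of its .hgtags file. Unlike 'check:heads', an incomplete trailing pair is tolerated (it just ends the loop). A standalone sketch of that pair framing (function name hypothetical):

```python
import io

NODELEN = 20

def readfnodepairs(fh):
    """Collect (changeset node, .hgtags filenode) pairs from a stream of
    concatenated 20-byte records, stopping at the first incomplete pair,
    mirroring the loop in handlehgtagsfnodes. Illustrative sketch only."""
    pairs = []
    while True:
        node = fh.read(NODELEN)
        fnode = fh.read(NODELEN)
        if len(node) < NODELEN or len(fnode) < NODELEN:
            break  # incomplete trailing data is ignored, not fatal
        pairs.append((node, fnode))
    return pairs

# Two complete pairs followed by 5 stray bytes: the stray tail is dropped.
data = b'a' * 20 + b'b' * 20 + b'c' * 20 + b'd' * 20 + b'e' * 5
pairs = readfnodepairs(io.BytesIO(data))
assert pairs == [(b'a' * 20, b'b' * 20), (b'c' * 20, b'd' * 20)]
```

Tolerating a short tail fits the part's role as a cache warm-up: partial data only means fewer cache entries, so it is worth a debug message but not an abort.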

@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
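The variable-name transformation applied by bundle2getvars is small enough to show in isolation: each client-pushed variable is uppercased and prefixed with `USERVAR_` so server-side hooks can distinguish `--pushvar` values from hook arguments the server set itself. A minimal sketch (function name hypothetical, not Mercurial's API):

```python
def pushvarstohookargs(advisoryparams):
    """Uppercase each pushed variable name and prefix it with USERVAR_,
    as bundle2getvars does above. advisoryparams is an iterable of
    (key, value) pairs. Illustrative sketch only."""
    hookargs = {}
    for key, value in advisoryparams:
        # The prefix marks the value as client-supplied via --pushvar.
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

args = pushvarstohookargs([('debug', '1'), ('reason', 'hotfix')])
assert args == {'USERVAR_DEBUG': '1', 'USERVAR_REASON': 'hotfix'}
```

The prefix is a security measure as much as a naming convention: a hook that trusts `HG_NODE`-style server-set arguments will never see a client-controlled value under that name, because pushed variables always arrive namespaced.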