bundle2: raise ProgrammingError for invalid call of addhookargs()...
Yuya Nishihara
r33808:1bf5c550 default
@@ -1,1895 +1,1894 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application-level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application-level handler. They are
81 81 meant to convey information that will help the application-level object
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part
139 139 type contains any uppercase character, it is considered mandatory. When no
140 140 handler is known for a mandatory part, the process is aborted and an
141 141 exception is raised. If the part is advisory and no handler is known, the
142 142 part is ignored. When the process is aborted, the full bundle is still read
143 143 from the stream to keep the channel usable. But none of the parts read
144 144 after an abort are processed. In the future, dropping the stream may become
145 145 an option for channels we do not care to preserve.
146 146 """
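The stream-level layout just described (magic string, int32 parameter-block size, urlquoted space-separated parameters) can be sketched as a small standalone codec. `encode_stream_header` and `decode_stream_header` are illustrative names, not Mercurial's API:

```python
import struct
from urllib.parse import quote, unquote

def encode_stream_header(params):
    """Build magic + stream-level parameter block per the layout above.

    params: list of (name, value-or-None); name and value are urlquoted,
    joined with '=' when a value is present, pairs separated by spaces.
    """
    blocks = []
    for name, value in params:
        p = quote(name)
        if value is not None:
            p = '%s=%s' % (p, quote(value))
        blocks.append(p)
    blob = ' '.join(blocks).encode('ascii')
    return b'HG20' + struct.pack('>i', len(blob)) + blob

def decode_stream_header(data):
    """Parse magic and stream parameters; return (params dict, offset)."""
    assert data[:4] == b'HG20'
    (size,) = struct.unpack('>i', data[4:8])
    blob = data[8:8 + size].decode('ascii')
    params = {}
    if blob:
        for item in blob.split(' '):
            name, sep, value = item.partition('=')
            params[unquote(name)] = unquote(value) if sep else None
    return params, 8 + size
```

A value-less parameter decodes to None, matching the "parameters with value are stored as `<name>=<value>`" rule above.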
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable, so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
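As an illustration of the dynamic format built by `_makefpartparamsizes` above (reimplemented standalone here), two parameters produce the struct format `'>BBBB'`, one (key-size, value-size) byte pair per parameter:

```python
import struct

def makefpartparamsizes(nbparams):
    # mirrors _makefpartparamsizes: one (key-size, value-size)
    # unsigned-byte pair per parameter, big-endian
    return '>' + ('BB' * nbparams)

fmt = makefpartparamsizes(2)                 # '>BBBB'
sizes = struct.unpack(fmt, bytes([3, 5, 4, 0]))
# key/value sizes for two params: (3, 5) and (4, 0)
```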
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 decorator that registers a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep records of what happens during an unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`, where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
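The record container above can be exercised with a reduced standalone stand-in (reply tracking omitted, class name illustrative), showing the intended `add`/indexing/iteration usage:

```python
class MiniRecords(object):
    """Simplified stand-in for unbundlerecords (no reply tracking)."""
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        # record under its category and in chronological order
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = MiniRecords()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'namespace': 'phases'})
```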
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs is not None:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 raise error.Abort(
322 _('attempted to add hooks to operation after transaction '
323 'started'))
321 raise error.ProgrammingError('attempted to add hookargs to '
322 'operation after transaction started')
324 323 self.hookargs.update(hookargs)
325 324
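The hookargs handling above follows a collect-then-freeze pattern: arguments accumulate on the operation until the first transaction is requested, then the dict is handed over to the transaction and frozen so later additions fail loudly. A reduced sketch with illustrative class names, not the real Mercurial objects:

```python
class FakeTransaction(object):
    def __init__(self):
        self.hookargs = {'txnid': 'TXN1'}

class MiniOperation(object):
    def __init__(self):
        self.hookargs = {}          # becomes None once flushed

    def gettransaction(self):
        tr = FakeTransaction()
        if self.hookargs is not None:
            # transaction-level args supersede operation-level ones
            self.hookargs.update(tr.hookargs)
            tr.hookargs = self.hookargs
            self.hookargs = None    # freeze: later addhookargs() must fail
        return tr

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise RuntimeError('attempted to add hookargs to '
                               'operation after transaction started')
        self.hookargs.update(hookargs)

op = MiniOperation()
op.addhookargs({'source': 'push'})
tr = op.gettransaction()
```

This mirrors why the changeset switches the error to ProgrammingError: adding hookargs after the transaction started is a caller bug, not a user-facing condition.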
326 325 class TransactionUnavailable(RuntimeError):
327 326 pass
328 327
329 328 def _notransaction():
330 329 """default method to get a transaction while processing a bundle
331 330
332 331 Raise an exception to highlight the fact that no transaction was expected
333 332 to be created"""
334 333 raise TransactionUnavailable()
335 334
336 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
337 336 # transform me into unbundler.apply() as soon as the freeze is lifted
338 337 if isinstance(unbundler, unbundle20):
339 338 tr.hookargs['bundle2'] = '1'
340 339 if source is not None and 'source' not in tr.hookargs:
341 340 tr.hookargs['source'] = source
342 341 if url is not None and 'url' not in tr.hookargs:
343 342 tr.hookargs['url'] = url
344 343 return processbundle(repo, unbundler, lambda: tr)
345 344 else:
346 345 # the transactiongetter won't be used, but we might as well set it
347 346 op = bundleoperation(repo, lambda: tr)
348 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
349 348 return op
350 349
351 350 def processbundle(repo, unbundler, transactiongetter=None, op=None):
352 351 """This function processes a bundle, applying effects to/from a repo
353 352
354 353 It iterates over each part, then searches for and uses the proper handling
355 354 code to process the part. Parts are processed in order.
356 355
357 356 An unknown mandatory part will abort the process.
358 357
359 358 It is temporarily possible to provide a prebuilt bundleoperation to the
360 359 function. This is used to ensure output is properly propagated in case of
361 360 an error during the unbundling. This output capturing part will likely be
362 361 reworked and this ability will probably go away in the process.
363 362 """
364 363 if op is None:
365 364 if transactiongetter is None:
366 365 transactiongetter = _notransaction
367 366 op = bundleoperation(repo, transactiongetter)
368 367 # todo:
369 368 # - replace this with an init function soon.
370 369 # - exception catching
371 370 unbundler.params
372 371 if repo.ui.debugflag:
373 372 msg = ['bundle2-input-bundle:']
374 373 if unbundler.params:
375 374 msg.append(' %i params' % len(unbundler.params))
376 375 if op._gettransaction is None or op._gettransaction is _notransaction:
377 376 msg.append(' no-transaction')
378 377 else:
379 378 msg.append(' with-transaction')
380 379 msg.append('\n')
381 380 repo.ui.debug(''.join(msg))
382 381 iterparts = enumerate(unbundler.iterparts())
383 382 part = None
384 383 nbpart = 0
385 384 try:
386 385 for nbpart, part in iterparts:
387 386 _processpart(op, part)
388 387 except Exception as exc:
389 388 # Any exceptions seeking to the end of the bundle at this point are
390 389 # almost certainly related to the underlying stream being bad.
391 390 # And, chances are that the exception we're handling is related to
392 391 # getting in that bad state. So, we swallow the seeking error and
393 392 # re-raise the original error.
394 393 seekerror = False
395 394 try:
396 395 for nbpart, part in iterparts:
397 396 # consume the bundle content
398 397 part.seek(0, 2)
399 398 except Exception:
400 399 seekerror = True
401 400
402 401 # Small hack to let caller code distinguish exceptions from bundle2
403 402 # processing from processing the old format. This is mostly
404 403 # needed to handle different return codes to unbundle according to the
405 404 # type of bundle. We should probably clean up or drop this return code
406 405 # craziness in a future version.
407 406 exc.duringunbundle2 = True
408 407 salvaged = []
409 408 replycaps = None
410 409 if op.reply is not None:
411 410 salvaged = op.reply.salvageoutput()
412 411 replycaps = op.reply.capabilities
413 412 exc._replycaps = replycaps
414 413 exc._bundle2salvagedoutput = salvaged
415 414
416 415 # Re-raising from a variable loses the original stack. So only use
417 416 # that form if we need to.
418 417 if seekerror:
419 418 raise exc
420 419 else:
421 420 raise
422 421 finally:
423 422 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
424 423
425 424 return op
426 425
427 426 def _processchangegroup(op, cg, tr, source, url, **kwargs):
428 427 ret = cg.apply(op.repo, tr, source, url, **kwargs)
429 428 op.records.add('changegroup', {
430 429 'return': ret,
431 430 })
432 431 return ret
433 432
434 433 def _processpart(op, part):
435 434 """process a single part from a bundle
436 435
437 436 The part is guaranteed to have been fully consumed when the function exits
438 437 (even if an exception is raised)."""
439 438 status = 'unknown' # used by debug output
440 439 hardabort = False
441 440 try:
442 441 try:
443 442 handler = parthandlermapping.get(part.type)
444 443 if handler is None:
445 444 status = 'unsupported-type'
446 445 raise error.BundleUnknownFeatureError(parttype=part.type)
447 446 indebug(op.ui, 'found a handler for part %r' % part.type)
448 447 unknownparams = part.mandatorykeys - handler.params
449 448 if unknownparams:
450 449 unknownparams = list(unknownparams)
451 450 unknownparams.sort()
452 451 status = 'unsupported-params (%s)' % unknownparams
453 452 raise error.BundleUnknownFeatureError(parttype=part.type,
454 453 params=unknownparams)
455 454 status = 'supported'
456 455 except error.BundleUnknownFeatureError as exc:
457 456 if part.mandatory: # mandatory parts
458 457 raise
459 458 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
460 459 return # skip to part processing
461 460 finally:
462 461 if op.ui.debugflag:
463 462 msg = ['bundle2-input-part: "%s"' % part.type]
464 463 if not part.mandatory:
465 464 msg.append(' (advisory)')
466 465 nbmp = len(part.mandatorykeys)
467 466 nbap = len(part.params) - nbmp
468 467 if nbmp or nbap:
469 468 msg.append(' (params:')
470 469 if nbmp:
471 470 msg.append(' %i mandatory' % nbmp)
472 471 if nbap:
473 472 msg.append(' %i advisory' % nbap)
474 473 msg.append(')')
475 474 msg.append(' %s\n' % status)
476 475 op.ui.debug(''.join(msg))
477 476
478 477 # handler is called outside the above try block so that we don't
479 478 # risk catching KeyErrors from anything other than the
480 479 # parthandlermapping lookup (any KeyError raised by handler()
481 480 # itself represents a defect of a different variety).
482 481 output = None
483 482 if op.captureoutput and op.reply is not None:
484 483 op.ui.pushbuffer(error=True, subproc=True)
485 484 output = ''
486 485 try:
487 486 handler(op, part)
488 487 finally:
489 488 if output is not None:
490 489 output = op.ui.popbuffer()
491 490 if output:
492 491 outpart = op.reply.newpart('output', data=output,
493 492 mandatory=False)
494 493 outpart.addparam(
495 494 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
496 495 # If exiting or interrupted, do not attempt to seek the stream in the
497 496 # finally block below. This makes abort faster.
498 497 except (SystemExit, KeyboardInterrupt):
499 498 hardabort = True
500 499 raise
501 500 finally:
502 501 # consume the part content to not corrupt the stream.
503 502 if not hardabort:
504 503 part.seek(0, 2)
505 504
506 505
507 506 def decodecaps(blob):
508 507 """decode a bundle2 caps bytes blob into a dictionary
509 508
510 509 The blob is a list of capabilities (one per line)
511 510 Capabilities may have values using a line of the form::
512 511
513 512 capability=value1,value2,value3
514 513
515 514 The values are always a list."""
516 515 caps = {}
517 516 for line in blob.splitlines():
518 517 if not line:
519 518 continue
520 519 if '=' not in line:
521 520 key, vals = line, ()
522 521 else:
523 522 key, vals = line.split('=', 1)
524 523 vals = vals.split(',')
525 524 key = urlreq.unquote(key)
526 525 vals = [urlreq.unquote(v) for v in vals]
527 526 caps[key] = vals
528 527 return caps
529 528
530 529 def encodecaps(caps):
531 530 """encode a bundle2 caps dictionary into a bytes blob"""
532 531 chunks = []
533 532 for ca in sorted(caps):
534 533 vals = caps[ca]
535 534 ca = urlreq.quote(ca)
536 535 vals = [urlreq.quote(v) for v in vals]
537 536 if vals:
538 537 ca = "%s=%s" % (ca, ','.join(vals))
539 538 chunks.append(ca)
540 539 return '\n'.join(chunks)
541 540
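The caps blob format handled by `decodecaps`/`encodecaps` above round-trips cleanly. This standalone sketch substitutes Python 3's `urllib.parse` for Mercurial's `urlreq` wrapper:

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """encode a caps dict as sorted 'name=v1,v2' lines, urlquoted"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode a caps blob back into {name: [values]}"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```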
542 541 bundletypes = {
543 542 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
544 543 # since the unification ssh accepts a header but there
545 544 # is no capability signaling it.
546 545 "HG20": (), # special-cased below
547 546 "HG10UN": ("HG10UN", 'UN'),
548 547 "HG10BZ": ("HG10", 'BZ'),
549 548 "HG10GZ": ("HG10GZ", 'GZ'),
550 549 }
551 550
552 551 # hgweb uses this list to communicate its preferred type
553 552 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
554 553
555 554 class bundle20(object):
556 555 """represent an outgoing bundle2 container
557 556
558 557 Use the `addparam` method to add a stream-level parameter, and `newpart` to
559 558 populate the bundle. Then call `getchunks` to retrieve all the binary chunks of
560 559 data that compose the bundle2 container."""
561 560
562 561 _magicstring = 'HG20'
563 562
564 563 def __init__(self, ui, capabilities=()):
565 564 self.ui = ui
566 565 self._params = []
567 566 self._parts = []
568 567 self.capabilities = dict(capabilities)
569 568 self._compengine = util.compengines.forbundletype('UN')
570 569 self._compopts = None
571 570
572 571 def setcompression(self, alg, compopts=None):
573 572 """setup core part compression to <alg>"""
574 573 if alg in (None, 'UN'):
575 574 return
576 575 assert not any(n.lower() == 'compression' for n, v in self._params)
577 576 self.addparam('Compression', alg)
578 577 self._compengine = util.compengines.forbundletype(alg)
579 578 self._compopts = compopts
580 579
581 580 @property
582 581 def nbparts(self):
583 582 """total number of parts added to the bundler"""
584 583 return len(self._parts)
585 584
587 586 # methods used to define the bundle2 content
587 586 def addparam(self, name, value=None):
588 587 """add a stream level parameter"""
589 588 if not name:
590 589 raise ValueError('empty parameter name')
591 590 if name[0] not in string.letters:
592 591 raise ValueError('non letter first character: %r' % name)
593 592 self._params.append((name, value))
594 593
595 594 def addpart(self, part):
596 595 """add a new part to the bundle2 container
597 596
598 597 Parts contain the actual application payload."""
599 598 assert part.id is None
600 599 part.id = len(self._parts) # very cheap counter
601 600 self._parts.append(part)
602 601
603 602 def newpart(self, typeid, *args, **kwargs):
604 603 """create a new part and add it to the container
605 604
606 605 The part is directly added to the container. For now, this means that
607 606 any failure to properly initialize the part after calling ``newpart``
608 607 should result in a failure of the whole bundling process.
609 608
610 609 You can still fall back to manually creating and adding a part if you
611 610 need better control."""
612 611 part = bundlepart(typeid, *args, **kwargs)
613 612 self.addpart(part)
614 613 return part
615 614
616 615 # methods used to generate the bundle2 stream
617 616 def getchunks(self):
618 617 if self.ui.debugflag:
619 618 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
620 619 if self._params:
621 620 msg.append(' (%i params)' % len(self._params))
622 621 msg.append(' %i parts total\n' % len(self._parts))
623 622 self.ui.debug(''.join(msg))
624 623 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
625 624 yield self._magicstring
626 625 param = self._paramchunk()
627 626 outdebug(self.ui, 'bundle parameter: %s' % param)
628 627 yield _pack(_fstreamparamsize, len(param))
629 628 if param:
630 629 yield param
631 630 for chunk in self._compengine.compressstream(self._getcorechunk(),
632 631 self._compopts):
633 632 yield chunk
634 633
635 634 def _paramchunk(self):
636 635 """return an encoded version of all stream parameters"""
637 636 blocks = []
638 637 for par, value in self._params:
639 638 par = urlreq.quote(par)
640 639 if value is not None:
641 640 value = urlreq.quote(value)
642 641 par = '%s=%s' % (par, value)
643 642 blocks.append(par)
644 643 return ' '.join(blocks)
645 644
646 645 def _getcorechunk(self):
647 646 """yield chunks for the core part of the bundle
648 647
649 648 (all but headers and parameters)"""
650 649 outdebug(self.ui, 'start of parts')
651 650 for part in self._parts:
652 651 outdebug(self.ui, 'bundle part: "%s"' % part.type)
653 652 for chunk in part.getchunks(ui=self.ui):
654 653 yield chunk
655 654 outdebug(self.ui, 'end of bundle')
656 655 yield _pack(_fpartheadersize, 0)
657 656
658 657
659 658 def salvageoutput(self):
660 659 """return a list with a copy of all output parts in the bundle
661 660
662 661 This is meant to be used during error handling to make sure we preserve
663 662 server output"""
664 663 salvaged = []
665 664 for part in self._parts:
666 665 if part.type.startswith('output'):
667 666 salvaged.append(part.copy())
668 667 return salvaged
669 668
670 669
671 670 class unpackermixin(object):
672 671 """A mixin to extract bytes and struct data from a stream"""
673 672
674 673 def __init__(self, fp):
675 674 self._fp = fp
676 675
677 676 def _unpack(self, format):
678 677 """unpack this struct format from the stream
679 678
680 679 This method is meant for internal usage by the bundle2 protocol only.
681 680 It directly manipulates the low-level stream, including bundle2-level
682 681 instructions.
683 682
684 683 Do not use it to implement higher-level logic or methods."""
685 684 data = self._readexact(struct.calcsize(format))
686 685 return _unpack(format, data)
687 686
688 687 def _readexact(self, size):
689 688 """read exactly <size> bytes from the stream
690 689
691 690 This method is meant for internal usage by the bundle2 protocol only.
692 691 It directly manipulates the low-level stream, including bundle2-level
693 692 instructions.
694 693
695 694 Do not use it to implement higher-level logic or methods."""
696 695 return changegroup.readexactly(self._fp, size)
697 696
698 697 def getunbundler(ui, fp, magicstring=None):
699 698 """return a valid unbundler object for a given magicstring"""
700 699 if magicstring is None:
701 700 magicstring = changegroup.readexactly(fp, 4)
702 701 magic, version = magicstring[0:2], magicstring[2:4]
703 702 if magic != 'HG':
704 703 ui.debug(
705 704 "error: invalid magic: %r (version %r), should be 'HG'\n"
706 705 % (magic, version))
707 706 raise error.Abort(_('not a Mercurial bundle'))
708 707 unbundlerclass = formatmap.get(version)
709 708 if unbundlerclass is None:
710 709 raise error.Abort(_('unknown bundle version %s') % version)
711 710 unbundler = unbundlerclass(ui, fp)
712 711 indebug(ui, 'start processing of %s stream' % magicstring)
713 712 return unbundler
714 713
715 714 class unbundle20(unpackermixin):
716 715 """interpret a bundle2 stream
717 716
718 717 This class is fed with a binary stream and yields parts through its
719 718 `iterparts` method."""
720 719
721 720 _magicstring = 'HG20'
722 721
723 722 def __init__(self, ui, fp):
724 723 """If header is specified, we do not read it out of the stream."""
725 724 self.ui = ui
726 725 self._compengine = util.compengines.forbundletype('UN')
727 726 self._compressed = None
728 727 super(unbundle20, self).__init__(fp)
729 728
730 729 @util.propertycache
731 730 def params(self):
732 731 """dictionary of stream level parameters"""
733 732 indebug(self.ui, 'reading bundle2 stream parameters')
734 733 params = {}
735 734 paramssize = self._unpack(_fstreamparamsize)[0]
736 735 if paramssize < 0:
737 736 raise error.BundleValueError('negative bundle param size: %i'
738 737 % paramssize)
739 738 if paramssize:
740 739 params = self._readexact(paramssize)
741 740 params = self._processallparams(params)
742 741 return params
743 742
744 743 def _processallparams(self, paramsblock):
745 744 """process and validate the space-separated stream-level parameter block"""
746 745 params = util.sortdict()
747 746 for p in paramsblock.split(' '):
748 747 p = p.split('=', 1)
749 748 p = [urlreq.unquote(i) for i in p]
750 749 if len(p) < 2:
751 750 p.append(None)
752 751 self._processparam(*p)
753 752 params[p[0]] = p[1]
754 753 return params
755 754
756 755
757 756 def _processparam(self, name, value):
758 757 """process a parameter, applying its effect if needed
759 758
760 759 Parameters starting with a lower case letter are advisory and will be
761 760 ignored when unknown. Those starting with an upper case letter are
762 761 mandatory; an error.BundleUnknownFeatureError is raised for unknown ones.
763 762
764 763 Note: no options are currently supported. Any input will either be
765 764 ignored or fail.
766 765 """
767 766 if not name:
768 767 raise ValueError('empty parameter name')
769 768 if name[0] not in string.letters:
770 769 raise ValueError('non letter first character: %r' % name)
771 770 try:
772 771 handler = b2streamparamsmap[name.lower()]
773 772 except KeyError:
774 773 if name[0].islower():
775 774 indebug(self.ui, "ignoring unknown parameter %r" % name)
776 775 else:
777 776 raise error.BundleUnknownFeatureError(params=(name,))
778 777 else:
779 778 handler(self, name, value)
780 779
781 780 def _forwardchunks(self):
782 781 """utility to transfer a bundle2 as binary
783 782
784 783 This is made necessary by the fact that the 'getbundle' command over 'ssh'
785 784 has no way to know when the reply ends, relying on the bundle being
786 785 interpreted to know its end. This is terrible and we are sorry, but we
787 786 needed to move forward to get general delta enabled.
788 787 """
789 788 yield self._magicstring
790 789 assert 'params' not in vars(self)
791 790 paramssize = self._unpack(_fstreamparamsize)[0]
792 791 if paramssize < 0:
793 792 raise error.BundleValueError('negative bundle param size: %i'
794 793 % paramssize)
795 794 yield _pack(_fstreamparamsize, paramssize)
796 795 if paramssize:
797 796 params = self._readexact(paramssize)
798 797 self._processallparams(params)
799 798 yield params
800 799 assert self._compengine.bundletype == 'UN'
801 800 # From there, payload might need to be decompressed
802 801 self._fp = self._compengine.decompressorreader(self._fp)
803 802 emptycount = 0
804 803 while emptycount < 2:
805 804 # so we can brainlessly loop
806 805 assert _fpartheadersize == _fpayloadsize
807 806 size = self._unpack(_fpartheadersize)[0]
808 807 yield _pack(_fpartheadersize, size)
809 808 if size:
810 809 emptycount = 0
811 810 else:
812 811 emptycount += 1
813 812 continue
814 813 if size == flaginterrupt:
815 814 continue
816 815 elif size < 0:
817 816 raise error.BundleValueError('negative chunk size: %i' % size)
818 817 yield self._readexact(size)
819 818
820 819
821 820 def iterparts(self):
822 821 """yield all parts contained in the stream"""
824 823 # make sure params have been loaded
824 823 self.params
826 825 # From there, payload needs to be decompressed
826 825 self._fp = self._compengine.decompressorreader(self._fp)
827 826 indebug(self.ui, 'start extraction of bundle2 parts')
828 827 headerblock = self._readpartheader()
829 828 while headerblock is not None:
830 829 part = unbundlepart(self.ui, headerblock, self._fp)
831 830 yield part
832 831 part.seek(0, 2)
833 832 headerblock = self._readpartheader()
834 833 indebug(self.ui, 'end of bundle2 stream')
835 834
836 835 def _readpartheader(self):
837 836 """read the part header size and return the header bytes blob
838 837
839 838 returns None if empty"""
840 839 headersize = self._unpack(_fpartheadersize)[0]
841 840 if headersize < 0:
842 841 raise error.BundleValueError('negative part header size: %i'
843 842 % headersize)
844 843 indebug(self.ui, 'part header size: %i' % headersize)
845 844 if headersize:
846 845 return self._readexact(headersize)
847 846 return None
848 847
849 848 def compressed(self):
850 849 self.params # load params
851 850 return self._compressed
852 851
853 852 def close(self):
854 853 """close underlying file"""
855 854 if util.safehasattr(self._fp, 'close'):
856 855 return self._fp.close()
857 856
858 857 formatmap = {'20': unbundle20}
859 858
860 859 b2streamparamsmap = {}
861 860
862 861 def b2streamparamhandler(name):
863 862 """register a handler for a stream level parameter"""
864 863 def decorator(func):
865 864 assert name not in b2streamparamsmap
866 865 b2streamparamsmap[name] = func
867 866 return func
868 867 return decorator
869 868
870 869 @b2streamparamhandler('compression')
871 870 def processcompression(unbundler, param, value):
872 871 """read compression parameter and install payload decompression"""
873 872 if value not in util.compengines.supportedbundletypes:
874 873 raise error.BundleUnknownFeatureError(params=(param,),
875 874 values=(value,))
876 875 unbundler._compengine = util.compengines.forbundletype(value)
877 876 if value is not None:
878 877 unbundler._compressed = True
879 878
880 879 class bundlepart(object):
881 880 """A bundle2 part contains application level payload
882 881
883 882 The part `type` is used to route the part to the application level
884 883 handler.
885 884
886 885 The part payload is contained in ``part.data``. It could be raw bytes or a
887 886 generator of byte chunks.
888 887
889 888 You can add parameters to the part using the ``addparam`` method.
890 889 Parameters can be either mandatory (default) or advisory. Remote side
891 890 should be able to safely ignore the advisory ones.
892 891
893 892 Neither data nor parameters can be modified after generation has begun.
894 893 """
895 894
896 895 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
897 896 data='', mandatory=True):
898 897 validateparttype(parttype)
899 898 self.id = None
900 899 self.type = parttype
901 900 self._data = data
902 901 self._mandatoryparams = list(mandatoryparams)
903 902 self._advisoryparams = list(advisoryparams)
904 903 # checking for duplicated entries
905 904 self._seenparams = set()
906 905 for pname, __ in self._mandatoryparams + self._advisoryparams:
907 906 if pname in self._seenparams:
908 907 raise error.ProgrammingError('duplicated params: %s' % pname)
909 908 self._seenparams.add(pname)
910 909 # status of the part's generation:
911 910 # - None: not started,
912 911 # - False: currently generated,
913 912 # - True: generation done.
914 913 self._generated = None
915 914 self.mandatory = mandatory
916 915
917 916 def __repr__(self):
918 917 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
919 918 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
920 919 % (cls, id(self), self.id, self.type, self.mandatory))
921 920
922 921 def copy(self):
923 922 """return a copy of the part
924 923
925 924 The new part has the very same content but no partid assigned yet.
926 925 Parts with generated data cannot be copied."""
927 926 assert not util.safehasattr(self.data, 'next')
928 927 return self.__class__(self.type, self._mandatoryparams,
929 928 self._advisoryparams, self._data, self.mandatory)
930 929
931 930 # methods used to define the part content
932 931 @property
933 932 def data(self):
934 933 return self._data
935 934
936 935 @data.setter
937 936 def data(self, data):
938 937 if self._generated is not None:
939 938 raise error.ReadOnlyPartError('part is being generated')
940 939 self._data = data
941 940
942 941 @property
943 942 def mandatoryparams(self):
944 943 # make it an immutable tuple to force people through ``addparam``
945 944 return tuple(self._mandatoryparams)
946 945
947 946 @property
948 947 def advisoryparams(self):
949 948 # make it an immutable tuple to force people through ``addparam``
950 949 return tuple(self._advisoryparams)
951 950
952 951 def addparam(self, name, value='', mandatory=True):
953 952 """add a parameter to the part
954 953
955 954 If 'mandatory' is set to True, the remote handler must claim support
956 955 for this parameter or the unbundling will be aborted.
957 956
958 957 The 'name' and 'value' cannot exceed 255 bytes each.
959 958 """
960 959 if self._generated is not None:
961 960 raise error.ReadOnlyPartError('part is being generated')
962 961 if name in self._seenparams:
963 962 raise ValueError('duplicated params: %s' % name)
964 963 self._seenparams.add(name)
965 964 params = self._advisoryparams
966 965 if mandatory:
967 966 params = self._mandatoryparams
968 967 params.append((name, value))
969 968
970 969 # methods used to generate the bundle2 stream
971 970 def getchunks(self, ui):
972 971 if self._generated is not None:
973 972 raise error.ProgrammingError('part can only be consumed once')
974 973 self._generated = False
975 974
976 975 if ui.debugflag:
977 976 msg = ['bundle2-output-part: "%s"' % self.type]
978 977 if not self.mandatory:
979 978 msg.append(' (advisory)')
980 979 nbmp = len(self.mandatoryparams)
981 980 nbap = len(self.advisoryparams)
982 981 if nbmp or nbap:
983 982 msg.append(' (params:')
984 983 if nbmp:
985 984 msg.append(' %i mandatory' % nbmp)
986 985 if nbap:
987 986 msg.append(' %i advisory' % nbap)
988 987 msg.append(')')
989 988 if not self.data:
990 989 msg.append(' empty payload')
991 990 elif util.safehasattr(self.data, 'next'):
992 991 msg.append(' streamed payload')
993 992 else:
994 993 msg.append(' %i bytes payload' % len(self.data))
995 994 msg.append('\n')
996 995 ui.debug(''.join(msg))
997 996
998 997 #### header
999 998 if self.mandatory:
1000 999 parttype = self.type.upper()
1001 1000 else:
1002 1001 parttype = self.type.lower()
1003 1002 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1004 1003 ## parttype
1005 1004 header = [_pack(_fparttypesize, len(parttype)),
1006 1005 parttype, _pack(_fpartid, self.id),
1007 1006 ]
1008 1007 ## parameters
1009 1008 # count
1010 1009 manpar = self.mandatoryparams
1011 1010 advpar = self.advisoryparams
1012 1011 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1013 1012 # size
1014 1013 parsizes = []
1015 1014 for key, value in manpar:
1016 1015 parsizes.append(len(key))
1017 1016 parsizes.append(len(value))
1018 1017 for key, value in advpar:
1019 1018 parsizes.append(len(key))
1020 1019 parsizes.append(len(value))
1021 1020 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1022 1021 header.append(paramsizes)
1023 1022 # key, value
1024 1023 for key, value in manpar:
1025 1024 header.append(key)
1026 1025 header.append(value)
1027 1026 for key, value in advpar:
1028 1027 header.append(key)
1029 1028 header.append(value)
1030 1029 ## finalize header
1031 1030 headerchunk = ''.join(header)
1032 1031 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1033 1032 yield _pack(_fpartheadersize, len(headerchunk))
1034 1033 yield headerchunk
1035 1034 ## payload
1036 1035 try:
1037 1036 for chunk in self._payloadchunks():
1038 1037 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1039 1038 yield _pack(_fpayloadsize, len(chunk))
1040 1039 yield chunk
1041 1040 except GeneratorExit:
1042 1041 # GeneratorExit means that nobody is listening for our
1043 1042 # results anyway, so just bail quickly rather than trying
1044 1043 # to produce an error part.
1045 1044 ui.debug('bundle2-generatorexit\n')
1046 1045 raise
1047 1046 except BaseException as exc:
1048 1047 bexc = util.forcebytestr(exc)
1049 1048 # backup exception data for later
1050 1049 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1051 1050 % bexc)
1052 1051 tb = sys.exc_info()[2]
1053 1052 msg = 'unexpected error: %s' % bexc
1054 1053 interpart = bundlepart('error:abort', [('message', msg)],
1055 1054 mandatory=False)
1056 1055 interpart.id = 0
1057 1056 yield _pack(_fpayloadsize, -1)
1058 1057 for chunk in interpart.getchunks(ui=ui):
1059 1058 yield chunk
1060 1059 outdebug(ui, 'closing payload chunk')
1061 1060 # abort current part payload
1062 1061 yield _pack(_fpayloadsize, 0)
1063 1062 pycompat.raisewithtb(exc, tb)
1064 1063 # end of payload
1065 1064 outdebug(ui, 'closing payload chunk')
1066 1065 yield _pack(_fpayloadsize, 0)
1067 1066 self._generated = True
1068 1067
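The header framing that `getchunks` performs above can be sketched in a self-contained way. This is a minimal sketch, assuming illustrative big-endian format strings that stand in for this module's `_fparttypesize`, `_fpartid`, `_fpartparamcount` and `_fpartheadersize` constants (the names are reused for readability, not imported from Mercurial):

```python
import struct

# Assumed big-endian format strings mirroring bundle2's conventions;
# these are illustrative stand-ins, not imports from mercurial.
_fparttypesize = '>B'     # one byte: length of the part type string
_fpartid = '>I'           # uint32: part id
_fpartparamcount = '>BB'  # two bytes: mandatory/advisory param counts
_fpartheadersize = '>i'   # int32: total header size prefix

def packpartheader(parttype, partid, manpar=(), advpar=()):
    """Pack a simplified bundle2-style part header as bytes."""
    header = [struct.pack(_fparttypesize, len(parttype)),
              parttype,
              struct.pack(_fpartid, partid),
              struct.pack(_fpartparamcount, len(manpar), len(advpar))]
    # parameter sizes first, then the key/value blobs themselves
    parsizes = []
    for key, value in list(manpar) + list(advpar):
        parsizes.extend([len(key), len(value)])
    header.append(struct.pack('>' + 'B' * len(parsizes), *parsizes))
    for key, value in list(manpar) + list(advpar):
        header.append(key)
        header.append(value)
    headerchunk = b''.join(header)
    # the header blob is prefixed by its own size, as getchunks yields it
    return struct.pack(_fpartheadersize, len(headerchunk)) + headerchunk

blob = packpartheader(b'CHANGEGROUP', 0, manpar=[(b'version', b'02')])
print(len(blob))  # 4-byte size prefix + 29-byte header
```

The real code uses `_makefpartparamsizes` to build the parameter-size format string; `'>' + 'B' * len(parsizes)` plays that role here.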
1069 1068 def _payloadchunks(self):
1070 1069 """yield chunks of a the part payload
1071 1070
1072 1071 Exists to handle the different methods to provide data to a part."""
1073 1072 # we only support fixed size data now.
1074 1073 # This will be improved in the future.
1075 1074 if (util.safehasattr(self.data, 'next')
1076 1075 or util.safehasattr(self.data, '__next__')):
1077 1076 buff = util.chunkbuffer(self.data)
1078 1077 chunk = buff.read(preferedchunksize)
1079 1078 while chunk:
1080 1079 yield chunk
1081 1080 chunk = buff.read(preferedchunksize)
1082 1081 elif len(self.data):
1083 1082 yield self.data
1084 1083
1085 1084
1086 1085 flaginterrupt = -1
1087 1086
1088 1087 class interrupthandler(unpackermixin):
1089 1088 """read one part and process it with restricted capability
1090 1089
1091 1090 This allows transmitting exceptions raised on the producer side during
1092 1091 part iteration while the consumer is reading a part.
1093 1092
1094 1093 Parts processed in this manner only have access to a ui object."""
1095 1094
1096 1095 def __init__(self, ui, fp):
1097 1096 super(interrupthandler, self).__init__(fp)
1098 1097 self.ui = ui
1099 1098
1100 1099 def _readpartheader(self):
1101 1100 """reads a part header size and return the bytes blob
1102 1101
1103 1102 returns None if empty"""
1104 1103 headersize = self._unpack(_fpartheadersize)[0]
1105 1104 if headersize < 0:
1106 1105 raise error.BundleValueError('negative part header size: %i'
1107 1106 % headersize)
1108 1107 indebug(self.ui, 'part header size: %i\n' % headersize)
1109 1108 if headersize:
1110 1109 return self._readexact(headersize)
1111 1110 return None
1112 1111
1113 1112 def __call__(self):
1114 1113
1115 1114 self.ui.debug('bundle2-input-stream-interrupt:'
1116 1115 ' opening out of band context\n')
1117 1116 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1118 1117 headerblock = self._readpartheader()
1119 1118 if headerblock is None:
1120 1119 indebug(self.ui, 'no part found during interruption.')
1121 1120 return
1122 1121 part = unbundlepart(self.ui, headerblock, self._fp)
1123 1122 op = interruptoperation(self.ui)
1124 1123 _processpart(op, part)
1125 1124 self.ui.debug('bundle2-input-stream-interrupt:'
1126 1125 ' closing out of band context\n')
1127 1126
1128 1127 class interruptoperation(object):
1129 1128 """A limited operation to be use by part handler during interruption
1130 1129
1131 1130 It only has access to a ui object.
1132 1131 """
1133 1132
1134 1133 def __init__(self, ui):
1135 1134 self.ui = ui
1136 1135 self.reply = None
1137 1136 self.captureoutput = False
1138 1137
1139 1138 @property
1140 1139 def repo(self):
1141 1140 raise error.ProgrammingError('no repo access from stream interruption')
1142 1141
1143 1142 def gettransaction(self):
1144 1143 raise TransactionUnavailable('no repo access from stream interruption')
1145 1144
1146 1145 class unbundlepart(unpackermixin):
1147 1146 """a bundle part read from a bundle"""
1148 1147
1149 1148 def __init__(self, ui, header, fp):
1150 1149 super(unbundlepart, self).__init__(fp)
1151 1150 self._seekable = (util.safehasattr(fp, 'seek') and
1152 1151 util.safehasattr(fp, 'tell'))
1153 1152 self.ui = ui
1154 1153 # unbundle state attr
1155 1154 self._headerdata = header
1156 1155 self._headeroffset = 0
1157 1156 self._initialized = False
1158 1157 self.consumed = False
1159 1158 # part data
1160 1159 self.id = None
1161 1160 self.type = None
1162 1161 self.mandatoryparams = None
1163 1162 self.advisoryparams = None
1164 1163 self.params = None
1165 1164 self.mandatorykeys = ()
1166 1165 self._payloadstream = None
1167 1166 self._readheader()
1168 1167 self._mandatory = None
1169 1168 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1170 1169 self._pos = 0
1171 1170
1172 1171 def _fromheader(self, size):
1173 1172 """return the next <size> byte from the header"""
1174 1173 offset = self._headeroffset
1175 1174 data = self._headerdata[offset:(offset + size)]
1176 1175 self._headeroffset = offset + size
1177 1176 return data
1178 1177
1179 1178 def _unpackheader(self, format):
1180 1179 """read given format from header
1181 1180
1182 1181 This automatically computes the size of the format to read.
1183 1182 data = self._fromheader(struct.calcsize(format))
1184 1183 return _unpack(format, data)
1185 1184
1186 1185 def _initparams(self, mandatoryparams, advisoryparams):
1187 1186 """internal function to setup all logic related parameters"""
1188 1187 # make it read only to prevent people touching it by mistake.
1189 1188 self.mandatoryparams = tuple(mandatoryparams)
1190 1189 self.advisoryparams = tuple(advisoryparams)
1191 1190 # user friendly UI
1192 1191 self.params = util.sortdict(self.mandatoryparams)
1193 1192 self.params.update(self.advisoryparams)
1194 1193 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1195 1194
1196 1195 def _payloadchunks(self, chunknum=0):
1197 1196 '''seek to specified chunk and start yielding data'''
1198 1197 if len(self._chunkindex) == 0:
1199 1198 assert chunknum == 0, 'Must start with chunk 0'
1200 1199 self._chunkindex.append((0, self._tellfp()))
1201 1200 else:
1202 1201 assert chunknum < len(self._chunkindex), \
1203 1202 'Unknown chunk %d' % chunknum
1204 1203 self._seekfp(self._chunkindex[chunknum][1])
1205 1204
1206 1205 pos = self._chunkindex[chunknum][0]
1207 1206 payloadsize = self._unpack(_fpayloadsize)[0]
1208 1207 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1209 1208 while payloadsize:
1210 1209 if payloadsize == flaginterrupt:
1211 1210 # interruption detection, the handler will now read a
1212 1211 # single part and process it.
1213 1212 interrupthandler(self.ui, self._fp)()
1214 1213 elif payloadsize < 0:
1215 1214 msg = 'negative payload chunk size: %i' % payloadsize
1216 1215 raise error.BundleValueError(msg)
1217 1216 else:
1218 1217 result = self._readexact(payloadsize)
1219 1218 chunknum += 1
1220 1219 pos += payloadsize
1221 1220 if chunknum == len(self._chunkindex):
1222 1221 self._chunkindex.append((pos, self._tellfp()))
1223 1222 yield result
1224 1223 payloadsize = self._unpack(_fpayloadsize)[0]
1225 1224 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1226 1225
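The payload framing consumed by `_payloadchunks` above (int32 size-prefixed chunks, `0` as terminator, `flaginterrupt` = -1 signalling an out-of-band part) can be sketched with a standalone reader. This is a simplified sketch: where the real code invokes `interrupthandler` to process a full interrupt part, the sketch merely skips the flag and resumes reading sizes.

```python
import io
import struct

FLAGINTERRUPT = -1  # mirrors `flaginterrupt` above

def iterpayload(fp):
    """Yield payload chunks from an int32 size-framed stream.

    A size of 0 ends the payload; the interrupt flag is skipped here
    (the real code reads and processes one out-of-band part instead);
    any other negative size is a protocol error.
    """
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        elif size == FLAGINTERRUPT:
            continue  # simplified: real code runs interrupthandler here
        elif size < 0:
            raise ValueError('negative payload chunk size: %i' % size)
        yield fp.read(size)

frame = (struct.pack('>i', 3) + b'abc' +
         struct.pack('>i', 2) + b'de' +
         struct.pack('>i', 0))
print(list(iterpayload(io.BytesIO(frame))))
```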
1227 1226 def _findchunk(self, pos):
1228 1227 '''for a given payload position, return a chunk number and offset'''
1229 1228 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1230 1229 if ppos == pos:
1231 1230 return chunk, 0
1232 1231 elif ppos > pos:
1233 1232 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1234 1233 raise ValueError('Unknown chunk')
1235 1234
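A standalone sketch of the `_findchunk` lookup above, operating on a plain `chunkindex` list of `(payload position, file position)` tuples as built by `_payloadchunks` (the file positions are not needed for the lookup itself):

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, internal offset) for a payload position.

    chunkindex holds (payload start, file offset) pairs; the payload
    starts are strictly increasing.
    """
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

index = [(0, None), (10, None), (25, None)]
print(findchunk(index, 12))  # position 12 is 2 bytes into chunk 1
```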
1236 1235 def _readheader(self):
1237 1236 """read the header and setup the object"""
1238 1237 typesize = self._unpackheader(_fparttypesize)[0]
1239 1238 self.type = self._fromheader(typesize)
1240 1239 indebug(self.ui, 'part type: "%s"' % self.type)
1241 1240 self.id = self._unpackheader(_fpartid)[0]
1242 1241 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1243 1242 # extract mandatory bit from type
1244 1243 self.mandatory = (self.type != self.type.lower())
1245 1244 self.type = self.type.lower()
1246 1245 ## reading parameters
1247 1246 # param count
1248 1247 mancount, advcount = self._unpackheader(_fpartparamcount)
1249 1248 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1250 1249 # param size
1251 1250 fparamsizes = _makefpartparamsizes(mancount + advcount)
1252 1251 paramsizes = self._unpackheader(fparamsizes)
1253 1252 # make it a list of couple again
1254 1253 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1255 1254 # split mandatory from advisory
1256 1255 mansizes = paramsizes[:mancount]
1257 1256 advsizes = paramsizes[mancount:]
1258 1257 # retrieve param value
1259 1258 manparams = []
1260 1259 for key, value in mansizes:
1261 1260 manparams.append((self._fromheader(key), self._fromheader(value)))
1262 1261 advparams = []
1263 1262 for key, value in advsizes:
1264 1263 advparams.append((self._fromheader(key), self._fromheader(value)))
1265 1264 self._initparams(manparams, advparams)
1266 1265 ## part payload
1267 1266 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1268 1267 # we read the data, tell it
1269 1268 self._initialized = True
1270 1269
1271 1270 def read(self, size=None):
1272 1271 """read payload data"""
1273 1272 if not self._initialized:
1274 1273 self._readheader()
1275 1274 if size is None:
1276 1275 data = self._payloadstream.read()
1277 1276 else:
1278 1277 data = self._payloadstream.read(size)
1279 1278 self._pos += len(data)
1280 1279 if size is None or len(data) < size:
1281 1280 if not self.consumed and self._pos:
1282 1281 self.ui.debug('bundle2-input-part: total payload size %i\n'
1283 1282 % self._pos)
1284 1283 self.consumed = True
1285 1284 return data
1286 1285
1287 1286 def tell(self):
1288 1287 return self._pos
1289 1288
1290 1289 def seek(self, offset, whence=0):
1291 1290 if whence == 0:
1292 1291 newpos = offset
1293 1292 elif whence == 1:
1294 1293 newpos = self._pos + offset
1295 1294 elif whence == 2:
1296 1295 if not self.consumed:
1297 1296 self.read()
1298 1297 newpos = self._chunkindex[-1][0] - offset
1299 1298 else:
1300 1299 raise ValueError('Unknown whence value: %r' % (whence,))
1301 1300
1302 1301 if newpos > self._chunkindex[-1][0] and not self.consumed:
1303 1302 self.read()
1304 1303 if not 0 <= newpos <= self._chunkindex[-1][0]:
1305 1304 raise ValueError('Offset out of range')
1306 1305
1307 1306 if self._pos != newpos:
1308 1307 chunk, internaloffset = self._findchunk(newpos)
1309 1308 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1310 1309 adjust = self.read(internaloffset)
1311 1310 if len(adjust) != internaloffset:
1312 1311 raise error.Abort(_('Seek failed\n'))
1313 1312 self._pos = newpos
1314 1313
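The `whence` handling in `seek` above can be isolated into a small helper. Note that the `whence == 2` case *subtracts* the offset from the end position, unlike ordinary `file.seek` semantics; the sketch below mirrors that convention (the helper name is hypothetical):

```python
import os

def computenewpos(pos, endpos, offset, whence):
    """Map (offset, whence) to an absolute payload position.

    Mirrors unbundlepart.seek: SEEK_END subtracts the offset from the
    end position in this protocol code, rather than adding it.
    """
    if whence == os.SEEK_SET:
        return offset
    elif whence == os.SEEK_CUR:
        return pos + offset
    elif whence == os.SEEK_END:
        return endpos - offset
    raise ValueError('Unknown whence value: %r' % (whence,))

print(computenewpos(5, 100, 10, os.SEEK_END))  # 90
```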
1315 1314 def _seekfp(self, offset, whence=0):
1316 1315 """move the underlying file pointer
1317 1316
1318 1317 This method is meant for internal usage by the bundle2 protocol only.
1319 1318 It directly manipulates the low level stream, including bundle2 level
1320 1319 instructions.
1321 1320
1322 1321 Do not use it to implement higher-level logic or methods."""
1323 1322 if self._seekable:
1324 1323 return self._fp.seek(offset, whence)
1325 1324 else:
1326 1325 raise NotImplementedError(_('File pointer is not seekable'))
1327 1326
1328 1327 def _tellfp(self):
1329 1328 """return the file offset, or None if file is not seekable
1330 1329
1331 1330 This method is meant for internal usage by the bundle2 protocol only.
1332 1331 It directly manipulates the low level stream, including bundle2 level
1333 1332 instructions.
1334 1333
1335 1334 Do not use it to implement higher-level logic or methods."""
1336 1335 if self._seekable:
1337 1336 try:
1338 1337 return self._fp.tell()
1339 1338 except IOError as e:
1340 1339 if e.errno == errno.ESPIPE:
1341 1340 self._seekable = False
1342 1341 else:
1343 1342 raise
1344 1343 return None
1345 1344
1346 1345 # These are only the static capabilities.
1347 1346 # Check the 'getrepocaps' function for the rest.
1348 1347 capabilities = {'HG20': (),
1349 1348 'error': ('abort', 'unsupportedcontent', 'pushraced',
1350 1349 'pushkey'),
1351 1350 'listkeys': (),
1352 1351 'pushkey': (),
1353 1352 'digests': tuple(sorted(util.DIGESTS.keys())),
1354 1353 'remote-changegroup': ('http', 'https'),
1355 1354 'hgtagsfnodes': (),
1356 1355 }
1357 1356
1358 1357 def getrepocaps(repo, allowpushback=False):
1359 1358 """return the bundle2 capabilities for a given repo
1360 1359
1361 1360 Exists to allow extensions (like evolution) to mutate the capabilities.
1362 1361 """
1363 1362 caps = capabilities.copy()
1364 1363 caps['changegroup'] = tuple(sorted(
1365 1364 changegroup.supportedincomingversions(repo)))
1366 1365 if obsolete.isenabled(repo, obsolete.exchangeopt):
1367 1366 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1368 1367 caps['obsmarkers'] = supportedformat
1369 1368 if allowpushback:
1370 1369 caps['pushback'] = ()
1371 1370 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1372 1371 if cpmode == 'check-related':
1373 1372 caps['checkheads'] = ('related',)
1374 1373 return caps
1375 1374
1376 1375 def bundle2caps(remote):
1377 1376 """return the bundle capabilities of a peer as dict"""
1378 1377 raw = remote.capable('bundle2')
1379 1378 if not raw and raw != '':
1380 1379 return {}
1381 1380 capsblob = urlreq.unquote(remote.capable('bundle2'))
1382 1381 return decodecaps(capsblob)
1383 1382
1384 1383 def obsmarkersversion(caps):
1385 1384 """extract the list of supported obsmarkers versions from a bundle2caps dict
1386 1385 """
1387 1386 obscaps = caps.get('obsmarkers', ())
1388 1387 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1389 1388
1390 1389 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1391 1390 vfs=None, compression=None, compopts=None):
1392 1391 if bundletype.startswith('HG10'):
1393 1392 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1394 1393 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1395 1394 compression=compression, compopts=compopts)
1396 1395 elif not bundletype.startswith('HG20'):
1397 1396 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1398 1397
1399 1398 caps = {}
1400 1399 if 'obsolescence' in opts:
1401 1400 caps['obsmarkers'] = ('V1',)
1402 1401 bundle = bundle20(ui, caps)
1403 1402 bundle.setcompression(compression, compopts)
1404 1403 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1405 1404 chunkiter = bundle.getchunks()
1406 1405
1407 1406 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1408 1407
1409 1408 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1410 1409 # We should eventually reconcile this logic with the one behind
1411 1410 # 'exchange.getbundle2partsgenerator'.
1412 1411 #
1413 1412 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1414 1413 # different right now. So we keep them separated for now for the sake of
1415 1414 # simplicity.
1416 1415
1417 1416 # we always want a changegroup in such bundle
1418 1417 cgversion = opts.get('cg.version')
1419 1418 if cgversion is None:
1420 1419 cgversion = changegroup.safeversion(repo)
1421 1420 cg = changegroup.getchangegroup(repo, source, outgoing,
1422 1421 version=cgversion)
1423 1422 part = bundler.newpart('changegroup', data=cg.getchunks())
1424 1423 part.addparam('version', cg.version)
1425 1424 if 'clcount' in cg.extras:
1426 1425 part.addparam('nbchanges', str(cg.extras['clcount']),
1427 1426 mandatory=False)
1428 1427 if opts.get('phases') and repo.revs('%ln and secret()',
1429 1428 outgoing.missingheads):
1430 1429 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1431 1430
1432 1431 addparttagsfnodescache(repo, bundler, outgoing)
1433 1432
1434 1433 if opts.get('obsolescence', False):
1435 1434 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1436 1435 buildobsmarkerspart(bundler, obsmarkers)
1437 1436
1438 1437 if opts.get('phases', False):
1439 1438 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1440 1439 phasedata = []
1441 1440 for phase in phases.allphases:
1442 1441 for head in headsbyphase[phase]:
1443 1442 phasedata.append(_pack(_fphasesentry, phase, head))
1444 1443 bundler.newpart('phase-heads', data=''.join(phasedata))
1445 1444
1446 1445 def addparttagsfnodescache(repo, bundler, outgoing):
1447 1446 # we include the tags fnode cache for the bundle changeset
1448 1447 # (as an optional parts)
1449 1448 cache = tags.hgtagsfnodescache(repo.unfiltered())
1450 1449 chunks = []
1451 1450
1452 1451 # .hgtags fnodes are only relevant for head changesets. While we could
1453 1452 # transfer values for all known nodes, there will likely be little to
1454 1453 # no benefit.
1455 1454 #
1456 1455 # We don't bother using a generator to produce output data because
1457 1456 # a) we only have 40 bytes per head and even esoteric numbers of heads
1458 1457 # consume little memory (1M heads is 40MB) b) we don't want to send the
1459 1458 # part if we don't have entries and knowing if we have entries requires
1460 1459 # cache lookups.
1461 1460 for node in outgoing.missingheads:
1462 1461 # Don't compute missing, as this may slow down serving.
1463 1462 fnode = cache.getfnode(node, computemissing=False)
1464 1463 if fnode is not None:
1465 1464 chunks.extend([node, fnode])
1466 1465
1467 1466 if chunks:
1468 1467 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1469 1468
1470 1469 def buildobsmarkerspart(bundler, markers):
1471 1470 """add an obsmarker part to the bundler with <markers>
1472 1471
1473 1472 No part is created if markers is empty.
1474 1473 Raises ValueError if the bundler doesn't support any known obsmarker format.
1475 1474 """
1476 1475 if not markers:
1477 1476 return None
1478 1477
1479 1478 remoteversions = obsmarkersversion(bundler.capabilities)
1480 1479 version = obsolete.commonversion(remoteversions)
1481 1480 if version is None:
1482 1481 raise ValueError('bundler does not support common obsmarker format')
1483 1482 stream = obsolete.encodemarkers(markers, True, version=version)
1484 1483 return bundler.newpart('obsmarkers', data=stream)
1485 1484
1486 1485 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1487 1486 compopts=None):
1488 1487 """Write a bundle file and return its filename.
1489 1488
1490 1489 Existing files will not be overwritten.
1491 1490 If no filename is specified, a temporary file is created.
1492 1491 bz2 compression can be turned off.
1493 1492 The bundle file will be deleted in case of errors.
1494 1493 """
1495 1494
1496 1495 if bundletype == "HG20":
1497 1496 bundle = bundle20(ui)
1498 1497 bundle.setcompression(compression, compopts)
1499 1498 part = bundle.newpart('changegroup', data=cg.getchunks())
1500 1499 part.addparam('version', cg.version)
1501 1500 if 'clcount' in cg.extras:
1502 1501 part.addparam('nbchanges', str(cg.extras['clcount']),
1503 1502 mandatory=False)
1504 1503 chunkiter = bundle.getchunks()
1505 1504 else:
1506 1505 # compression argument is only for the bundle2 case
1507 1506 assert compression is None
1508 1507 if cg.version != '01':
1509 1508 raise error.Abort(_('old bundle types only support v1 '
1510 1509 'changegroups'))
1511 1510 header, comp = bundletypes[bundletype]
1512 1511 if comp not in util.compengines.supportedbundletypes:
1513 1512 raise error.Abort(_('unknown stream compression type: %s')
1514 1513 % comp)
1515 1514 compengine = util.compengines.forbundletype(comp)
1516 1515 def chunkiter():
1517 1516 yield header
1518 1517 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1519 1518 yield chunk
1520 1519 chunkiter = chunkiter()
1521 1520
1522 1521 # parse the changegroup data, otherwise we will block
1523 1522 # in case of sshrepo because we don't know the end of the stream
1524 1523 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1525 1524
1526 1525 def combinechangegroupresults(op):
1527 1526 """logic to combine 0 or more addchangegroup results into one"""
1528 1527 results = [r.get('return', 0)
1529 1528 for r in op.records['changegroup']]
1530 1529 changedheads = 0
1531 1530 result = 1
1532 1531 for ret in results:
1533 1532 # If any changegroup result is 0, return 0
1534 1533 if ret == 0:
1535 1534 result = 0
1536 1535 break
1537 1536 if ret < -1:
1538 1537 changedheads += ret + 1
1539 1538 elif ret > 1:
1540 1539 changedheads += ret - 1
1541 1540 if changedheads > 0:
1542 1541 result = 1 + changedheads
1543 1542 elif changedheads < 0:
1544 1543 result = -1 + changedheads
1545 1544 return result
1546 1545
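The return-value encoding folded by `combinechangegroupresults` above (0 = error, 1 = no change in heads, 1+n = n heads added, -1-n = n heads removed) can be sketched standalone on plain integers. The helper name is hypothetical and it takes a list directly rather than reading `op.records`:

```python
def combineresults(results):
    """Fold a list of addchangegroup-style return codes into one.

    Encoding: 0 = error, 1 = no change in heads,
    1+n = n heads added, -1-n = n heads removed.
    """
    changedheads = 0
    for ret in results:
        if ret == 0:
            return 0  # any failed changegroup makes the whole result 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1

print(combineresults([3, 2]))  # 2 + 1 heads added -> 4
```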
1547 1546 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1548 1547 'targetphase'))
1549 1548 def handlechangegroup(op, inpart):
1550 1549 """apply a changegroup part on the repo
1551 1550
1552 1551 This is a very early implementation that will see massive rework before
1553 1552 being inflicted on any end-user.
1554 1553 """
1555 1554 tr = op.gettransaction()
1556 1555 unpackerversion = inpart.params.get('version', '01')
1557 1556 # We should raise an appropriate exception here
1558 1557 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1559 1558 # the source and url passed here are overwritten by the one contained in
1560 1559 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1561 1560 nbchangesets = None
1562 1561 if 'nbchanges' in inpart.params:
1563 1562 nbchangesets = int(inpart.params.get('nbchanges'))
1564 1563 if ('treemanifest' in inpart.params and
1565 1564 'treemanifest' not in op.repo.requirements):
1566 1565 if len(op.repo.changelog) != 0:
1567 1566 raise error.Abort(_(
1568 1567 "bundle contains tree manifests, but local repo is "
1569 1568 "non-empty and does not use tree manifests"))
1570 1569 op.repo.requirements.add('treemanifest')
1571 1570 op.repo._applyopenerreqs()
1572 1571 op.repo._writerequirements()
1573 1572 extrakwargs = {}
1574 1573 targetphase = inpart.params.get('targetphase')
1575 1574 if targetphase is not None:
1576 1575 extrakwargs['targetphase'] = int(targetphase)
1577 1576 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1578 1577 expectedtotal=nbchangesets, **extrakwargs)
1579 1578 if op.reply is not None:
1580 1579 # This is definitely not the final form of this
1581 1580 # return. But one needs to start somewhere.
1582 1581 part = op.reply.newpart('reply:changegroup', mandatory=False)
1583 1582 part.addparam(
1584 1583 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1585 1584 part.addparam('return', '%i' % ret, mandatory=False)
1586 1585 assert not inpart.read()
1587 1586
1588 1587 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1589 1588 ['digest:%s' % k for k in util.DIGESTS.keys()])
1590 1589 @parthandler('remote-changegroup', _remotechangegroupparams)
1591 1590 def handleremotechangegroup(op, inpart):
1592 1591 """apply a bundle10 on the repo, given an url and validation information
1593 1592
1594 1593 All the information about the remote bundle to import are given as
1595 1594 parameters. The parameters include:
1596 1595 - url: the url to the bundle10.
1597 1596 - size: the bundle10 file size. It is used to validate what was
1598 1597 retrieved by the client matches the server knowledge about the bundle.
1599 1598 - digests: a space separated list of the digest types provided as
1600 1599 parameters.
1601 1600 - digest:<digest-type>: the hexadecimal representation of the digest with
1602 1601 that name. Like the size, it is used to validate what was retrieved by
1603 1602 the client matches what the server knows about the bundle.
1604 1603
1605 1604 When multiple digest types are given, all of them are checked.
1606 1605 """
1607 1606 try:
1608 1607 raw_url = inpart.params['url']
1609 1608 except KeyError:
1610 1609 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1611 1610 parsed_url = util.url(raw_url)
1612 1611 if parsed_url.scheme not in capabilities['remote-changegroup']:
1613 1612 raise error.Abort(_('remote-changegroup does not support %s urls') %
1614 1613 parsed_url.scheme)
1615 1614
1616 1615 try:
1617 1616 size = int(inpart.params['size'])
1618 1617 except ValueError:
1619 1618 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1620 1619 % 'size')
1621 1620 except KeyError:
1622 1621 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1623 1622
1624 1623 digests = {}
1625 1624 for typ in inpart.params.get('digests', '').split():
1626 1625 param = 'digest:%s' % typ
1627 1626 try:
1628 1627 value = inpart.params[param]
1629 1628 except KeyError:
1630 1629 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1631 1630 param)
1632 1631 digests[typ] = value
1633 1632
1634 1633 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1635 1634
1636 1635 tr = op.gettransaction()
1637 1636 from . import exchange
1638 1637 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1639 1638 if not isinstance(cg, changegroup.cg1unpacker):
1640 1639 raise error.Abort(_('%s: not a bundle version 1.0') %
1641 1640 util.hidepassword(raw_url))
1642 1641 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1643 1642 if op.reply is not None:
1644 1643 # This is definitely not the final form of this
1645 1644 # return. But one needs to start somewhere.
1646 1645 part = op.reply.newpart('reply:changegroup')
1647 1646 part.addparam(
1648 1647 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1649 1648 part.addparam('return', '%i' % ret, mandatory=False)
1650 1649 try:
1651 1650 real_part.validate()
1652 1651 except error.Abort as e:
1653 1652 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1654 1653 (util.hidepassword(raw_url), str(e)))
1655 1654 assert not inpart.read()
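The size-plus-digest validation that `util.digestchecker` performs above can be sketched in isolation. The wrapper below is a hypothetical stand-in (not the real `util` API): it counts bytes as they are read, feeds them through one running hash per declared digest type, and raises on `validate()` if the totals disagree with what the part advertised.

```python
import hashlib
import io

class DigestCheckingReader(object):
    """Hypothetical stand-in for util.digestchecker: wraps a file-like
    object, counts bytes read, and verifies declared digests on validate()."""
    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size
        self._read = 0
        # one running hash per declared digest type, e.g. {'sha1': <hexdigest>}
        self._hashes = {typ: hashlib.new(typ) for typ in digests}
        self._expected = digests

    def read(self, amt=-1):
        data = self._fh.read(amt)
        self._read += len(data)
        for h in self._hashes.values():
            h.update(data)
        return data

    def validate(self):
        if self._read != self._size:
            raise ValueError('size mismatch: expected %d, got %d'
                             % (self._size, self._read))
        for typ, h in self._hashes.items():
            if h.hexdigest() != self._expected[typ]:
                raise ValueError('%s digest mismatch' % typ)

payload = b'hello bundle'
digests = {'sha1': hashlib.sha1(payload).hexdigest()}
checker = DigestCheckingReader(io.BytesIO(payload), len(payload), digests)
while checker.read(4):
    pass
checker.validate()  # silent on success, raises ValueError on mismatch
```

The real code wraps the remote URL stream the same way, so the changegroup is validated incrementally rather than buffered in full before hashing.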

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

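Both heads-check parts frame their payload as a bare concatenation of 20-byte binary nodes with no count prefix; end of part means end of list. The read loop above can be exercised on its own — the helper below is illustrative only, with `inpart` simulated by `io.BytesIO`.

```python
import io

def readheads(inpart):
    """Read a stream of concatenated 20-byte nodes, as the check:heads
    handler does, rejecting any trailing partial record."""
    heads = []
    h = inpart.read(20)
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h, 'trailing garbage in heads payload'
    return heads

node1 = b'\x11' * 20
node2 = b'\x22' * 20
assert readheads(io.BytesIO(node1 + node2)) == [node1, node2]
assert readheads(io.BytesIO(b'')) == []
```

A short read at the end (fewer than 20 bytes, but more than zero) indicates a truncated part, which is why the loop asserts the remainder is empty.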
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

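The binary layout consumed by `_readphaseheads` is fixed-size records. The sketch below assumes the `'>i20s'` entry format for `_fphasesentry` (a big-endian int32 phase index followed by a 20-byte node), which matches how the struct is unpacked above; the writer side here is a hypothetical counterpart added only for illustration.

```python
import struct

# Assumed layout of _fphasesentry: big-endian int32 phase + 20-byte node.
_entryfmt = '>i20s'

def packphaseheads(headsbyphase):
    """Serialize per-phase node lists into concatenated phase-heads entries."""
    chunks = []
    for phase, nodes in enumerate(headsbyphase):
        for node in nodes:
            chunks.append(struct.pack(_entryfmt, phase, node))
    return b''.join(chunks)

def unpackphaseheads(data, numphases=3):
    """Inverse of packphaseheads, mirroring the _readphaseheads loop."""
    headsbyphase = [[] for _ in range(numphases)]
    entrysize = struct.calcsize(_entryfmt)  # 4 + 20 = 24 bytes per entry
    if len(data) % entrysize:
        raise ValueError('bad phase-heads payload')
    for off in range(0, len(data), entrysize):
        phase, node = struct.unpack(_entryfmt, data[off:off + entrysize])
        headsbyphase[phase].append(node)
    return headsbyphase

publichead, drafthead = b'\x00' * 20, b'\xff' * 20
blob = packphaseheads([[publichead], [drafthead], []])
assert unpackphaseheads(blob) == [[publichead], [drafthead], []]
```

Because every record is the same size, a short final read unambiguously signals truncation, which is why `_readphaseheads` aborts on a non-empty partial entry.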
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)
@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)
@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server', False):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
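The `USERVAR_` prefixing above is a plain dictionary transform and can be shown standalone; here `advisoryparams` is simulated as a list of `(key, value)` pairs, as the part exposes it.

```python
def pushvarstohookargs(advisoryparams):
    """Upper-case each pushed variable name and prefix it with USERVAR_,
    as the pushvars handler does before handing the dict to hooks."""
    hookargs = {}
    for key, value in advisoryparams:
        # The prefix lets hooks distinguish client-supplied --pushvar
        # values from arguments the server itself sets.
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

args = pushvarstohookargs([('debug', '1'), ('bypass_review', 'yes')])
assert args == {'USERVAR_DEBUG': '1', 'USERVAR_BYPASS_REVIEW': 'yes'}
```

Namespacing the keys this way means a malicious client cannot shadow a server-defined hook argument, which is why the prefix is applied unconditionally.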