bundle2: convert ints to strings using pycompat.bytestring()...
Augie Fackler -
r33677:373ca510 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

the Binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.
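As a standalone sketch of the stream-level parameter encoding described above (space-separated, urlquoted, `<name>=<value>` pairs), under the assumption that the helper names below are purely illustrative and not part of bundle2.py:

```python
# Illustrative sketch only: mirrors the documented stream-parameter
# blob format; encodestreamparams/decodestreamparams are not real
# bundle2.py functions.
from urllib.parse import quote, unquote

def encodestreamparams(params):
    """Serialize (name, value) pairs into the space-separated,
    urlquoted blob stored after the int32 size field."""
    blocks = []
    for name, value in params:
        if not name or not name[0].isalpha():
            raise ValueError('invalid parameter name: %r' % name)
        block = quote(name, safe='')
        if value is not None:
            # parameters with a value use the <name>=<value> form
            block = '%s=%s' % (block, quote(value, safe=''))
        blocks.append(block)
    return ' '.join(blocks)

def decodestreamparams(blob):
    """Parse the blob back into a dict; a name with no '=' maps to None."""
    params = {}
    for entry in blob.split(' '):
        if '=' in entry:
            name, value = entry.split('=', 1)
            params[unquote(name)] = unquote(value)
        else:
            params[unquote(entry)] = None
    return params
```

A capitalized name such as `Compression` would be mandatory for the receiver, while a lowercase one is advisory.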
Payload part
------------------------

Binary format is as follows:

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object to
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32bits integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

      Part's parameters may have arbitrary content, the binary structure is::

          <mandatory-count><advisory-count><param-sizes><param-data>

      :mandatory-count: 1 byte, number of mandatory parameters

      :advisory-count: 1 byte, number of advisory parameters

      :param-sizes:

          N couples of bytes, where N is the total number of parameters. Each
          couple contains (<size-of-key>, <size-of-value>) for one parameter.

      :param-data:

          A blob of bytes from which each parameter key and value can be
          retrieved using the list of size couples stored in the previous
          field.

          Mandatory parameters come first, then the advisory ones.

          Each parameter's key MUST be unique within the part.

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
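The chunked payload framing above can be sketched with the standard `struct` module; the reader below is a simplified standalone illustration, not the actual unbundlepart implementation (which also handles the negative "special" sizes):

```python
# Minimal sketch of reading a part payload: a sequence of
# <int32 chunksize><chunkdata> pairs ending with a zero-size chunk.
# readpayloadchunks is an illustrative name, not bundle2.py API.
import io
import struct

def readpayloadchunks(fp):
    """Yield each chunk's data until the zero-size terminator."""
    while True:
        (chunksize,) = struct.unpack('>i', fp.read(4))
        if chunksize == 0:
            return                     # end-of-payload marker
        if chunksize < 0:
            # real bundle2 reserves negative sizes for special processing
            raise ValueError('special chunk sizes not handled: %d'
                             % chunksize)
        data = fp.read(chunksize)
        if len(data) != chunksize:
            raise ValueError('stream ended inside a chunk')
        yield data

# build a two-chunk payload followed by the terminator
payload = (struct.pack('>i', 5) + b'hello'
           + struct.pack('>i', 6) + b' world'
           + struct.pack('>i', 0))
chunks = list(readpayloadchunks(io.BytesIO(payload)))
```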
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the Part type
contains any uppercase char it is considered mandatory. When no handler is
known for a Mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
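The case-insensitive dispatch rule above can be sketched as follows; the registry and helper names here are illustrative stand-ins, not the real `parthandlermapping` machinery:

```python
# Sketch of part-to-handler dispatch: matching uses the lowercased
# part type, while any uppercase letter in the original type marks
# the part as mandatory. Names are illustrative only.
handlers = {'changegroup': lambda parttype: 'applied'}

def dispatch(parttype):
    mandatory = parttype != parttype.lower()
    handler = handlers.get(parttype.lower())
    if handler is None:
        if mandatory:
            # unknown mandatory part: the whole process must abort
            raise LookupError('missing handler for mandatory part: %s'
                              % parttype)
        return 'ignored'               # advisory part with no handler
    return handler(parttype)
```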
"""

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs is not None:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.Abort(
                _('attempted to add hooks to operation after transaction '
                  'started'))
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()
def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    Unknown Mandatory parts will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op
def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
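The caps blob format these two functions implement (one capability per line, urlquoted, comma-separated values) can be exercised with a standalone sketch; the `*sketch` names below are illustrative stand-ins that avoid Mercurial's `urlreq` wrapper:

```python
# Simplified standalone mirror of encodecaps/decodecaps, using the
# stdlib directly instead of util.urlreq. Illustrative only.
from urllib.parse import quote, unquote

def encodecapssketch(caps):
    # one line per capability, sorted; values joined with commas
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v, safe='') for v in caps[ca]]
        ca = quote(ca, safe='')
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecapssketch(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, rest = line.split('=', 1)
            vals = [unquote(v) for v in rest.split(',')]
        caps[unquote(key)] = vals
    return caps
```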
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it with parts. Then call `getchunks` to retrieve all the binary
    chunks of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding one if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)
def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply all parameters from a stream level params block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will be either
        ignored or rejected.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        to be interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True
880 880 class bundlepart(object):
881 881 """A bundle2 part contains application level payload
882 882
883 883 The part `type` is used to route the part to the application level
884 884 handler.
885 885
886 886 The part payload is contained in ``part.data``. It could be raw bytes or a
887 887 generator of byte chunks.
888 888
889 889 You can add parameters to the part using the ``addparam`` method.
890 890 Parameters can be either mandatory (default) or advisory. Remote side
891 891 should be able to safely ignore the advisory ones.
892 892
893 893 Neither data nor parameters can be modified after generation has begun.
894 894 """
895 895
896 896 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
897 897 data='', mandatory=True):
898 898 validateparttype(parttype)
899 899 self.id = None
900 900 self.type = parttype
901 901 self._data = data
902 902 self._mandatoryparams = list(mandatoryparams)
903 903 self._advisoryparams = list(advisoryparams)
904 904 # checking for duplicated entries
905 905 self._seenparams = set()
906 906 for pname, __ in self._mandatoryparams + self._advisoryparams:
907 907 if pname in self._seenparams:
908 908 raise error.ProgrammingError('duplicated params: %s' % pname)
909 909 self._seenparams.add(pname)
910 910 # status of the part's generation:
911 911 # - None: not started,
912 912 # - False: currently being generated,
913 913 # - True: generation done.
914 914 self._generated = None
915 915 self.mandatory = mandatory
916 916
917 917 def __repr__(self):
918 918 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
919 919 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
920 920 % (cls, id(self), self.id, self.type, self.mandatory))
921 921
922 922 def copy(self):
923 923 """return a copy of the part
924 924
925 925 The new part has the very same content but no partid assigned yet.
926 926 Parts with generated data cannot be copied."""
927 927 assert not util.safehasattr(self.data, 'next')
928 928 return self.__class__(self.type, self._mandatoryparams,
929 929 self._advisoryparams, self._data, self.mandatory)
930 930
931 931 # methods used to define the part content
932 932 @property
933 933 def data(self):
934 934 return self._data
935 935
936 936 @data.setter
937 937 def data(self, data):
938 938 if self._generated is not None:
939 939 raise error.ReadOnlyPartError('part is being generated')
940 940 self._data = data
941 941
942 942 @property
943 943 def mandatoryparams(self):
944 944 # make it an immutable tuple to force people through ``addparam``
945 945 return tuple(self._mandatoryparams)
946 946
947 947 @property
948 948 def advisoryparams(self):
949 949 # make it an immutable tuple to force people through ``addparam``
950 950 return tuple(self._advisoryparams)
951 951
952 952 def addparam(self, name, value='', mandatory=True):
953 953 """add a parameter to the part
954 954
955 955 If 'mandatory' is set to True, the remote handler must claim support
956 956 for this parameter or the unbundling will be aborted.
957 957
958 958 The 'name' and 'value' cannot exceed 255 bytes each.
959 959 """
960 960 if self._generated is not None:
961 961 raise error.ReadOnlyPartError('part is being generated')
962 962 if name in self._seenparams:
963 963 raise ValueError('duplicated params: %s' % name)
964 964 self._seenparams.add(name)
965 965 params = self._advisoryparams
966 966 if mandatory:
967 967 params = self._mandatoryparams
968 968 params.append((name, value))
969 969
970 970 # methods used to generate the bundle2 stream
971 971 def getchunks(self, ui):
972 972 if self._generated is not None:
973 973 raise error.ProgrammingError('part can only be consumed once')
974 974 self._generated = False
975 975
976 976 if ui.debugflag:
977 977 msg = ['bundle2-output-part: "%s"' % self.type]
978 978 if not self.mandatory:
979 979 msg.append(' (advisory)')
980 980 nbmp = len(self.mandatoryparams)
981 981 nbap = len(self.advisoryparams)
982 982 if nbmp or nbap:
983 983 msg.append(' (params:')
984 984 if nbmp:
985 985 msg.append(' %i mandatory' % nbmp)
986 986 if nbap:
987 987 msg.append(' %i advisory' % nbap)
988 988 msg.append(')')
989 989 if not self.data:
990 990 msg.append(' empty payload')
991 991 elif util.safehasattr(self.data, 'next'):
992 992 msg.append(' streamed payload')
993 993 else:
994 994 msg.append(' %i bytes payload' % len(self.data))
995 995 msg.append('\n')
996 996 ui.debug(''.join(msg))
997 997
998 998 #### header
999 999 if self.mandatory:
1000 1000 parttype = self.type.upper()
1001 1001 else:
1002 1002 parttype = self.type.lower()
1003 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
1003 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1004 1004 ## parttype
1005 1005 header = [_pack(_fparttypesize, len(parttype)),
1006 1006 parttype, _pack(_fpartid, self.id),
1007 1007 ]
1008 1008 ## parameters
1009 1009 # count
1010 1010 manpar = self.mandatoryparams
1011 1011 advpar = self.advisoryparams
1012 1012 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1013 1013 # size
1014 1014 parsizes = []
1015 1015 for key, value in manpar:
1016 1016 parsizes.append(len(key))
1017 1017 parsizes.append(len(value))
1018 1018 for key, value in advpar:
1019 1019 parsizes.append(len(key))
1020 1020 parsizes.append(len(value))
1021 1021 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1022 1022 header.append(paramsizes)
1023 1023 # key, value
1024 1024 for key, value in manpar:
1025 1025 header.append(key)
1026 1026 header.append(value)
1027 1027 for key, value in advpar:
1028 1028 header.append(key)
1029 1029 header.append(value)
1030 1030 ## finalize header
1031 1031 headerchunk = ''.join(header)
1032 1032 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1033 1033 yield _pack(_fpartheadersize, len(headerchunk))
1034 1034 yield headerchunk
1035 1035 ## payload
1036 1036 try:
1037 1037 for chunk in self._payloadchunks():
1038 1038 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1039 1039 yield _pack(_fpayloadsize, len(chunk))
1040 1040 yield chunk
1041 1041 except GeneratorExit:
1042 1042 # GeneratorExit means that nobody is listening for our
1043 1043 # results anyway, so just bail quickly rather than trying
1044 1044 # to produce an error part.
1045 1045 ui.debug('bundle2-generatorexit\n')
1046 1046 raise
1047 1047 except BaseException as exc:
1048 1048 bexc = util.forcebytestr(exc)
1049 1049 # backup exception data for later
1050 1050 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1051 1051 % bexc)
1052 1052 tb = sys.exc_info()[2]
1053 1053 msg = 'unexpected error: %s' % bexc
1054 1054 interpart = bundlepart('error:abort', [('message', msg)],
1055 1055 mandatory=False)
1056 1056 interpart.id = 0
1057 1057 yield _pack(_fpayloadsize, -1)
1058 1058 for chunk in interpart.getchunks(ui=ui):
1059 1059 yield chunk
1060 1060 outdebug(ui, 'closing payload chunk')
1061 1061 # abort current part payload
1062 1062 yield _pack(_fpayloadsize, 0)
1063 1063 pycompat.raisewithtb(exc, tb)
1064 1064 # end of payload
1065 1065 outdebug(ui, 'closing payload chunk')
1066 1066 yield _pack(_fpayloadsize, 0)
1067 1067 self._generated = True
1068 1068
1069 1069 def _payloadchunks(self):
1070 1070 """yield chunks of the part payload
1071 1071
1072 1072 Exists to handle the different methods to provide data to a part."""
1073 1073 # we only support fixed size data now.
1074 1074 # This will be improved in the future.
1075 1075 if (util.safehasattr(self.data, 'next')
1076 1076 or util.safehasattr(self.data, '__next__')):
1077 1077 buff = util.chunkbuffer(self.data)
1078 1078 chunk = buff.read(preferedchunksize)
1079 1079 while chunk:
1080 1080 yield chunk
1081 1081 chunk = buff.read(preferedchunksize)
1082 1082 elif len(self.data):
1083 1083 yield self.data
1084 1084
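The chunk framing produced by ``getchunks`` above can be sketched standalone: each payload chunk is preceded by a big-endian int32 length, and a zero length closes the payload. This is a minimal sketch, not the real serializer; ``'>i'`` stands in for the module's ``_fpayloadsize`` format.

```python
import struct

def framepayload(chunks):
    """Frame payload chunks the way getchunks() does: each chunk is
    preceded by a big-endian int32 length, and a zero length closes
    the payload ('>i' stands in for _fpayloadsize)."""
    out = []
    for chunk in chunks:
        out.append(struct.pack('>i', len(chunk)))
        out.append(chunk)
    out.append(struct.pack('>i', 0))  # closing (empty) payload chunk
    return b''.join(out)

framed = framepayload([b'abc', b'defg'])
# 4-byte length 3, b'abc', 4-byte length 4, b'defg', 4-byte length 0
```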
1085 1085
1086 1086 flaginterrupt = -1
1087 1087
1088 1088 class interrupthandler(unpackermixin):
1089 1089 """read one part and process it with restricted capability
1090 1090
1091 1091 This allows transmitting exceptions raised on the producer side during
1092 1092 part iteration while the consumer is reading a part.
1093 1093 
1094 1094 Parts processed in this manner only have access to a ui object."""
1095 1095
1096 1096 def __init__(self, ui, fp):
1097 1097 super(interrupthandler, self).__init__(fp)
1098 1098 self.ui = ui
1099 1099
1100 1100 def _readpartheader(self):
1101 1101 """read the part header size and return the bytes blob
1102 1102
1103 1103 returns None if empty"""
1104 1104 headersize = self._unpack(_fpartheadersize)[0]
1105 1105 if headersize < 0:
1106 1106 raise error.BundleValueError('negative part header size: %i'
1107 1107 % headersize)
1108 1108 indebug(self.ui, 'part header size: %i\n' % headersize)
1109 1109 if headersize:
1110 1110 return self._readexact(headersize)
1111 1111 return None
1112 1112
1113 1113 def __call__(self):
1114 1114
1115 1115 self.ui.debug('bundle2-input-stream-interrupt:'
1116 1116 ' opening out of band context\n')
1117 1117 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1118 1118 headerblock = self._readpartheader()
1119 1119 if headerblock is None:
1120 1120 indebug(self.ui, 'no part found during interruption.')
1121 1121 return
1122 1122 part = unbundlepart(self.ui, headerblock, self._fp)
1123 1123 op = interruptoperation(self.ui)
1124 1124 _processpart(op, part)
1125 1125 self.ui.debug('bundle2-input-stream-interrupt:'
1126 1126 ' closing out of band context\n')
1127 1127
1128 1128 class interruptoperation(object):
1129 1129 """A limited operation to be used by part handlers during interruption
1130 1130 
1131 1131 It only has access to a ui object.
1132 1132 """
1133 1133
1134 1134 def __init__(self, ui):
1135 1135 self.ui = ui
1136 1136 self.reply = None
1137 1137 self.captureoutput = False
1138 1138
1139 1139 @property
1140 1140 def repo(self):
1141 1141 raise error.ProgrammingError('no repo access from stream interruption')
1142 1142
1143 1143 def gettransaction(self):
1144 1144 raise TransactionUnavailable('no repo access from stream interruption')
1145 1145
1146 1146 class unbundlepart(unpackermixin):
1147 1147 """a bundle part read from a bundle"""
1148 1148
1149 1149 def __init__(self, ui, header, fp):
1150 1150 super(unbundlepart, self).__init__(fp)
1151 1151 self._seekable = (util.safehasattr(fp, 'seek') and
1152 1152 util.safehasattr(fp, 'tell'))
1153 1153 self.ui = ui
1154 1154 # unbundle state attr
1155 1155 self._headerdata = header
1156 1156 self._headeroffset = 0
1157 1157 self._initialized = False
1158 1158 self.consumed = False
1159 1159 # part data
1160 1160 self.id = None
1161 1161 self.type = None
1162 1162 self.mandatoryparams = None
1163 1163 self.advisoryparams = None
1164 1164 self.params = None
1165 1165 self.mandatorykeys = ()
1166 1166 self._payloadstream = None
1167 1167 self._readheader()
1168 1168 self._mandatory = None
1169 1169 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1170 1170 self._pos = 0
1171 1171
1172 1172 def _fromheader(self, size):
1173 1173 """return the next <size> byte from the header"""
1174 1174 offset = self._headeroffset
1175 1175 data = self._headerdata[offset:(offset + size)]
1176 1176 self._headeroffset = offset + size
1177 1177 return data
1178 1178
1179 1179 def _unpackheader(self, format):
1180 1180 """read given format from header
1181 1181
1182 1182 This automatically computes the size of the format to read."""
1183 1183 data = self._fromheader(struct.calcsize(format))
1184 1184 return _unpack(format, data)
1185 1185
1186 1186 def _initparams(self, mandatoryparams, advisoryparams):
1187 1187 """internal function to set up all logic-related parameters"""
1188 1188 # make it read only to prevent people touching it by mistake.
1189 1189 self.mandatoryparams = tuple(mandatoryparams)
1190 1190 self.advisoryparams = tuple(advisoryparams)
1191 1191 # user friendly UI
1192 1192 self.params = util.sortdict(self.mandatoryparams)
1193 1193 self.params.update(self.advisoryparams)
1194 1194 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1195 1195
1196 1196 def _payloadchunks(self, chunknum=0):
1197 1197 '''seek to specified chunk and start yielding data'''
1198 1198 if len(self._chunkindex) == 0:
1199 1199 assert chunknum == 0, 'Must start with chunk 0'
1200 1200 self._chunkindex.append((0, self._tellfp()))
1201 1201 else:
1202 1202 assert chunknum < len(self._chunkindex), \
1203 1203 'Unknown chunk %d' % chunknum
1204 1204 self._seekfp(self._chunkindex[chunknum][1])
1205 1205
1206 1206 pos = self._chunkindex[chunknum][0]
1207 1207 payloadsize = self._unpack(_fpayloadsize)[0]
1208 1208 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1209 1209 while payloadsize:
1210 1210 if payloadsize == flaginterrupt:
1211 1211 # interruption detection, the handler will now read a
1212 1212 # single part and process it.
1213 1213 interrupthandler(self.ui, self._fp)()
1214 1214 elif payloadsize < 0:
1215 1215 msg = 'negative payload chunk size: %i' % payloadsize
1216 1216 raise error.BundleValueError(msg)
1217 1217 else:
1218 1218 result = self._readexact(payloadsize)
1219 1219 chunknum += 1
1220 1220 pos += payloadsize
1221 1221 if chunknum == len(self._chunkindex):
1222 1222 self._chunkindex.append((pos, self._tellfp()))
1223 1223 yield result
1224 1224 payloadsize = self._unpack(_fpayloadsize)[0]
1225 1225 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1226 1226
1227 1227 def _findchunk(self, pos):
1228 1228 '''for a given payload position, return a chunk number and offset'''
1229 1229 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1230 1230 if ppos == pos:
1231 1231 return chunk, 0
1232 1232 elif ppos > pos:
1233 1233 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1234 1234 raise ValueError('Unknown chunk')
1235 1235
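The index lookup above maps a payload position back to a chunk number and offset. A standalone sketch of the same logic, where ``chunkindex`` holds the ``(payload offset, file offset)`` pairs that ``_payloadchunks`` records for each chunk start:

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset into that chunk) for a payload
    position, mirroring _findchunk above. chunkindex holds
    (payload offset, file offset) pairs for each chunk start."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# Three chunks starting at payload offsets 0, 10 and 20:
index = [(0, 0), (10, 14), (20, 28)]
findchunk(index, 15)  # -> (1, 5): 5 bytes into the second chunk
```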
1236 1236 def _readheader(self):
1237 1237 """read the header and setup the object"""
1238 1238 typesize = self._unpackheader(_fparttypesize)[0]
1239 1239 self.type = self._fromheader(typesize)
1240 1240 indebug(self.ui, 'part type: "%s"' % self.type)
1241 1241 self.id = self._unpackheader(_fpartid)[0]
1242 indebug(self.ui, 'part id: "%s"' % self.id)
1242 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1243 1243 # extract mandatory bit from type
1244 1244 self.mandatory = (self.type != self.type.lower())
1245 1245 self.type = self.type.lower()
1246 1246 ## reading parameters
1247 1247 # param count
1248 1248 mancount, advcount = self._unpackheader(_fpartparamcount)
1249 1249 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1250 1250 # param size
1251 1251 fparamsizes = _makefpartparamsizes(mancount + advcount)
1252 1252 paramsizes = self._unpackheader(fparamsizes)
1253 1253 # make it a list of couple again
1254 1254 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1255 1255 # split mandatory from advisory
1256 1256 mansizes = paramsizes[:mancount]
1257 1257 advsizes = paramsizes[mancount:]
1258 1258 # retrieve param value
1259 1259 manparams = []
1260 1260 for key, value in mansizes:
1261 1261 manparams.append((self._fromheader(key), self._fromheader(value)))
1262 1262 advparams = []
1263 1263 for key, value in advsizes:
1264 1264 advparams.append((self._fromheader(key), self._fromheader(value)))
1265 1265 self._initparams(manparams, advparams)
1266 1266 ## part payload
1267 1267 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1268 1268 # we read the data, tell it
1269 1269 self._initialized = True
1270 1270
1271 1271 def read(self, size=None):
1272 1272 """read payload data"""
1273 1273 if not self._initialized:
1274 1274 self._readheader()
1275 1275 if size is None:
1276 1276 data = self._payloadstream.read()
1277 1277 else:
1278 1278 data = self._payloadstream.read(size)
1279 1279 self._pos += len(data)
1280 1280 if size is None or len(data) < size:
1281 1281 if not self.consumed and self._pos:
1282 1282 self.ui.debug('bundle2-input-part: total payload size %i\n'
1283 1283 % self._pos)
1284 1284 self.consumed = True
1285 1285 return data
1286 1286
1287 1287 def tell(self):
1288 1288 return self._pos
1289 1289
1290 1290 def seek(self, offset, whence=0):
1291 1291 if whence == 0:
1292 1292 newpos = offset
1293 1293 elif whence == 1:
1294 1294 newpos = self._pos + offset
1295 1295 elif whence == 2:
1296 1296 if not self.consumed:
1297 1297 self.read()
1298 1298 newpos = self._chunkindex[-1][0] - offset
1299 1299 else:
1300 1300 raise ValueError('Unknown whence value: %r' % (whence,))
1301 1301
1302 1302 if newpos > self._chunkindex[-1][0] and not self.consumed:
1303 1303 self.read()
1304 1304 if not 0 <= newpos <= self._chunkindex[-1][0]:
1305 1305 raise ValueError('Offset out of range')
1306 1306
1307 1307 if self._pos != newpos:
1308 1308 chunk, internaloffset = self._findchunk(newpos)
1309 1309 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1310 1310 adjust = self.read(internaloffset)
1311 1311 if len(adjust) != internaloffset:
1312 1312 raise error.Abort(_('Seek failed\n'))
1313 1313 self._pos = newpos
1314 1314
1315 1315 def _seekfp(self, offset, whence=0):
1316 1316 """move the underlying file pointer
1317 1317
1318 1318 This method is meant for internal usage by the bundle2 protocol only.
1319 1319 It directly manipulates the low-level stream, including bundle2-level
1320 1320 instructions.
1321 1321
1322 1322 Do not use it to implement higher-level logic or methods."""
1323 1323 if self._seekable:
1324 1324 return self._fp.seek(offset, whence)
1325 1325 else:
1326 1326 raise NotImplementedError(_('File pointer is not seekable'))
1327 1327
1328 1328 def _tellfp(self):
1329 1329 """return the file offset, or None if file is not seekable
1330 1330
1331 1331 This method is meant for internal usage by the bundle2 protocol only.
1332 1332 It directly manipulates the low-level stream, including bundle2-level
1333 1333 instructions.
1334 1334
1335 1335 Do not use it to implement higher-level logic or methods."""
1336 1336 if self._seekable:
1337 1337 try:
1338 1338 return self._fp.tell()
1339 1339 except IOError as e:
1340 1340 if e.errno == errno.ESPIPE:
1341 1341 self._seekable = False
1342 1342 else:
1343 1343 raise
1344 1344 return None
1345 1345
1346 1346 # These are only the static capabilities.
1347 1347 # Check the 'getrepocaps' function for the rest.
1348 1348 capabilities = {'HG20': (),
1349 1349 'error': ('abort', 'unsupportedcontent', 'pushraced',
1350 1350 'pushkey'),
1351 1351 'listkeys': (),
1352 1352 'pushkey': (),
1353 1353 'digests': tuple(sorted(util.DIGESTS.keys())),
1354 1354 'remote-changegroup': ('http', 'https'),
1355 1355 'hgtagsfnodes': (),
1356 1356 }
1357 1357
1358 1358 def getrepocaps(repo, allowpushback=False):
1359 1359 """return the bundle2 capabilities for a given repo
1360 1360
1361 1361 Exists to allow extensions (like evolution) to mutate the capabilities.
1362 1362 """
1363 1363 caps = capabilities.copy()
1364 1364 caps['changegroup'] = tuple(sorted(
1365 1365 changegroup.supportedincomingversions(repo)))
1366 1366 if obsolete.isenabled(repo, obsolete.exchangeopt):
1367 1367 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1368 1368 caps['obsmarkers'] = supportedformat
1369 1369 if allowpushback:
1370 1370 caps['pushback'] = ()
1371 1371 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1372 1372 if cpmode == 'check-related':
1373 1373 caps['checkheads'] = ('related',)
1374 1374 return caps
1375 1375
1376 1376 def bundle2caps(remote):
1377 1377 """return the bundle capabilities of a peer as dict"""
1378 1378 raw = remote.capable('bundle2')
1379 1379 if not raw and raw != '':
1380 1380 return {}
1381 1381 capsblob = urlreq.unquote(remote.capable('bundle2'))
1382 1382 return decodecaps(capsblob)
1383 1383
1384 1384 def obsmarkersversion(caps):
1385 1385 """extract the list of supported obsmarkers versions from a bundle2caps dict
1386 1386 """
1387 1387 obscaps = caps.get('obsmarkers', ())
1388 1388 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1389 1389
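For instance, given a capabilities dict advertising obsmarker formats ``V0`` and ``V1``, the helper above keeps only the ``V<n>`` entries and strips the prefix (hypothetical input, same list comprehension as ``obsmarkersversion``):

```python
def obsmarkersversion(caps):
    # same logic as above: keep 'V<n>' capability names, drop the rest
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

obsmarkersversion({'obsmarkers': ('V0', 'V1', 'experimental')})  # -> [0, 1]
```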
1390 1390 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1391 1391 vfs=None, compression=None, compopts=None):
1392 1392 if bundletype.startswith('HG10'):
1393 1393 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1394 1394 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1395 1395 compression=compression, compopts=compopts)
1396 1396 elif not bundletype.startswith('HG20'):
1397 1397 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1398 1398
1399 1399 caps = {}
1400 1400 if 'obsolescence' in opts:
1401 1401 caps['obsmarkers'] = ('V1',)
1402 1402 bundle = bundle20(ui, caps)
1403 1403 bundle.setcompression(compression, compopts)
1404 1404 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1405 1405 chunkiter = bundle.getchunks()
1406 1406
1407 1407 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1408 1408
1409 1409 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1410 1410 # We should eventually reconcile this logic with the one behind
1411 1411 # 'exchange.getbundle2partsgenerator'.
1412 1412 #
1413 1413 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1414 1414 # different right now. So we keep them separated for now for the sake of
1415 1415 # simplicity.
1416 1416
1417 1417 # we always want a changegroup in such bundle
1418 1418 cgversion = opts.get('cg.version')
1419 1419 if cgversion is None:
1420 1420 cgversion = changegroup.safeversion(repo)
1421 1421 cg = changegroup.getchangegroup(repo, source, outgoing,
1422 1422 version=cgversion)
1423 1423 part = bundler.newpart('changegroup', data=cg.getchunks())
1424 1424 part.addparam('version', cg.version)
1425 1425 if 'clcount' in cg.extras:
1426 1426 part.addparam('nbchanges', str(cg.extras['clcount']),
1427 1427 mandatory=False)
1428 1428 if opts.get('phases') and repo.revs('%ln and secret()',
1429 1429 outgoing.missingheads):
1430 1430 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1431 1431
1432 1432 addparttagsfnodescache(repo, bundler, outgoing)
1433 1433
1434 1434 if opts.get('obsolescence', False):
1435 1435 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1436 1436 buildobsmarkerspart(bundler, obsmarkers)
1437 1437
1438 1438 if opts.get('phases', False):
1439 1439 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1440 1440 phasedata = []
1441 1441 for phase in phases.allphases:
1442 1442 for head in headsbyphase[phase]:
1443 1443 phasedata.append(_pack(_fphasesentry, phase, head))
1444 1444 bundler.newpart('phase-heads', data=''.join(phasedata))
1445 1445
1446 1446 def addparttagsfnodescache(repo, bundler, outgoing):
1447 1447 # we include the tags fnode cache for the bundle changeset
1448 1448 # (as an optional parts)
1449 1449 cache = tags.hgtagsfnodescache(repo.unfiltered())
1450 1450 chunks = []
1451 1451
1452 1452 # .hgtags fnodes are only relevant for head changesets. While we could
1453 1453 # transfer values for all known nodes, there will likely be little to
1454 1454 # no benefit.
1455 1455 #
1456 1456 # We don't bother using a generator to produce output data because
1457 1457 # a) we only have 40 bytes per head and even esoteric numbers of heads
1458 1458 # consume little memory (1M heads is 40MB) b) we don't want to send the
1459 1459 # part if we don't have entries and knowing if we have entries requires
1460 1460 # cache lookups.
1461 1461 for node in outgoing.missingheads:
1462 1462 # Don't compute missing, as this may slow down serving.
1463 1463 fnode = cache.getfnode(node, computemissing=False)
1464 1464 if fnode is not None:
1465 1465 chunks.extend([node, fnode])
1466 1466
1467 1467 if chunks:
1468 1468 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1469 1469
1470 1470 def buildobsmarkerspart(bundler, markers):
1471 1471 """add an obsmarker part to the bundler with <markers>
1472 1472
1473 1473 No part is created if markers is empty.
1474 1474 Raises ValueError if the bundler doesn't support any known obsmarker format.
1475 1475 """
1476 1476 if not markers:
1477 1477 return None
1478 1478
1479 1479 remoteversions = obsmarkersversion(bundler.capabilities)
1480 1480 version = obsolete.commonversion(remoteversions)
1481 1481 if version is None:
1482 1482 raise ValueError('bundler does not support common obsmarker format')
1483 1483 stream = obsolete.encodemarkers(markers, True, version=version)
1484 1484 return bundler.newpart('obsmarkers', data=stream)
1485 1485
1486 1486 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1487 1487 compopts=None):
1488 1488 """Write a bundle file and return its filename.
1489 1489
1490 1490 Existing files will not be overwritten.
1491 1491 If no filename is specified, a temporary file is created.
1492 1492 bz2 compression can be turned off.
1493 1493 The bundle file will be deleted in case of errors.
1494 1494 """
1495 1495
1496 1496 if bundletype == "HG20":
1497 1497 bundle = bundle20(ui)
1498 1498 bundle.setcompression(compression, compopts)
1499 1499 part = bundle.newpart('changegroup', data=cg.getchunks())
1500 1500 part.addparam('version', cg.version)
1501 1501 if 'clcount' in cg.extras:
1502 1502 part.addparam('nbchanges', str(cg.extras['clcount']),
1503 1503 mandatory=False)
1504 1504 chunkiter = bundle.getchunks()
1505 1505 else:
1506 1506 # compression argument is only for the bundle2 case
1507 1507 assert compression is None
1508 1508 if cg.version != '01':
1509 1509 raise error.Abort(_('old bundle types only support v1 '
1510 1510 'changegroups'))
1511 1511 header, comp = bundletypes[bundletype]
1512 1512 if comp not in util.compengines.supportedbundletypes:
1513 1513 raise error.Abort(_('unknown stream compression type: %s')
1514 1514 % comp)
1515 1515 compengine = util.compengines.forbundletype(comp)
1516 1516 def chunkiter():
1517 1517 yield header
1518 1518 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1519 1519 yield chunk
1520 1520 chunkiter = chunkiter()
1521 1521
1522 1522 # parse the changegroup data, otherwise we will block
1523 1523 # in case of sshrepo because we don't know the end of the stream
1524 1524 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1525 1525
1526 1526 def combinechangegroupresults(op):
1527 1527 """logic to combine 0 or more addchangegroup results into one"""
1528 1528 results = [r.get('return', 0)
1529 1529 for r in op.records['changegroup']]
1530 1530 changedheads = 0
1531 1531 result = 1
1532 1532 for ret in results:
1533 1533 # If any changegroup result is 0, return 0
1534 1534 if ret == 0:
1535 1535 result = 0
1536 1536 break
1537 1537 if ret < -1:
1538 1538 changedheads += ret + 1
1539 1539 elif ret > 1:
1540 1540 changedheads += ret - 1
1541 1541 if changedheads > 0:
1542 1542 result = 1 + changedheads
1543 1543 elif changedheads < 0:
1544 1544 result = -1 + changedheads
1545 1545 return result
1546 1546
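The return-code folding above can be exercised in isolation: ``0`` anywhere means failure, ``1`` means success with unchanged head count, and values above ``1`` or below ``-1`` encode heads added or removed. A minimal sketch reproducing the combination on a plain list of return codes:

```python
def combineresults(results):
    """Fold addchangegroup return codes like combinechangegroupresults:
    0 anywhere means failure; otherwise accumulate head-count deltas."""
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:
            return 0          # any failed changegroup fails the whole run
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

combineresults([2, 3])  # -> 4: one added head plus two more
```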
1547 1547 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1548 1548 'targetphase'))
1549 1549 def handlechangegroup(op, inpart):
1550 1550 """apply a changegroup part on the repo
1551 1551
1552 1552 This is a very early implementation that will see massive rework before
1553 1553 being inflicted on any end-user.
1554 1554 """
1555 1555 tr = op.gettransaction()
1556 1556 unpackerversion = inpart.params.get('version', '01')
1557 1557 # We should raise an appropriate exception here
1558 1558 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1559 1559 # the source and url passed here are overwritten by the one contained in
1560 1560 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1561 1561 nbchangesets = None
1562 1562 if 'nbchanges' in inpart.params:
1563 1563 nbchangesets = int(inpart.params.get('nbchanges'))
1564 1564 if ('treemanifest' in inpart.params and
1565 1565 'treemanifest' not in op.repo.requirements):
1566 1566 if len(op.repo.changelog) != 0:
1567 1567 raise error.Abort(_(
1568 1568 "bundle contains tree manifests, but local repo is "
1569 1569 "non-empty and does not use tree manifests"))
1570 1570 op.repo.requirements.add('treemanifest')
1571 1571 op.repo._applyopenerreqs()
1572 1572 op.repo._writerequirements()
1573 1573 extrakwargs = {}
1574 1574 targetphase = inpart.params.get('targetphase')
1575 1575 if targetphase is not None:
1576 1576 extrakwargs['targetphase'] = int(targetphase)
1577 1577 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1578 1578 expectedtotal=nbchangesets, **extrakwargs)
1579 1579 if op.reply is not None:
1580 1580 # This is definitely not the final form of this
1581 1581 # return. But one needs to start somewhere.
1582 1582 part = op.reply.newpart('reply:changegroup', mandatory=False)
1583 1583 part.addparam(
1584 1584 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1585 1585 part.addparam('return', '%i' % ret, mandatory=False)
1586 1586 assert not inpart.read()
1587 1587
1588 1588 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1589 1589 ['digest:%s' % k for k in util.DIGESTS.keys()])
1590 1590 @parthandler('remote-changegroup', _remotechangegroupparams)
1591 1591 def handleremotechangegroup(op, inpart):
1592 1592 """apply a bundle10 on the repo, given an url and validation information
1593 1593
1594 1594 All the information about the remote bundle to import is given as
1595 1595 parameters. The parameters include:
1596 1596 - url: the url to the bundle10.
1597 1597 - size: the bundle10 file size. It is used to validate what was
1598 1598 retrieved by the client matches the server knowledge about the bundle.
1599 1599 - digests: a space separated list of the digest types provided as
1600 1600 parameters.
1601 1601 - digest:<digest-type>: the hexadecimal representation of the digest with
1602 1602 that name. Like the size, it is used to validate what was retrieved by
1603 1603 the client matches what the server knows about the bundle.
1604 1604
1605 1605 When multiple digest types are given, all of them are checked.
1606 1606 """
1607 1607 try:
1608 1608 raw_url = inpart.params['url']
1609 1609 except KeyError:
1610 1610 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1611 1611 parsed_url = util.url(raw_url)
1612 1612 if parsed_url.scheme not in capabilities['remote-changegroup']:
1613 1613 raise error.Abort(_('remote-changegroup does not support %s urls') %
1614 1614 parsed_url.scheme)
1615 1615
1616 1616 try:
1617 1617 size = int(inpart.params['size'])
1618 1618 except ValueError:
1619 1619 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1620 1620 % 'size')
1621 1621 except KeyError:
1622 1622 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1623 1623
1624 1624 digests = {}
1625 1625 for typ in inpart.params.get('digests', '').split():
1626 1626 param = 'digest:%s' % typ
1627 1627 try:
1628 1628 value = inpart.params[param]
1629 1629 except KeyError:
1630 1630 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1631 1631 param)
1632 1632 digests[typ] = value
1633 1633
1634 1634 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1635 1635
1636 1636 tr = op.gettransaction()
1637 1637 from . import exchange
1638 1638 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1639 1639 if not isinstance(cg, changegroup.cg1unpacker):
1640 1640 raise error.Abort(_('%s: not a bundle version 1.0') %
1641 1641 util.hidepassword(raw_url))
1642 1642 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1643 1643 if op.reply is not None:
1644 1644 # This is definitely not the final form of this
1645 1645 # return. But one needs to start somewhere.
1646 1646 part = op.reply.newpart('reply:changegroup')
1647 1647 part.addparam(
1648 1648 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1649 1649 part.addparam('return', '%i' % ret, mandatory=False)
1650 1650 try:
1651 1651 real_part.validate()
1652 1652 except error.Abort as e:
1653 1653 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1654 1654 (util.hidepassword(raw_url), str(e)))
1655 1655 assert not inpart.read()
1656 1656
1657 1657 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1658 1658 def handlereplychangegroup(op, inpart):
1659 1659 ret = int(inpart.params['return'])
1660 1660 replyto = int(inpart.params['in-reply-to'])
1661 1661 op.records.add('changegroup', {'return': ret}, replyto)
1662 1662
1663 1663 @parthandler('check:heads')
1664 1664 def handlecheckheads(op, inpart):
1665 1665 """check that the heads of the repo did not change
1666 1666
1667 1667 This is used to detect a push race when using unbundle.
1668 1668 This replaces the "heads" argument of unbundle."""
1669 1669 h = inpart.read(20)
1670 1670 heads = []
1671 1671 while len(h) == 20:
1672 1672 heads.append(h)
1673 1673 h = inpart.read(20)
1674 1674 assert not h
1675 1675 # Trigger a transaction so that we are guaranteed to have the lock now.
1676 1676 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1677 1677 op.gettransaction()
1678 1678 if sorted(heads) != sorted(op.repo.heads()):
1679 1679 raise error.PushRaced('repository changed while pushing - '
1680 1680 'please try again')
1681 1681
1682 1682 @parthandler('check:updated-heads')
1683 1683 def handlecheckupdatedheads(op, inpart):
1684 1684 """check for race on the heads touched by a push
1685 1685
1686 1686 This is similar to 'check:heads' but focuses on the heads actually updated
1687 1687 during the push. If other activity happens on unrelated heads, it is
1688 1688 ignored.
1689 1689
1690 1690 This allows servers with high traffic to avoid push contention as long as
1691 1691 only unrelated parts of the graph are involved."""
1692 1692 h = inpart.read(20)
1693 1693 heads = []
1694 1694 while len(h) == 20:
1695 1695 heads.append(h)
1696 1696 h = inpart.read(20)
1697 1697 assert not h
1698 1698 # Trigger a transaction so that we are guaranteed to have the lock now.
1699 1699 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1700 1700 op.gettransaction()
1701 1701
1702 1702 currentheads = set()
1703 1703 for ls in op.repo.branchmap().itervalues():
1704 1704 currentheads.update(ls)
1705 1705
1706 1706 for h in heads:
1707 1707 if h not in currentheads:
1708 1708 raise error.PushRaced('repository changed while pushing - '
1709 1709 'please try again')
1710 1710
1711 1711 @parthandler('output')
1712 1712 def handleoutput(op, inpart):
1713 1713 """forward output captured on the server to the client"""
1714 1714 for line in inpart.read().splitlines():
1715 1715 op.ui.status(_('remote: %s\n') % line)
1716 1716
1717 1717 @parthandler('replycaps')
1718 1718 def handlereplycaps(op, inpart):
1719 1719 """Notify that a reply bundle should be created
1720 1720
1721 1721 The payload contains the capabilities information for the reply"""
1722 1722 caps = decodecaps(inpart.read())
1723 1723 if op.reply is None:
1724 1724 op.reply = bundle20(op.ui, caps)
1725 1725
1726 1726 class AbortFromPart(error.Abort):
1727 1727 """Sub-class of Abort that denotes an error from a bundle2 part."""
1728 1728
1729 1729 @parthandler('error:abort', ('message', 'hint'))
1730 1730 def handleerrorabort(op, inpart):
1731 1731 """Used to transmit abort error over the wire"""
1732 1732 raise AbortFromPart(inpart.params['message'],
1733 1733 hint=inpart.params.get('hint'))
1734 1734
1735 1735 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1736 1736 'in-reply-to'))
1737 1737 def handleerrorpushkey(op, inpart):
1738 1738 """Used to transmit failure of a mandatory pushkey over the wire"""
1739 1739 kwargs = {}
1740 1740 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1741 1741 value = inpart.params.get(name)
1742 1742 if value is not None:
1743 1743 kwargs[name] = value
1744 1744 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1745 1745
1746 1746 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1747 1747 def handleerrorunsupportedcontent(op, inpart):
1748 1748 """Used to transmit unknown content error over the wire"""
1749 1749 kwargs = {}
1750 1750 parttype = inpart.params.get('parttype')
1751 1751 if parttype is not None:
1752 1752 kwargs['parttype'] = parttype
1753 1753 params = inpart.params.get('params')
1754 1754 if params is not None:
1755 1755 kwargs['params'] = params.split('\0')
1756 1756
1757 1757 raise error.BundleUnknownFeatureError(**kwargs)
1758 1758
1759 1759 @parthandler('error:pushraced', ('message',))
1760 1760 def handleerrorpushraced(op, inpart):
1761 1761 """Used to transmit push race error over the wire"""
1762 1762 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1763 1763
1764 1764 @parthandler('listkeys', ('namespace',))
1765 1765 def handlelistkeys(op, inpart):
1766 1766 """retrieve pushkey namespace content stored in a bundle2"""
1767 1767 namespace = inpart.params['namespace']
1768 1768 r = pushkey.decodekeys(inpart.read())
1769 1769 op.records.add('listkeys', (namespace, r))
1770 1770
1771 1771 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1772 1772 def handlepushkey(op, inpart):
1773 1773 """process a pushkey request"""
1774 1774 dec = pushkey.decode
1775 1775 namespace = dec(inpart.params['namespace'])
1776 1776 key = dec(inpart.params['key'])
1777 1777 old = dec(inpart.params['old'])
1778 1778 new = dec(inpart.params['new'])
1779 1779 # Grab the transaction to ensure that we have the lock before performing the
1780 1780 # pushkey.
1781 1781 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1782 1782 op.gettransaction()
1783 1783 ret = op.repo.pushkey(namespace, key, old, new)
1784 1784 record = {'namespace': namespace,
1785 1785 'key': key,
1786 1786 'old': old,
1787 1787 'new': new}
1788 1788 op.records.add('pushkey', record)
1789 1789 if op.reply is not None:
1790 1790 rpart = op.reply.newpart('reply:pushkey')
1791 1791 rpart.addparam(
1792 1792 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1793 1793 rpart.addparam('return', '%i' % ret, mandatory=False)
1794 1794 if inpart.mandatory and not ret:
1795 1795 kwargs = {}
1796 1796 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1797 1797 if key in inpart.params:
1798 1798 kwargs[key] = inpart.params[key]
1799 1799 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1800 1800
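Both the pushkey failure path above and `handleerrorpushkey` use the same idiom: copy only the optional parameters that are actually present into the keyword arguments for `error.PushkeyFailed`. As a standalone helper (hypothetical, for illustration only), the pattern is:

```python
def presentparams(params, names):
    """Return a dict containing only the named params that are present.

    Mirrors the filtering done before raising error.PushkeyFailed, so
    absent optional parameters do not show up as None values.
    """
    kwargs = {}
    for name in names:
        value = params.get(name)
        if value is not None:
            kwargs[name] = value
    return kwargs
```

This keeps the exception's keyword arguments limited to what the remote side actually transmitted.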
1801 1801 def _readphaseheads(inpart):
1802 1802 headsbyphase = [[] for i in phases.allphases]
1803 1803 entrysize = struct.calcsize(_fphasesentry)
1804 1804 while True:
1805 1805 entry = inpart.read(entrysize)
1806 1806 if len(entry) < entrysize:
1807 1807 if entry:
1808 1808 raise error.Abort(_('bad phase-heads bundle part'))
1809 1809 break
1810 1810 phase, node = struct.unpack(_fphasesentry, entry)
1811 1811 headsbyphase[phase].append(node)
1812 1812 return headsbyphase
1813 1813
1814 1814 @parthandler('phase-heads')
1815 1815 def handlephases(op, inpart):
1816 1816 """apply phases from bundle part to repo"""
1817 1817 headsbyphase = _readphaseheads(inpart)
1818 1818 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1819 1819
1820 1820 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1821 1821 def handlepushkeyreply(op, inpart):
1822 1822 """retrieve the result of a pushkey request"""
1823 1823 ret = int(inpart.params['return'])
1824 1824 partid = int(inpart.params['in-reply-to'])
1825 1825 op.records.add('pushkey', {'return': ret}, partid)
1826 1826
1827 1827 @parthandler('obsmarkers')
1828 1828 def handleobsmarker(op, inpart):
1829 1829 """add a stream of obsmarkers to the repo"""
1830 1830 tr = op.gettransaction()
1831 1831 markerdata = inpart.read()
1832 1832 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1833 1833 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1834 1834 % len(markerdata))
1835 1835 # The mergemarkers call will crash if marker creation is not enabled.
1836 1836 # We want to avoid this if the part is advisory.
1837 1837 if not inpart.mandatory and op.repo.obsstore.readonly:
1838 1838 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1839 1839 return
1840 1840 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1841 1841 op.repo.invalidatevolatilesets()
1842 1842 if new:
1843 1843 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1844 1844 op.records.add('obsmarkers', {'new': new})
1845 1845 if op.reply is not None:
1846 1846 rpart = op.reply.newpart('reply:obsmarkers')
1847 1847 rpart.addparam(
1848 1848 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1849 1849 rpart.addparam('new', '%i' % new, mandatory=False)
1850 1850
1851 1851
1852 1852 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1853 1853 def handleobsmarkerreply(op, inpart):
1854 1854 """retrieve the result of a pushkey request"""
1855 1855 ret = int(inpart.params['new'])
1856 1856 partid = int(inpart.params['in-reply-to'])
1857 1857 op.records.add('obsmarkers', {'new': ret}, partid)
1858 1858
1859 1859 @parthandler('hgtagsfnodes')
1860 1860 def handlehgtagsfnodes(op, inpart):
1861 1861 """Applies .hgtags fnodes cache entries to the local repo.
1862 1862
1863 1863 Payload is pairs of 20 byte changeset nodes and filenodes.
1864 1864 """
1865 1865 # Grab the transaction so we ensure that we have the lock at this point.
1866 1866 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1867 1867 op.gettransaction()
1868 1868 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1869 1869
1870 1870 count = 0
1871 1871 while True:
1872 1872 node = inpart.read(20)
1873 1873 fnode = inpart.read(20)
1874 1874 if len(node) < 20 or len(fnode) < 20:
1875 1875 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1876 1876 break
1877 1877 cache.setfnode(node, fnode)
1878 1878 count += 1
1879 1879
1880 1880 cache.write()
1881 1881 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1882 1882
1883 1883 @parthandler('pushvars')
1884 1884 def bundle2getvars(op, part):
1885 1885 '''unbundle a bundle2 containing shellvars on the server'''
1886 1886 # For security reasons, pushvars are only processed if the server opts in
1887 1887 if op.ui.configbool('push', 'pushvars.server', False):
1888 1888 hookargs = {}
1889 1889 for key, value in part.advisoryparams:
1890 1890 key = key.upper()
1891 1891 # We want pushed variables to have USERVAR_ prepended so we know
1892 1892 # they came from the --pushvar flag.
1893 1893 key = "USERVAR_" + key
1894 1894 hookargs[key] = value
1895 1895 op.addhookargs(hookargs)
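The key transformation applied above (uppercase, then a `USERVAR_` prefix so hooks can tell `--pushvar` values apart from built-in hook arguments) can be expressed as a small standalone function (a sketch; the real handler does this inline):

```python
def pushvarstohookargs(advisoryparams):
    """Build the hook-argument dict the way the 'pushvars' handler does."""
    hookargs = {}
    for key, value in advisoryparams:
        # uppercase and prefix so hooks know the value came from --pushvar
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs
```

A hook receiving `USERVAR_DEBUG=1` can therefore trust that the value originated from a client-supplied `--pushvar DEBUG=1`.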