bundle2: add a 'stream' part handler for stream cloning...
Boris Feld -
r35776:b996ddf5 default
@@ -1,2116 +1,2144
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37 
38 38 :params value: arbitrary number of bytes
39 39 
40 40 A blob of `params size` bytes containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76 
77 77 The part type is used to route to an application-level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32 and `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import os
152 152 import re
153 153 import string
154 154 import struct
155 155 import sys
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 bookmarks,
160 160 changegroup,
161 161 error,
162 162 node as nodemod,
163 163 obsolete,
164 164 phases,
165 165 pushkey,
166 166 pycompat,
167 streamclone,
167 168 tags,
168 169 url,
169 170 util,
170 171 )
171 172
172 173 urlerr = util.urlerr
173 174 urlreq = util.urlreq
174 175
175 176 _pack = struct.pack
176 177 _unpack = struct.unpack
177 178
178 179 _fstreamparamsize = '>i'
179 180 _fpartheadersize = '>i'
180 181 _fparttypesize = '>B'
181 182 _fpartid = '>I'
182 183 _fpayloadsize = '>i'
183 184 _fpartparamcount = '>BB'
184 185
185 186 preferedchunksize = 4096
186 187
187 188 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
188 189
189 190 def outdebug(ui, message):
190 191 """debug regarding output stream (bundling)"""
191 192 if ui.configbool('devel', 'bundle2.debug'):
192 193 ui.debug('bundle2-output: %s\n' % message)
193 194
194 195 def indebug(ui, message):
195 196 """debug on input stream (unbundling)"""
196 197 if ui.configbool('devel', 'bundle2.debug'):
197 198 ui.debug('bundle2-input: %s\n' % message)
198 199
199 200 def validateparttype(parttype):
200 201 """raise ValueError if a parttype contains invalid character"""
201 202 if _parttypeforbidden.search(parttype):
202 203 raise ValueError(parttype)
203 204
204 205 def _makefpartparamsizes(nbparams):
205 206 """return a struct format to read part parameter sizes
206 207
207 208 The number of parameters is variable, so we need to build that format
208 209 dynamically.
209 210 """
210 211 return '>'+('BB'*nbparams)
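For example, for two parameters the helper yields the format `'>BBBB'`: four unsigned bytes, one (key size, value size) pair per parameter. A quick sketch:

```python
import struct

# same shape _makefpartparamsizes(2) returns: big-endian, 2 x (B, B)
fmt = '>' + ('BB' * 2)
sizes = struct.unpack(fmt, b'\x03\x05\x02\x04')
# key1 is 3 bytes, value1 is 5 bytes, key2 is 2 bytes, value2 is 4 bytes
```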
211 212
212 213 parthandlermapping = {}
213 214
214 215 def parthandler(parttype, params=()):
215 216 """decorator that register a function as a bundle2 part handler
216 217
217 218 eg::
218 219
219 220 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
220 221 def myparttypehandler(...):
221 222 '''process a part of type "my part".'''
222 223 ...
223 224 """
224 225 validateparttype(parttype)
225 226 def _decorator(func):
226 227 lparttype = parttype.lower() # enforce lower case matching.
227 228 assert lparttype not in parthandlermapping
228 229 parthandlermapping[lparttype] = func
229 230 func.params = frozenset(params)
230 231 return func
231 232 return _decorator
232 233
233 234 class unbundlerecords(object):
234 235 """keep record of what happens during and unbundle
235 236
236 237 New records are added using `records.add('cat', obj)`. Where 'cat' is a
237 238 category of record and obj is an arbitrary object.
238 239
239 240 `records['cat']` will return all entries of this category 'cat'.
240 241
241 242 Iterating on the object itself will yield `('category', obj)` tuples
242 243 for all entries.
243 244
244 245 All iterations happen in chronological order.
245 246 """
246 247
247 248 def __init__(self):
248 249 self._categories = {}
249 250 self._sequences = []
250 251 self._replies = {}
251 252
252 253 def add(self, category, entry, inreplyto=None):
253 254 """add a new record of a given category.
254 255
255 256 The entry can then be retrieved in the list returned by
256 257 self['category']."""
257 258 self._categories.setdefault(category, []).append(entry)
258 259 self._sequences.append((category, entry))
259 260 if inreplyto is not None:
260 261 self.getreplies(inreplyto).add(category, entry)
261 262
262 263 def getreplies(self, partid):
263 264 """get the records that are replies to a specific part"""
264 265 return self._replies.setdefault(partid, unbundlerecords())
265 266
266 267 def __getitem__(self, cat):
267 268 return tuple(self._categories.get(cat, ()))
268 269
269 270 def __iter__(self):
270 271 return iter(self._sequences)
271 272
272 273 def __len__(self):
273 274 return len(self._sequences)
274 275
275 276 def __nonzero__(self):
276 277 return bool(self._sequences)
277 278
278 279 __bool__ = __nonzero__
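The contract described in the class docstring can be illustrated with a standalone sketch (a simplified, hypothetical stand-in for `unbundlerecords`, without the reply tracking):

```python
class UnbundleRecordsSketch(object):
    """Minimal stand-in mirroring unbundlerecords' documented contract."""
    def __init__(self):
        self._categories = {}   # category -> list of entries
        self._sequences = []    # chronological (category, entry) pairs
    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
    def __getitem__(self, cat):
        # unknown categories yield an empty tuple rather than raising
        return tuple(self._categories.get(cat, ()))
    def __iter__(self):
        return iter(self._sequences)
    def __len__(self):
        return len(self._sequences)

records = UnbundleRecordsSketch()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'namespace': 'phases'})
```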
279 280
280 281 class bundleoperation(object):
281 282 """an object that represents a single bundling process
282 283
283 284 Its purpose is to carry unbundle-related objects and states.
284 285
285 286 A new object should be created at the beginning of each bundle processing.
286 287 The object is to be returned by the processing function.
287 288
288 289 The object has very little content now; it will ultimately contain:
289 290 * an access to the repo the bundle is applied to,
290 291 * a ui object,
291 292 * a way to retrieve a transaction to add changes to the repo,
292 293 * a way to record the result of processing each part,
293 294 * a way to construct a bundle response when applicable.
294 295 """
295 296
296 297 def __init__(self, repo, transactiongetter, captureoutput=True):
297 298 self.repo = repo
298 299 self.ui = repo.ui
299 300 self.records = unbundlerecords()
300 301 self.reply = None
301 302 self.captureoutput = captureoutput
302 303 self.hookargs = {}
303 304 self._gettransaction = transactiongetter
304 305 # carries value that can modify part behavior
305 306 self.modes = {}
306 307
307 308 def gettransaction(self):
308 309 transaction = self._gettransaction()
309 310
310 311 if self.hookargs:
311 312 # the ones added to the transaction supersede those added
312 313 # to the operation.
313 314 self.hookargs.update(transaction.hookargs)
314 315 transaction.hookargs = self.hookargs
315 316
316 317 # mark the hookargs as flushed. further attempts to add to
317 318 # hookargs will result in an abort.
318 319 self.hookargs = None
319 320
320 321 return transaction
321 322
322 323 def addhookargs(self, hookargs):
323 324 if self.hookargs is None:
324 325 raise error.ProgrammingError('attempted to add hookargs to '
325 326 'operation after transaction started')
326 327 self.hookargs.update(hookargs)
327 328
328 329 class TransactionUnavailable(RuntimeError):
329 330 pass
330 331
331 332 def _notransaction():
332 333 """default method to get a transaction while processing a bundle
333 334
334 335 Raise an exception to highlight the fact that no transaction was expected
335 336 to be created"""
336 337 raise TransactionUnavailable()
337 338
338 339 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
339 340 # transform me into unbundler.apply() as soon as the freeze is lifted
340 341 if isinstance(unbundler, unbundle20):
341 342 tr.hookargs['bundle2'] = '1'
342 343 if source is not None and 'source' not in tr.hookargs:
343 344 tr.hookargs['source'] = source
344 345 if url is not None and 'url' not in tr.hookargs:
345 346 tr.hookargs['url'] = url
346 347 return processbundle(repo, unbundler, lambda: tr)
347 348 else:
348 349 # the transactiongetter won't be used, but we might as well set it
349 350 op = bundleoperation(repo, lambda: tr)
350 351 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
351 352 return op
352 353
353 354 class partiterator(object):
354 355 def __init__(self, repo, op, unbundler):
355 356 self.repo = repo
356 357 self.op = op
357 358 self.unbundler = unbundler
358 359 self.iterator = None
359 360 self.count = 0
360 361 self.current = None
361 362
362 363 def __enter__(self):
363 364 def func():
364 365 itr = enumerate(self.unbundler.iterparts())
365 366 for count, p in itr:
366 367 self.count = count
367 368 self.current = p
368 369 yield p
369 370 p.consume()
370 371 self.current = None
371 372 self.iterator = func()
372 373 return self.iterator
373 374
374 375 def __exit__(self, type, exc, tb):
375 376 if not self.iterator:
376 377 return
377 378
378 379 # Only gracefully abort in a normal exception situation. User aborts
379 380 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
380 381 # and should not gracefully cleanup.
381 382 if isinstance(exc, Exception):
382 383 # Any exceptions seeking to the end of the bundle at this point are
383 384 # almost certainly related to the underlying stream being bad.
384 385 # And, chances are that the exception we're handling is related to
385 386 # getting in that bad state. So, we swallow the seeking error and
386 387 # re-raise the original error.
387 388 seekerror = False
388 389 try:
389 390 if self.current:
390 391 # consume the part content to not corrupt the stream.
391 392 self.current.consume()
392 393
393 394 for part in self.iterator:
394 395 # consume the bundle content
395 396 part.consume()
396 397 except Exception:
397 398 seekerror = True
398 399
399 400 # Small hack to let caller code distinguish exceptions from bundle2
400 401 # processing from processing the old format. This is mostly needed
401 402 # to handle different return codes to unbundle according to the type
402 403 # of bundle. We should probably clean up or drop this return code
403 404 # craziness in a future version.
404 405 exc.duringunbundle2 = True
405 406 salvaged = []
406 407 replycaps = None
407 408 if self.op.reply is not None:
408 409 salvaged = self.op.reply.salvageoutput()
409 410 replycaps = self.op.reply.capabilities
410 411 exc._replycaps = replycaps
411 412 exc._bundle2salvagedoutput = salvaged
412 413
413 414 # Re-raising from a variable loses the original stack. So only use
414 415 # that form if we need to.
415 416 if seekerror:
416 417 raise exc
417 418
418 419 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
419 420 self.count)
420 421
421 422 def processbundle(repo, unbundler, transactiongetter=None, op=None):
422 423 """This function process a bundle, apply effect to/from a repo
423 424
424 425 It iterates over each part then searches for and uses the proper handling
425 426 code to process the part. Parts are processed in order.
426 427
427 428 An unknown mandatory part will abort the process.
428 429
429 430 It is temporarily possible to provide a prebuilt bundleoperation to the
430 431 function. This is used to ensure output is properly propagated in case of
431 432 an error during the unbundling. This output capturing part will likely be
432 433 reworked and this ability will probably go away in the process.
433 434 """
434 435 if op is None:
435 436 if transactiongetter is None:
436 437 transactiongetter = _notransaction
437 438 op = bundleoperation(repo, transactiongetter)
438 439 # todo:
439 440 # - replace this with an init function soon.
440 441 # - exception catching
441 442 unbundler.params
442 443 if repo.ui.debugflag:
443 444 msg = ['bundle2-input-bundle:']
444 445 if unbundler.params:
445 446 msg.append(' %i params' % len(unbundler.params))
446 447 if op._gettransaction is None or op._gettransaction is _notransaction:
447 448 msg.append(' no-transaction')
448 449 else:
449 450 msg.append(' with-transaction')
450 451 msg.append('\n')
451 452 repo.ui.debug(''.join(msg))
452 453
453 454 processparts(repo, op, unbundler)
454 455
455 456 return op
456 457
457 458 def processparts(repo, op, unbundler):
458 459 with partiterator(repo, op, unbundler) as parts:
459 460 for part in parts:
460 461 _processpart(op, part)
461 462
462 463 def _processchangegroup(op, cg, tr, source, url, **kwargs):
463 464 ret = cg.apply(op.repo, tr, source, url, **kwargs)
464 465 op.records.add('changegroup', {
465 466 'return': ret,
466 467 })
467 468 return ret
468 469
469 470 def _gethandler(op, part):
470 471 status = 'unknown' # used by debug output
471 472 try:
472 473 handler = parthandlermapping.get(part.type)
473 474 if handler is None:
474 475 status = 'unsupported-type'
475 476 raise error.BundleUnknownFeatureError(parttype=part.type)
476 477 indebug(op.ui, 'found a handler for part %s' % part.type)
477 478 unknownparams = part.mandatorykeys - handler.params
478 479 if unknownparams:
479 480 unknownparams = list(unknownparams)
480 481 unknownparams.sort()
481 482 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
482 483 raise error.BundleUnknownFeatureError(parttype=part.type,
483 484 params=unknownparams)
484 485 status = 'supported'
485 486 except error.BundleUnknownFeatureError as exc:
486 487 if part.mandatory: # mandatory parts
487 488 raise
488 489 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
489 490 return # skip to part processing
490 491 finally:
491 492 if op.ui.debugflag:
492 493 msg = ['bundle2-input-part: "%s"' % part.type]
493 494 if not part.mandatory:
494 495 msg.append(' (advisory)')
495 496 nbmp = len(part.mandatorykeys)
496 497 nbap = len(part.params) - nbmp
497 498 if nbmp or nbap:
498 499 msg.append(' (params:')
499 500 if nbmp:
500 501 msg.append(' %i mandatory' % nbmp)
501 502 if nbap:
502 503 msg.append(' %i advisory' % nbap)
503 504 msg.append(')')
504 505 msg.append(' %s\n' % status)
505 506 op.ui.debug(''.join(msg))
506 507
507 508 return handler
508 509
509 510 def _processpart(op, part):
510 511 """process a single part from a bundle
511 512
512 513 The part is guaranteed to have been fully consumed when the function exits
513 514 (even if an exception is raised)."""
514 515 handler = _gethandler(op, part)
515 516 if handler is None:
516 517 return
517 518
518 519 # handler is called outside the above try block so that we don't
519 520 # risk catching KeyErrors from anything other than the
520 521 # parthandlermapping lookup (any KeyError raised by handler()
521 522 # itself represents a defect of a different variety).
522 523 output = None
523 524 if op.captureoutput and op.reply is not None:
524 525 op.ui.pushbuffer(error=True, subproc=True)
525 526 output = ''
526 527 try:
527 528 handler(op, part)
528 529 finally:
529 530 if output is not None:
530 531 output = op.ui.popbuffer()
531 532 if output:
532 533 outpart = op.reply.newpart('output', data=output,
533 534 mandatory=False)
534 535 outpart.addparam(
535 536 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
536 537
537 538 def decodecaps(blob):
538 539 """decode a bundle2 caps bytes blob into a dictionary
539 540
540 541 The blob is a list of capabilities (one per line)
541 542 Capabilities may have values using a line of the form::
542 543
543 544 capability=value1,value2,value3
544 545
545 546 The values are always a list."""
546 547 caps = {}
547 548 for line in blob.splitlines():
548 549 if not line:
549 550 continue
550 551 if '=' not in line:
551 552 key, vals = line, ()
552 553 else:
553 554 key, vals = line.split('=', 1)
554 555 vals = vals.split(',')
555 556 key = urlreq.unquote(key)
556 557 vals = [urlreq.unquote(v) for v in vals]
557 558 caps[key] = vals
558 559 return caps
559 560
560 561 def encodecaps(caps):
561 562 """encode a bundle2 caps dictionary into a bytes blob"""
562 563 chunks = []
563 564 for ca in sorted(caps):
564 565 vals = caps[ca]
565 566 ca = urlreq.quote(ca)
566 567 vals = [urlreq.quote(v) for v in vals]
567 568 if vals:
568 569 ca = "%s=%s" % (ca, ','.join(vals))
569 570 chunks.append(ca)
570 571 return '\n'.join(chunks)
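The caps blob format handled by `decodecaps`/`encodecaps` round-trips; here is a standalone sketch using stdlib quoting in place of `util.urlreq` (and Python 3 strings instead of the module's bytes):

```python
from urllib.parse import quote, unquote

def encodecaps_sketch(caps):
    # one capability per line; values joined with commas, everything urlquoted
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps_sketch(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps_sketch(caps)
```

A capability without values (like `HG20` here) is emitted as a bare line and decodes back to an empty list, matching the "values are always a list" note in `decodecaps`.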
571 572
572 573 bundletypes = {
573 574 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
574 575 # since the unification ssh accepts a header but there
575 576 # is no capability signaling it.
576 577 "HG20": (), # special-cased below
577 578 "HG10UN": ("HG10UN", 'UN'),
578 579 "HG10BZ": ("HG10", 'BZ'),
579 580 "HG10GZ": ("HG10GZ", 'GZ'),
580 581 }
581 582
582 583 # hgweb uses this list to communicate its preferred type
583 584 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
584 585
585 586 class bundle20(object):
586 587 """represent an outgoing bundle2 container
587 588
588 589 Use the `addparam` method to add stream level parameters, and `newpart` to
589 590 populate it with parts. Then call `getchunks` to retrieve all the binary
590 591 chunks of data that compose the bundle2 container."""
591 592
592 593 _magicstring = 'HG20'
593 594
594 595 def __init__(self, ui, capabilities=()):
595 596 self.ui = ui
596 597 self._params = []
597 598 self._parts = []
598 599 self.capabilities = dict(capabilities)
599 600 self._compengine = util.compengines.forbundletype('UN')
600 601 self._compopts = None
601 602
602 603 def setcompression(self, alg, compopts=None):
603 604 """setup core part compression to <alg>"""
604 605 if alg in (None, 'UN'):
605 606 return
606 607 assert not any(n.lower() == 'compression' for n, v in self._params)
607 608 self.addparam('Compression', alg)
608 609 self._compengine = util.compengines.forbundletype(alg)
609 610 self._compopts = compopts
610 611
611 612 @property
612 613 def nbparts(self):
613 614 """total number of parts added to the bundler"""
614 615 return len(self._parts)
615 616
616 617 # methods used to define the bundle2 content
617 618 def addparam(self, name, value=None):
618 619 """add a stream level parameter"""
619 620 if not name:
620 621 raise ValueError(r'empty parameter name')
621 622 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
622 623 raise ValueError(r'non letter first character: %s' % name)
623 624 self._params.append((name, value))
624 625
625 626 def addpart(self, part):
626 627 """add a new part to the bundle2 container
627 628
628 629 Parts contains the actual applicative payload."""
629 630 assert part.id is None
630 631 part.id = len(self._parts) # very cheap counter
631 632 self._parts.append(part)
632 633
633 634 def newpart(self, typeid, *args, **kwargs):
634 635 """create a new part and add it to the containers
635 636
636 637 The part is directly added to the container. For now, this means
637 638 that any failure to properly initialize the part after calling
638 639 ``newpart`` should result in a failure of the whole bundling process.
639 640 
640 641 You can still fall back to manually creating and adding the part if you
641 642 need better control."""
642 643 part = bundlepart(typeid, *args, **kwargs)
643 644 self.addpart(part)
644 645 return part
645 646
646 647 # methods used to generate the bundle2 stream
647 648 def getchunks(self):
648 649 if self.ui.debugflag:
649 650 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
650 651 if self._params:
651 652 msg.append(' (%i params)' % len(self._params))
652 653 msg.append(' %i parts total\n' % len(self._parts))
653 654 self.ui.debug(''.join(msg))
654 655 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
655 656 yield self._magicstring
656 657 param = self._paramchunk()
657 658 outdebug(self.ui, 'bundle parameter: %s' % param)
658 659 yield _pack(_fstreamparamsize, len(param))
659 660 if param:
660 661 yield param
661 662 for chunk in self._compengine.compressstream(self._getcorechunk(),
662 663 self._compopts):
663 664 yield chunk
664 665
665 666 def _paramchunk(self):
666 667 """return a encoded version of all stream parameters"""
667 668 blocks = []
668 669 for par, value in self._params:
669 670 par = urlreq.quote(par)
670 671 if value is not None:
671 672 value = urlreq.quote(value)
672 673 par = '%s=%s' % (par, value)
673 674 blocks.append(par)
674 675 return ' '.join(blocks)
675 676
676 677 def _getcorechunk(self):
677 678 """yield chunk for the core part of the bundle
678 679
679 680 (all but headers and parameters)"""
680 681 outdebug(self.ui, 'start of parts')
681 682 for part in self._parts:
682 683 outdebug(self.ui, 'bundle part: "%s"' % part.type)
683 684 for chunk in part.getchunks(ui=self.ui):
684 685 yield chunk
685 686 outdebug(self.ui, 'end of bundle')
686 687 yield _pack(_fpartheadersize, 0)
687 688
688 689
689 690 def salvageoutput(self):
690 691 """return a list with a copy of all output parts in the bundle
691 692
692 693 This is meant to be used during error handling to make sure we preserve
693 694 server output"""
694 695 salvaged = []
695 696 for part in self._parts:
696 697 if part.type.startswith('output'):
697 698 salvaged.append(part.copy())
698 699 return salvaged
699 700
700 701
701 702 class unpackermixin(object):
702 703 """A mixin to extract bytes and struct data from a stream"""
703 704
704 705 def __init__(self, fp):
705 706 self._fp = fp
706 707
707 708 def _unpack(self, format):
708 709 """unpack this struct format from the stream
709 710
710 711 This method is meant for internal usage by the bundle2 protocol only.
711 712 It directly manipulates the low-level stream, including bundle2-level
712 713 instructions.
713 714
714 715 Do not use it to implement higher-level logic or methods."""
715 716 data = self._readexact(struct.calcsize(format))
716 717 return _unpack(format, data)
717 718
718 719 def _readexact(self, size):
719 720 """read exactly <size> bytes from the stream
720 721
721 722 This method is meant for internal usage by the bundle2 protocol only.
722 723 It directly manipulates the low-level stream, including bundle2-level
723 724 instructions.
724 725
725 726 Do not use it to implement higher-level logic or methods."""
726 727 return changegroup.readexactly(self._fp, size)
727 728
728 729 def getunbundler(ui, fp, magicstring=None):
729 730 """return a valid unbundler object for a given magicstring"""
730 731 if magicstring is None:
731 732 magicstring = changegroup.readexactly(fp, 4)
732 733 magic, version = magicstring[0:2], magicstring[2:4]
733 734 if magic != 'HG':
734 735 ui.debug(
735 736 "error: invalid magic: %r (version %r), should be 'HG'\n"
736 737 % (magic, version))
737 738 raise error.Abort(_('not a Mercurial bundle'))
738 739 unbundlerclass = formatmap.get(version)
739 740 if unbundlerclass is None:
740 741 raise error.Abort(_('unknown bundle version %s') % version)
741 742 unbundler = unbundlerclass(ui, fp)
742 743 indebug(ui, 'start processing of %s stream' % magicstring)
743 744 return unbundler
744 745
745 746 class unbundle20(unpackermixin):
746 747 """interpret a bundle2 stream
747 748
748 749 This class is fed with a binary stream and yields parts through its
749 750 `iterparts` method."""
750 751
751 752 _magicstring = 'HG20'
752 753
753 754 def __init__(self, ui, fp):
754 755 """If header is specified, we do not read it out of the stream."""
755 756 self.ui = ui
756 757 self._compengine = util.compengines.forbundletype('UN')
757 758 self._compressed = None
758 759 super(unbundle20, self).__init__(fp)
759 760
760 761 @util.propertycache
761 762 def params(self):
762 763 """dictionary of stream level parameters"""
763 764 indebug(self.ui, 'reading bundle2 stream parameters')
764 765 params = {}
765 766 paramssize = self._unpack(_fstreamparamsize)[0]
766 767 if paramssize < 0:
767 768 raise error.BundleValueError('negative bundle param size: %i'
768 769 % paramssize)
769 770 if paramssize:
770 771 params = self._readexact(paramssize)
771 772 params = self._processallparams(params)
772 773 return params
773 774
774 775 def _processallparams(self, paramsblock):
775 776 """"""
776 777 params = util.sortdict()
777 778 for p in paramsblock.split(' '):
778 779 p = p.split('=', 1)
779 780 p = [urlreq.unquote(i) for i in p]
780 781 if len(p) < 2:
781 782 p.append(None)
782 783 self._processparam(*p)
783 784 params[p[0]] = p[1]
784 785 return params
785 786
786 787
787 788 def _processparam(self, name, value):
788 789 """process a parameter, applying its effect if needed
789 790
790 791 Parameters starting with a lower case letter are advisory and will be
791 792 ignored when unknown. Those starting with an upper case letter are
792 793 mandatory, and this function will raise a KeyError when they are unknown.
793 794 
794 795 Note: no options are currently supported. Any input will either be
795 796 ignored or will fail.
796 797 """
797 798 if not name:
798 799 raise ValueError(r'empty parameter name')
799 800 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
800 801 raise ValueError(r'non letter first character: %s' % name)
801 802 try:
802 803 handler = b2streamparamsmap[name.lower()]
803 804 except KeyError:
804 805 if name[0:1].islower():
805 806 indebug(self.ui, "ignoring unknown parameter %s" % name)
806 807 else:
807 808 raise error.BundleUnknownFeatureError(params=(name,))
808 809 else:
809 810 handler(self, name, value)
810 811
811 812 def _forwardchunks(self):
812 813 """utility to transfer a bundle2 as binary
813 814
814 815 This is made necessary by the fact that the 'getbundle' command over 'ssh'
815 816 has no way to know when the reply ends, relying on the bundle being
816 817 interpreted to know its end. This is terrible and we are sorry, but we
817 818 needed to move forward to get general delta enabled.
818 819 """
819 820 yield self._magicstring
820 821 assert 'params' not in vars(self)
821 822 paramssize = self._unpack(_fstreamparamsize)[0]
822 823 if paramssize < 0:
823 824 raise error.BundleValueError('negative bundle param size: %i'
824 825 % paramssize)
825 826 yield _pack(_fstreamparamsize, paramssize)
826 827 if paramssize:
827 828 params = self._readexact(paramssize)
828 829 self._processallparams(params)
829 830 yield params
830 831 assert self._compengine.bundletype == 'UN'
831 832 # From there, payload might need to be decompressed
832 833 self._fp = self._compengine.decompressorreader(self._fp)
833 834 emptycount = 0
834 835 while emptycount < 2:
835 836 # so we can brainlessly loop
836 837 assert _fpartheadersize == _fpayloadsize
837 838 size = self._unpack(_fpartheadersize)[0]
838 839 yield _pack(_fpartheadersize, size)
839 840 if size:
840 841 emptycount = 0
841 842 else:
842 843 emptycount += 1
843 844 continue
844 845 if size == flaginterrupt:
845 846 continue
846 847 elif size < 0:
847 848 raise error.BundleValueError('negative chunk size: %i' % size)
848 849 yield self._readexact(size)
849 850
850 851
851 852 def iterparts(self, seekable=False):
852 853 """yield all parts contained in the stream"""
853 854 cls = seekableunbundlepart if seekable else unbundlepart
854 855 # make sure params have been loaded
855 856 self.params
856 857 # From there, the payload needs to be decompressed
857 858 self._fp = self._compengine.decompressorreader(self._fp)
858 859 indebug(self.ui, 'start extraction of bundle2 parts')
859 860 headerblock = self._readpartheader()
860 861 while headerblock is not None:
861 862 part = cls(self.ui, headerblock, self._fp)
862 863 yield part
863 864 # Ensure part is fully consumed so we can start reading the next
864 865 # part.
865 866 part.consume()
866 867
867 868 headerblock = self._readpartheader()
868 869 indebug(self.ui, 'end of bundle2 stream')
869 870
870 871 def _readpartheader(self):
871 872 """reads a part header size and return the bytes blob
872 873
873 874 returns None if empty"""
874 875 headersize = self._unpack(_fpartheadersize)[0]
875 876 if headersize < 0:
876 877 raise error.BundleValueError('negative part header size: %i'
877 878 % headersize)
878 879 indebug(self.ui, 'part header size: %i' % headersize)
879 880 if headersize:
880 881 return self._readexact(headersize)
881 882 return None
882 883
883 884 def compressed(self):
884 885 self.params # load params
885 886 return self._compressed
886 887
887 888 def close(self):
888 889 """close underlying file"""
889 890 if util.safehasattr(self._fp, 'close'):
890 891 return self._fp.close()
891 892
892 893 formatmap = {'20': unbundle20}
893 894
894 895 b2streamparamsmap = {}
895 896
896 897 def b2streamparamhandler(name):
897 898 """register a handler for a stream level parameter"""
898 899 def decorator(func):
899 900 assert name not in b2streamparamsmap
900 901 b2streamparamsmap[name] = func
901 902 return func
902 903 return decorator
903 904
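The registration pattern used by `b2streamparamhandler` below can be sketched standalone (the `handlers` dict and `handlecompression` function here are illustrative, not part of this module):

```python
# Minimal sketch of the decorator-registry pattern: a decorator factory
# stores the decorated function in a dict keyed by parameter name.
handlers = {}

def paramhandler(name):
    def decorator(func):
        assert name not in handlers, 'duplicate handler: %s' % name
        handlers[name] = func
        return func
    return decorator

@paramhandler('compression')
def handlecompression(unbundler, param, value):
    # a real handler would install a decompressor on the unbundler;
    # here we just echo what we were given
    return (param, value)

result = handlers['compression'](None, 'compression', 'BZ')
```

Looking up `handlers['compression']` then dispatches to the registered function, which is how `_processallparams` routes stream-level parameters.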
904 905 @b2streamparamhandler('compression')
905 906 def processcompression(unbundler, param, value):
906 907 """read compression parameter and install payload decompression"""
907 908 if value not in util.compengines.supportedbundletypes:
908 909 raise error.BundleUnknownFeatureError(params=(param,),
909 910 values=(value,))
910 911 unbundler._compengine = util.compengines.forbundletype(value)
911 912 if value is not None:
912 913 unbundler._compressed = True
913 914
914 915 class bundlepart(object):
915 916 """A bundle2 part contains application level payload
916 917
917 918 The part `type` is used to route the part to the application level
918 919 handler.
919 920
920 921 The part payload is contained in ``part.data``. It could be raw bytes or a
921 922 generator of byte chunks.
922 923
923 924 You can add parameters to the part using the ``addparam`` method.
924 925 Parameters can be either mandatory (default) or advisory. Remote side
925 926 should be able to safely ignore the advisory ones.
926 927
927 928 Both data and parameters cannot be modified after the generation has begun.
928 929 """
929 930
930 931 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
931 932 data='', mandatory=True):
932 933 validateparttype(parttype)
933 934 self.id = None
934 935 self.type = parttype
935 936 self._data = data
936 937 self._mandatoryparams = list(mandatoryparams)
937 938 self._advisoryparams = list(advisoryparams)
938 939 # checking for duplicated entries
939 940 self._seenparams = set()
940 941 for pname, __ in self._mandatoryparams + self._advisoryparams:
941 942 if pname in self._seenparams:
942 943 raise error.ProgrammingError('duplicated params: %s' % pname)
943 944 self._seenparams.add(pname)
944 945 # status of the part's generation:
945 946 # - None: not started,
946 947 # - False: currently generated,
947 948 # - True: generation done.
948 949 self._generated = None
949 950 self.mandatory = mandatory
950 951
951 952 def __repr__(self):
952 953 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
953 954 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
954 955 % (cls, id(self), self.id, self.type, self.mandatory))
955 956
956 957 def copy(self):
957 958 """return a copy of the part
958 959
959 960 The new part has the very same content but no partid assigned yet.
960 961 Parts with generated data cannot be copied."""
961 962 assert not util.safehasattr(self.data, 'next')
962 963 return self.__class__(self.type, self._mandatoryparams,
963 964 self._advisoryparams, self._data, self.mandatory)
964 965
965 966 # methods used to define the part content
966 967 @property
967 968 def data(self):
968 969 return self._data
969 970
970 971 @data.setter
971 972 def data(self, data):
972 973 if self._generated is not None:
973 974 raise error.ReadOnlyPartError('part is being generated')
974 975 self._data = data
975 976
976 977 @property
977 978 def mandatoryparams(self):
978 979 # make it an immutable tuple to force people through ``addparam``
979 980 return tuple(self._mandatoryparams)
980 981
981 982 @property
982 983 def advisoryparams(self):
983 984 # make it an immutable tuple to force people through ``addparam``
984 985 return tuple(self._advisoryparams)
985 986
986 987 def addparam(self, name, value='', mandatory=True):
987 988 """add a parameter to the part
988 989
989 990 If 'mandatory' is set to True, the remote handler must claim support
990 991 for this parameter or the unbundling will be aborted.
991 992
992 993 The 'name' and 'value' cannot exceed 255 bytes each.
993 994 """
994 995 if self._generated is not None:
995 996 raise error.ReadOnlyPartError('part is being generated')
996 997 if name in self._seenparams:
997 998 raise ValueError('duplicated params: %s' % name)
998 999 self._seenparams.add(name)
999 1000 params = self._advisoryparams
1000 1001 if mandatory:
1001 1002 params = self._mandatoryparams
1002 1003 params.append((name, value))
1003 1004
1004 1005 # methods used to generate the bundle2 stream
1005 1006 def getchunks(self, ui):
1006 1007 if self._generated is not None:
1007 1008 raise error.ProgrammingError('part can only be consumed once')
1008 1009 self._generated = False
1009 1010
1010 1011 if ui.debugflag:
1011 1012 msg = ['bundle2-output-part: "%s"' % self.type]
1012 1013 if not self.mandatory:
1013 1014 msg.append(' (advisory)')
1014 1015 nbmp = len(self.mandatoryparams)
1015 1016 nbap = len(self.advisoryparams)
1016 1017 if nbmp or nbap:
1017 1018 msg.append(' (params:')
1018 1019 if nbmp:
1019 1020 msg.append(' %i mandatory' % nbmp)
1020 1021 if nbap:
1021 1022 msg.append(' %i advisory' % nbap)
1022 1023 msg.append(')')
1023 1024 if not self.data:
1024 1025 msg.append(' empty payload')
1025 1026 elif (util.safehasattr(self.data, 'next')
1026 1027 or util.safehasattr(self.data, '__next__')):
1027 1028 msg.append(' streamed payload')
1028 1029 else:
1029 1030 msg.append(' %i bytes payload' % len(self.data))
1030 1031 msg.append('\n')
1031 1032 ui.debug(''.join(msg))
1032 1033
1033 1034 #### header
1034 1035 if self.mandatory:
1035 1036 parttype = self.type.upper()
1036 1037 else:
1037 1038 parttype = self.type.lower()
1038 1039 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1039 1040 ## parttype
1040 1041 header = [_pack(_fparttypesize, len(parttype)),
1041 1042 parttype, _pack(_fpartid, self.id),
1042 1043 ]
1043 1044 ## parameters
1044 1045 # count
1045 1046 manpar = self.mandatoryparams
1046 1047 advpar = self.advisoryparams
1047 1048 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1048 1049 # size
1049 1050 parsizes = []
1050 1051 for key, value in manpar:
1051 1052 parsizes.append(len(key))
1052 1053 parsizes.append(len(value))
1053 1054 for key, value in advpar:
1054 1055 parsizes.append(len(key))
1055 1056 parsizes.append(len(value))
1056 1057 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1057 1058 header.append(paramsizes)
1058 1059 # key, value
1059 1060 for key, value in manpar:
1060 1061 header.append(key)
1061 1062 header.append(value)
1062 1063 for key, value in advpar:
1063 1064 header.append(key)
1064 1065 header.append(value)
1065 1066 ## finalize header
1066 1067 try:
1067 1068 headerchunk = ''.join(header)
1068 1069 except TypeError:
1069 1070 raise TypeError(r'Found a non-bytes trying to '
1070 1071 r'build bundle part header: %r' % header)
1071 1072 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1072 1073 yield _pack(_fpartheadersize, len(headerchunk))
1073 1074 yield headerchunk
1074 1075 ## payload
1075 1076 try:
1076 1077 for chunk in self._payloadchunks():
1077 1078 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1078 1079 yield _pack(_fpayloadsize, len(chunk))
1079 1080 yield chunk
1080 1081 except GeneratorExit:
1081 1082 # GeneratorExit means that nobody is listening for our
1082 1083 # results anyway, so just bail quickly rather than trying
1083 1084 # to produce an error part.
1084 1085 ui.debug('bundle2-generatorexit\n')
1085 1086 raise
1086 1087 except BaseException as exc:
1087 1088 bexc = util.forcebytestr(exc)
1088 1089 # backup exception data for later
1089 1090 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1090 1091 % bexc)
1091 1092 tb = sys.exc_info()[2]
1092 1093 msg = 'unexpected error: %s' % bexc
1093 1094 interpart = bundlepart('error:abort', [('message', msg)],
1094 1095 mandatory=False)
1095 1096 interpart.id = 0
1096 1097 yield _pack(_fpayloadsize, -1)
1097 1098 for chunk in interpart.getchunks(ui=ui):
1098 1099 yield chunk
1099 1100 outdebug(ui, 'closing payload chunk')
1100 1101 # abort current part payload
1101 1102 yield _pack(_fpayloadsize, 0)
1102 1103 pycompat.raisewithtb(exc, tb)
1103 1104 # end of payload
1104 1105 outdebug(ui, 'closing payload chunk')
1105 1106 yield _pack(_fpayloadsize, 0)
1106 1107 self._generated = True
1107 1108
1108 1109 def _payloadchunks(self):
1109 1110 """yield chunks of the part payload
1110 1111
1111 1112 Exists to handle the different methods to provide data to a part."""
1112 1113 # we only support fixed size data now.
1113 1114 # This will be improved in the future.
1114 1115 if (util.safehasattr(self.data, 'next')
1115 1116 or util.safehasattr(self.data, '__next__')):
1116 1117 buff = util.chunkbuffer(self.data)
1117 1118 chunk = buff.read(preferedchunksize)
1118 1119 while chunk:
1119 1120 yield chunk
1120 1121 chunk = buff.read(preferedchunksize)
1121 1122 elif len(self.data):
1122 1123 yield self.data
1123 1124
1124 1125
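As a rough illustration of the header layout assembled in `getchunks()` above, here is a standalone encoder, assuming the `>B`/`>I`/`>BB` struct formats this module uses for `_fparttypesize`, `_fpartid` and `_fpartparamcount` (the `encodepartheader` helper is hypothetical, and the mandatory-bit upper/lowercasing of the part type is omitted):

```python
import struct

def encodepartheader(parttype, partid, manpar, advpar):
    # uint8 type length, type bytes, uint32 part id,
    # two uint8 param counts, uint8 size pairs, then raw key/value bytes
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(manpar), len(advpar))]
    sizes = []
    for key, value in manpar + advpar:
        sizes.append(len(key))
        sizes.append(len(value))
    header.append(struct.pack('>' + 'B' * len(sizes), *sizes))
    for key, value in manpar + advpar:
        header.append(key)
        header.append(value)
    return b''.join(header)

blob = encodepartheader(b'changegroup', 0, [(b'version', b'02')], [])
```

On the wire the header blob itself is then preceded by an int32 size, which is what `_readpartheader()` on the consuming side reads first.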
1125 1126 flaginterrupt = -1
1126 1127
1127 1128 class interrupthandler(unpackermixin):
1128 1129 """read one part and process it with restricted capability
1129 1130
1130 1131 This allows transmitting exceptions raised on the producer side during part
1131 1132 iteration while the consumer is reading a part.
1132 1133
1133 1134 Parts processed in this manner only have access to a ui object."""
1134 1135
1135 1136 def __init__(self, ui, fp):
1136 1137 super(interrupthandler, self).__init__(fp)
1137 1138 self.ui = ui
1138 1139
1139 1140 def _readpartheader(self):
1140 1141 """reads a part header size and return the bytes blob
1141 1142
1142 1143 returns None if empty"""
1143 1144 headersize = self._unpack(_fpartheadersize)[0]
1144 1145 if headersize < 0:
1145 1146 raise error.BundleValueError('negative part header size: %i'
1146 1147 % headersize)
1147 1148 indebug(self.ui, 'part header size: %i\n' % headersize)
1148 1149 if headersize:
1149 1150 return self._readexact(headersize)
1150 1151 return None
1151 1152
1152 1153 def __call__(self):
1153 1154
1154 1155 self.ui.debug('bundle2-input-stream-interrupt:'
1155 1156 ' opening out of band context\n')
1156 1157 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1157 1158 headerblock = self._readpartheader()
1158 1159 if headerblock is None:
1159 1160 indebug(self.ui, 'no part found during interruption.')
1160 1161 return
1161 1162 part = unbundlepart(self.ui, headerblock, self._fp)
1162 1163 op = interruptoperation(self.ui)
1163 1164 hardabort = False
1164 1165 try:
1165 1166 _processpart(op, part)
1166 1167 except (SystemExit, KeyboardInterrupt):
1167 1168 hardabort = True
1168 1169 raise
1169 1170 finally:
1170 1171 if not hardabort:
1171 1172 part.consume()
1172 1173 self.ui.debug('bundle2-input-stream-interrupt:'
1173 1174 ' closing out of band context\n')
1174 1175
1175 1176 class interruptoperation(object):
1176 1177 """A limited operation to be used by part handlers during interruption
1177 1178
1178 1179 It only has access to a ui object.
1179 1180 """
1180 1181
1181 1182 def __init__(self, ui):
1182 1183 self.ui = ui
1183 1184 self.reply = None
1184 1185 self.captureoutput = False
1185 1186
1186 1187 @property
1187 1188 def repo(self):
1188 1189 raise error.ProgrammingError('no repo access from stream interruption')
1189 1190
1190 1191 def gettransaction(self):
1191 1192 raise TransactionUnavailable('no repo access from stream interruption')
1192 1193
1193 1194 def decodepayloadchunks(ui, fh):
1194 1195 """Reads bundle2 part payload data into chunks.
1195 1196
1196 1197 Part payload data consists of framed chunks. This function takes
1197 1198 a file handle and emits those chunks.
1198 1199 """
1199 1200 dolog = ui.configbool('devel', 'bundle2.debug')
1200 1201 debug = ui.debug
1201 1202
1202 1203 headerstruct = struct.Struct(_fpayloadsize)
1203 1204 headersize = headerstruct.size
1204 1205 unpack = headerstruct.unpack
1205 1206
1206 1207 readexactly = changegroup.readexactly
1207 1208 read = fh.read
1208 1209
1209 1210 chunksize = unpack(readexactly(fh, headersize))[0]
1210 1211 indebug(ui, 'payload chunk size: %i' % chunksize)
1211 1212
1212 1213 # changegroup.readexactly() is inlined below for performance.
1213 1214 while chunksize:
1214 1215 if chunksize >= 0:
1215 1216 s = read(chunksize)
1216 1217 if len(s) < chunksize:
1217 1218 raise error.Abort(_('stream ended unexpectedly '
1218 1219 ' (got %d bytes, expected %d)') %
1219 1220 (len(s), chunksize))
1220 1221
1221 1222 yield s
1222 1223 elif chunksize == flaginterrupt:
1223 1224 # Interrupt "signal" detected. The regular stream is interrupted
1224 1225 # and a bundle2 part follows. Consume it.
1225 1226 interrupthandler(ui, fh)()
1226 1227 else:
1227 1228 raise error.BundleValueError(
1228 1229 'negative payload chunk size: %s' % chunksize)
1229 1230
1230 1231 s = read(headersize)
1231 1232 if len(s) < headersize:
1232 1233 raise error.Abort(_('stream ended unexpectedly '
1233 1234 ' (got %d bytes, expected %d)') %
1234 1235 (len(s), headersize))
1235 1236
1236 1237 chunksize = unpack(s)[0]
1237 1238
1238 1239 # indebug() inlined for performance.
1239 1240 if dolog:
1240 1241 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1241 1242
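A minimal sketch of the payload framing consumed by `decodepayloadchunks()` above: each chunk is preceded by a signed int32 length, a zero length terminates the payload, and -1 flags an out-of-band interrupt part (interrupt handling is omitted here; `encodepayload` and `decodepayload` are illustrative helpers):

```python
import io
import struct

_fpayloadsize = '>i'  # signed big-endian int32, as in this module

def encodepayload(chunks):
    # frame each chunk with its int32 size; a zero-size frame terminates
    out = []
    for chunk in chunks:
        out.append(struct.pack(_fpayloadsize, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_fpayloadsize, 0))
    return b''.join(out)

def decodepayload(fh):
    # mirror of decodepayloadchunks() without interrupt handling
    while True:
        size = struct.unpack(_fpayloadsize, fh.read(4))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('negative payload chunk size: %s' % size)
        yield fh.read(size)

data = encodepayload([b'abc', b'defg'])
chunks = list(decodepayload(io.BytesIO(data)))
```

The signed format is what makes the -1 interrupt flag representable alongside ordinary positive chunk sizes.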
1242 1243 class unbundlepart(unpackermixin):
1243 1244 """a bundle part read from a bundle"""
1244 1245
1245 1246 def __init__(self, ui, header, fp):
1246 1247 super(unbundlepart, self).__init__(fp)
1247 1248 self._seekable = (util.safehasattr(fp, 'seek') and
1248 1249 util.safehasattr(fp, 'tell'))
1249 1250 self.ui = ui
1250 1251 # unbundle state attr
1251 1252 self._headerdata = header
1252 1253 self._headeroffset = 0
1253 1254 self._initialized = False
1254 1255 self.consumed = False
1255 1256 # part data
1256 1257 self.id = None
1257 1258 self.type = None
1258 1259 self.mandatoryparams = None
1259 1260 self.advisoryparams = None
1260 1261 self.params = None
1261 1262 self.mandatorykeys = ()
1262 1263 self._readheader()
1263 1264 self._mandatory = None
1264 1265 self._pos = 0
1265 1266
1266 1267 def _fromheader(self, size):
1267 1268 """return the next <size> bytes from the header"""
1268 1269 offset = self._headeroffset
1269 1270 data = self._headerdata[offset:(offset + size)]
1270 1271 self._headeroffset = offset + size
1271 1272 return data
1272 1273
1273 1274 def _unpackheader(self, format):
1274 1275 """read given format from header
1275 1276
1276 1277 This automatically computes the size of the format to read."""
1277 1278 data = self._fromheader(struct.calcsize(format))
1278 1279 return _unpack(format, data)
1279 1280
1280 1281 def _initparams(self, mandatoryparams, advisoryparams):
1281 1282 """internal function to setup all logic related parameters"""
1282 1283 # make it read only to prevent people touching it by mistake.
1283 1284 self.mandatoryparams = tuple(mandatoryparams)
1284 1285 self.advisoryparams = tuple(advisoryparams)
1285 1286 # user friendly UI
1286 1287 self.params = util.sortdict(self.mandatoryparams)
1287 1288 self.params.update(self.advisoryparams)
1288 1289 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1289 1290
1290 1291 def _readheader(self):
1291 1292 """read the header and setup the object"""
1292 1293 typesize = self._unpackheader(_fparttypesize)[0]
1293 1294 self.type = self._fromheader(typesize)
1294 1295 indebug(self.ui, 'part type: "%s"' % self.type)
1295 1296 self.id = self._unpackheader(_fpartid)[0]
1296 1297 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1297 1298 # extract mandatory bit from type
1298 1299 self.mandatory = (self.type != self.type.lower())
1299 1300 self.type = self.type.lower()
1300 1301 ## reading parameters
1301 1302 # param count
1302 1303 mancount, advcount = self._unpackheader(_fpartparamcount)
1303 1304 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1304 1305 # param size
1305 1306 fparamsizes = _makefpartparamsizes(mancount + advcount)
1306 1307 paramsizes = self._unpackheader(fparamsizes)
1307 1308 # make it a list of couple again
1308 1309 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1309 1310 # split mandatory from advisory
1310 1311 mansizes = paramsizes[:mancount]
1311 1312 advsizes = paramsizes[mancount:]
1312 1313 # retrieve param value
1313 1314 manparams = []
1314 1315 for key, value in mansizes:
1315 1316 manparams.append((self._fromheader(key), self._fromheader(value)))
1316 1317 advparams = []
1317 1318 for key, value in advsizes:
1318 1319 advparams.append((self._fromheader(key), self._fromheader(value)))
1319 1320 self._initparams(manparams, advparams)
1320 1321 ## part payload
1321 1322 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1322 1323 # we read the data, tell it
1323 1324 self._initialized = True
1324 1325
1325 1326 def _payloadchunks(self):
1326 1327 """Generator of decoded chunks in the payload."""
1327 1328 return decodepayloadchunks(self.ui, self._fp)
1328 1329
1329 1330 def consume(self):
1330 1331 """Read the part payload until completion.
1331 1332
1332 1333 By consuming the part data, the underlying stream read offset will
1333 1334 be advanced to the next part (or end of stream).
1334 1335 """
1335 1336 if self.consumed:
1336 1337 return
1337 1338
1338 1339 chunk = self.read(32768)
1339 1340 while chunk:
1340 1341 self._pos += len(chunk)
1341 1342 chunk = self.read(32768)
1342 1343
1343 1344 def read(self, size=None):
1344 1345 """read payload data"""
1345 1346 if not self._initialized:
1346 1347 self._readheader()
1347 1348 if size is None:
1348 1349 data = self._payloadstream.read()
1349 1350 else:
1350 1351 data = self._payloadstream.read(size)
1351 1352 self._pos += len(data)
1352 1353 if size is None or len(data) < size:
1353 1354 if not self.consumed and self._pos:
1354 1355 self.ui.debug('bundle2-input-part: total payload size %i\n'
1355 1356 % self._pos)
1356 1357 self.consumed = True
1357 1358 return data
1358 1359
1359 1360 class seekableunbundlepart(unbundlepart):
1360 1361 """A bundle2 part in a bundle that is seekable.
1361 1362
1362 1363 Regular ``unbundlepart`` instances can only be read once. This class
1363 1364 extends ``unbundlepart`` to enable bi-directional seeking within the
1364 1365 part.
1365 1366
1366 1367 Bundle2 part data consists of framed chunks. Offsets when seeking
1367 1368 refer to the decoded data, not the offsets in the underlying bundle2
1368 1369 stream.
1369 1370
1370 1371 To facilitate quickly seeking within the decoded data, instances of this
1371 1372 class maintain a mapping between offsets in the underlying stream and
1372 1373 the decoded payload. This mapping will consume memory in proportion
1373 1374 to the number of chunks within the payload (which almost certainly
1374 1375 increases in proportion with the size of the part).
1375 1376 """
1376 1377 def __init__(self, ui, header, fp):
1377 1378 # (payload, file) offsets for chunk starts.
1378 1379 self._chunkindex = []
1379 1380
1380 1381 super(seekableunbundlepart, self).__init__(ui, header, fp)
1381 1382
1382 1383 def _payloadchunks(self, chunknum=0):
1383 1384 '''seek to specified chunk and start yielding data'''
1384 1385 if len(self._chunkindex) == 0:
1385 1386 assert chunknum == 0, 'Must start with chunk 0'
1386 1387 self._chunkindex.append((0, self._tellfp()))
1387 1388 else:
1388 1389 assert chunknum < len(self._chunkindex), \
1389 1390 'Unknown chunk %d' % chunknum
1390 1391 self._seekfp(self._chunkindex[chunknum][1])
1391 1392
1392 1393 pos = self._chunkindex[chunknum][0]
1393 1394
1394 1395 for chunk in decodepayloadchunks(self.ui, self._fp):
1395 1396 chunknum += 1
1396 1397 pos += len(chunk)
1397 1398 if chunknum == len(self._chunkindex):
1398 1399 self._chunkindex.append((pos, self._tellfp()))
1399 1400
1400 1401 yield chunk
1401 1402
1402 1403 def _findchunk(self, pos):
1403 1404 '''for a given payload position, return a chunk number and offset'''
1404 1405 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1405 1406 if ppos == pos:
1406 1407 return chunk, 0
1407 1408 elif ppos > pos:
1408 1409 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1409 1410 raise ValueError('Unknown chunk')
1410 1411
1411 1412 def tell(self):
1412 1413 return self._pos
1413 1414
1414 1415 def seek(self, offset, whence=os.SEEK_SET):
1415 1416 if whence == os.SEEK_SET:
1416 1417 newpos = offset
1417 1418 elif whence == os.SEEK_CUR:
1418 1419 newpos = self._pos + offset
1419 1420 elif whence == os.SEEK_END:
1420 1421 if not self.consumed:
1421 1422 # Can't use self.consume() here because it advances self._pos.
1422 1423 chunk = self.read(32768)
1423 1424 while chunk:
1424 1425 chunk = self.read(32768)
1425 1426 newpos = self._chunkindex[-1][0] - offset
1426 1427 else:
1427 1428 raise ValueError('Unknown whence value: %r' % (whence,))
1428 1429
1429 1430 if newpos > self._chunkindex[-1][0] and not self.consumed:
1430 1431 # Can't use self.consume() here because it advances self._pos.
1431 1432 chunk = self.read(32768)
1432 1433 while chunk:
1433 1434 chunk = self.read(32768)
1434 1435
1435 1436 if not 0 <= newpos <= self._chunkindex[-1][0]:
1436 1437 raise ValueError('Offset out of range')
1437 1438
1438 1439 if self._pos != newpos:
1439 1440 chunk, internaloffset = self._findchunk(newpos)
1440 1441 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1441 1442 adjust = self.read(internaloffset)
1442 1443 if len(adjust) != internaloffset:
1443 1444 raise error.Abort(_('Seek failed\n'))
1444 1445 self._pos = newpos
1445 1446
1446 1447 def _seekfp(self, offset, whence=0):
1447 1448 """move the underlying file pointer
1448 1449
1449 1450 This method is meant for internal usage by the bundle2 protocol only.
1450 1451 It directly manipulates the low level stream, including bundle2 level
1451 1452 instructions.
1452 1453
1453 1454 Do not use it to implement higher-level logic or methods."""
1454 1455 if self._seekable:
1455 1456 return self._fp.seek(offset, whence)
1456 1457 else:
1457 1458 raise NotImplementedError(_('File pointer is not seekable'))
1458 1459
1459 1460 def _tellfp(self):
1460 1461 """return the file offset, or None if file is not seekable
1461 1462
1462 1463 This method is meant for internal usage by the bundle2 protocol only.
1463 1464 It directly manipulates the low level stream, including bundle2 level
1464 1465 instructions.
1465 1466
1466 1467 Do not use it to implement higher-level logic or methods."""
1467 1468 if self._seekable:
1468 1469 try:
1469 1470 return self._fp.tell()
1470 1471 except IOError as e:
1471 1472 if e.errno == errno.ESPIPE:
1472 1473 self._seekable = False
1473 1474 else:
1474 1475 raise
1475 1476 return None
1476 1477
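The `_findchunk()` lookup above, which maps a decoded payload position back to a chunk number and an offset inside that chunk, can be exercised with a standalone copy (the `findchunk` helper and sample index are illustrative):

```python
# Standalone copy of the _findchunk() logic. chunkindex holds
# (payload offset, file offset) pairs for each chunk start.
def findchunk(chunkindex, pos):
    '''for a given payload position, return a chunk number and offset'''
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# three chunks starting at payload offsets 0, 10 and 25
index = [(0, 100), (10, 120), (25, 150)]
```

Note that a position past the last recorded chunk start raises `ValueError`, which is why `seek()` first consumes the part to populate the index before seeking near the end.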
1477 1478 # These are only the static capabilities.
1478 1479 # Check the 'getrepocaps' function for the rest.
1479 1480 capabilities = {'HG20': (),
1480 1481 'bookmarks': (),
1481 1482 'error': ('abort', 'unsupportedcontent', 'pushraced',
1482 1483 'pushkey'),
1483 1484 'listkeys': (),
1484 1485 'pushkey': (),
1485 1486 'digests': tuple(sorted(util.DIGESTS.keys())),
1486 1487 'remote-changegroup': ('http', 'https'),
1487 1488 'hgtagsfnodes': (),
1488 1489 'phases': ('heads',),
1489 1490 }
1490 1491
1491 1492 def getrepocaps(repo, allowpushback=False):
1492 1493 """return the bundle2 capabilities for a given repo
1493 1494
1494 1495 Exists to allow extensions (like evolution) to mutate the capabilities.
1495 1496 """
1496 1497 caps = capabilities.copy()
1497 1498 caps['changegroup'] = tuple(sorted(
1498 1499 changegroup.supportedincomingversions(repo)))
1499 1500 if obsolete.isenabled(repo, obsolete.exchangeopt):
1500 1501 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1501 1502 caps['obsmarkers'] = supportedformat
1502 1503 if allowpushback:
1503 1504 caps['pushback'] = ()
1504 1505 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1505 1506 if cpmode == 'check-related':
1506 1507 caps['checkheads'] = ('related',)
1507 1508 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1508 1509 caps.pop('phases')
1509 1510 return caps
1510 1511
1511 1512 def bundle2caps(remote):
1512 1513 """return the bundle capabilities of a peer as dict"""
1513 1514 raw = remote.capable('bundle2')
1514 1515 if not raw and raw != '':
1515 1516 return {}
1516 1517 capsblob = urlreq.unquote(remote.capable('bundle2'))
1517 1518 return decodecaps(capsblob)
1518 1519
1519 1520 def obsmarkersversion(caps):
1520 1521 """extract the list of supported obsmarkers versions from a bundle2caps dict
1521 1522 """
1522 1523 obscaps = caps.get('obsmarkers', ())
1523 1524 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1524 1525
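The `'V%i'` capability encoding produced in `getrepocaps()` and parsed by `obsmarkersversion()` can be exercised standalone (this is a copy of the function above, shown with sample input):

```python
# Copy of obsmarkersversion(): the 'obsmarkers' capability advertises
# supported obsolescence-marker formats as 'V<n>' strings.
def obsmarkersversion(caps):
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion({'obsmarkers': ('V0', 'V1')})
```

Entries not matching the `V` prefix are simply ignored, and a missing capability yields an empty list.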
1525 1526 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1526 1527 vfs=None, compression=None, compopts=None):
1527 1528 if bundletype.startswith('HG10'):
1528 1529 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1529 1530 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1530 1531 compression=compression, compopts=compopts)
1531 1532 elif not bundletype.startswith('HG20'):
1532 1533 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1533 1534
1534 1535 caps = {}
1535 1536 if 'obsolescence' in opts:
1536 1537 caps['obsmarkers'] = ('V1',)
1537 1538 bundle = bundle20(ui, caps)
1538 1539 bundle.setcompression(compression, compopts)
1539 1540 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1540 1541 chunkiter = bundle.getchunks()
1541 1542
1542 1543 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1543 1544
1544 1545 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1545 1546 # We should eventually reconcile this logic with the one behind
1546 1547 # 'exchange.getbundle2partsgenerator'.
1547 1548 #
1548 1549 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1549 1550 # different right now. So we keep them separated for now for the sake of
1550 1551 # simplicity.
1551 1552
1552 1553 # we always want a changegroup in such bundle
1553 1554 cgversion = opts.get('cg.version')
1554 1555 if cgversion is None:
1555 1556 cgversion = changegroup.safeversion(repo)
1556 1557 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1557 1558 part = bundler.newpart('changegroup', data=cg.getchunks())
1558 1559 part.addparam('version', cg.version)
1559 1560 if 'clcount' in cg.extras:
1560 1561 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1561 1562 mandatory=False)
1562 1563 if opts.get('phases') and repo.revs('%ln and secret()',
1563 1564 outgoing.missingheads):
1564 1565 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1565 1566
1566 1567 addparttagsfnodescache(repo, bundler, outgoing)
1567 1568
1568 1569 if opts.get('obsolescence', False):
1569 1570 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1570 1571 buildobsmarkerspart(bundler, obsmarkers)
1571 1572
1572 1573 if opts.get('phases', False):
1573 1574 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1574 1575 phasedata = phases.binaryencode(headsbyphase)
1575 1576 bundler.newpart('phase-heads', data=phasedata)
1576 1577
1577 1578 def addparttagsfnodescache(repo, bundler, outgoing):
1578 1579 # we include the tags fnode cache for the bundle changeset
1579 1580 # (as an optional parts)
1580 1581 cache = tags.hgtagsfnodescache(repo.unfiltered())
1581 1582 chunks = []
1582 1583
1583 1584 # .hgtags fnodes are only relevant for head changesets. While we could
1584 1585 # transfer values for all known nodes, there will likely be little to
1585 1586 # no benefit.
1586 1587 #
1587 1588 # We don't bother using a generator to produce output data because
1588 1589 # a) we only have 40 bytes per head and even esoteric numbers of heads
1589 1590 # consume little memory (1M heads is 40MB) b) we don't want to send the
1590 1591 # part if we don't have entries and knowing if we have entries requires
1591 1592 # cache lookups.
1592 1593 for node in outgoing.missingheads:
1593 1594 # Don't compute missing, as this may slow down serving.
1594 1595 fnode = cache.getfnode(node, computemissing=False)
1595 1596 if fnode is not None:
1596 1597 chunks.extend([node, fnode])
1597 1598
1598 1599 if chunks:
1599 1600 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1600 1601
1601 1602 def buildobsmarkerspart(bundler, markers):
1602 1603 """add an obsmarker part to the bundler with <markers>
1603 1604
1604 1605 No part is created if markers is empty.
1605 1606 Raises ValueError if the bundler doesn't support any known obsmarker format.
1606 1607 """
1607 1608 if not markers:
1608 1609 return None
1609 1610
1610 1611 remoteversions = obsmarkersversion(bundler.capabilities)
1611 1612 version = obsolete.commonversion(remoteversions)
1612 1613 if version is None:
1613 1614 raise ValueError('bundler does not support common obsmarker format')
1614 1615 stream = obsolete.encodemarkers(markers, True, version=version)
1615 1616 return bundler.newpart('obsmarkers', data=stream)
1616 1617
1617 1618 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1618 1619 compopts=None):
1619 1620 """Write a bundle file and return its filename.
1620 1621
1621 1622 Existing files will not be overwritten.
1622 1623 If no filename is specified, a temporary file is created.
1623 1624 bz2 compression can be turned off.
1624 1625 The bundle file will be deleted in case of errors.
1625 1626 """
1626 1627
1627 1628 if bundletype == "HG20":
1628 1629 bundle = bundle20(ui)
1629 1630 bundle.setcompression(compression, compopts)
1630 1631 part = bundle.newpart('changegroup', data=cg.getchunks())
1631 1632 part.addparam('version', cg.version)
1632 1633 if 'clcount' in cg.extras:
1633 1634 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1634 1635 mandatory=False)
1635 1636 chunkiter = bundle.getchunks()
1636 1637 else:
1637 1638 # compression argument is only for the bundle2 case
1638 1639 assert compression is None
1639 1640 if cg.version != '01':
1640 1641 raise error.Abort(_('old bundle types only support v1 '
1641 1642 'changegroups'))
1642 1643 header, comp = bundletypes[bundletype]
1643 1644 if comp not in util.compengines.supportedbundletypes:
1644 1645 raise error.Abort(_('unknown stream compression type: %s')
1645 1646 % comp)
1646 1647 compengine = util.compengines.forbundletype(comp)
1647 1648 def chunkiter():
1648 1649 yield header
1649 1650 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1650 1651 yield chunk
1651 1652 chunkiter = chunkiter()
1652 1653
1653 1654 # parse the changegroup data, otherwise we will block
1654 1655 # in case of sshrepo because we don't know the end of the stream
1655 1656 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1656 1657
1657 1658 def combinechangegroupresults(op):
1658 1659 """logic to combine 0 or more addchangegroup results into one"""
1659 1660 results = [r.get('return', 0)
1660 1661 for r in op.records['changegroup']]
1661 1662 changedheads = 0
1662 1663 result = 1
1663 1664 for ret in results:
1664 1665 # If any changegroup result is 0, return 0
1665 1666 if ret == 0:
1666 1667 result = 0
1667 1668 break
1668 1669 if ret < -1:
1669 1670 changedheads += ret + 1
1670 1671 elif ret > 1:
1671 1672 changedheads += ret - 1
1672 1673 if changedheads > 0:
1673 1674 result = 1 + changedheads
1674 1675 elif changedheads < 0:
1675 1676 result = -1 + changedheads
1676 1677 return result
1677 1678
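The head-delta encoding combined above can be copied standalone: each `addchangegroup` result encodes `ret - 1` added heads when `ret > 1`, `ret + 1` removed heads when `ret < -1`, and 0 anywhere means failure (`combineresults` is an illustrative copy of the logic in `combinechangegroupresults`):

```python
def combineresults(results):
    # combine 0 or more addchangegroup return values into one
    changedheads = 0
    result = 1
    for ret in results:
        # if any changegroup result is 0, the combined result is 0
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
```

So two changegroups adding two and one heads respectively combine to a single "three heads added" result, while an empty list of results combines to the neutral value 1.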
1678 1679 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1679 1680 'targetphase'))
1680 1681 def handlechangegroup(op, inpart):
1681 1682 """apply a changegroup part on the repo
1682 1683
1683 1684 This is a very early implementation that will see massive rework
1684 1685 before being inflicted on any end-user.
1685 1686 """
1686 1687 tr = op.gettransaction()
1687 1688 unpackerversion = inpart.params.get('version', '01')
1688 1689 # We should raise an appropriate exception here
1689 1690 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1690 1691 # the source and url passed here are overwritten by the one contained in
1691 1692 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1692 1693 nbchangesets = None
1693 1694 if 'nbchanges' in inpart.params:
1694 1695 nbchangesets = int(inpart.params.get('nbchanges'))
1695 1696 if ('treemanifest' in inpart.params and
1696 1697 'treemanifest' not in op.repo.requirements):
1697 1698 if len(op.repo.changelog) != 0:
1698 1699 raise error.Abort(_(
1699 1700 "bundle contains tree manifests, but local repo is "
1700 1701 "non-empty and does not use tree manifests"))
1701 1702 op.repo.requirements.add('treemanifest')
1702 1703 op.repo._applyopenerreqs()
1703 1704 op.repo._writerequirements()
1704 1705 extrakwargs = {}
1705 1706 targetphase = inpart.params.get('targetphase')
1706 1707 if targetphase is not None:
1707 1708 extrakwargs['targetphase'] = int(targetphase)
1708 1709 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1709 1710 expectedtotal=nbchangesets, **extrakwargs)
1710 1711 if op.reply is not None:
1711 1712 # This is definitely not the final form of this
1712 1713 # return. But one needs to start somewhere.
1713 1714 part = op.reply.newpart('reply:changegroup', mandatory=False)
1714 1715 part.addparam(
1715 1716 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1716 1717 part.addparam('return', '%i' % ret, mandatory=False)
1717 1718 assert not inpart.read()
1718 1719
1719 1720 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1720 1721 ['digest:%s' % k for k in util.DIGESTS.keys()])
1721 1722 @parthandler('remote-changegroup', _remotechangegroupparams)
1722 1723 def handleremotechangegroup(op, inpart):
1723 1724 """apply a bundle10 on the repo, given an url and validation information
1724 1725
1725 1726 All the information about the remote bundle to import is given as
1726 1727 parameters. The parameters include:
1727 1728 - url: the url to the bundle10.
1728 1729 - size: the bundle10 file size. It is used to validate that what was
1729 1730 retrieved by the client matches the server's knowledge about the bundle.
1730 1731 - digests: a space separated list of the digest types provided as
1731 1732 parameters.
1732 1733 - digest:<digest-type>: the hexadecimal representation of the digest with
1733 1734 that name. Like the size, it is used to validate that what was
1734 1735 retrieved by the client matches what the server knows about the bundle.
1735 1736
1736 1737 When multiple digest types are given, all of them are checked.
1737 1738 """
1738 1739 try:
1739 1740 raw_url = inpart.params['url']
1740 1741 except KeyError:
1741 1742 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1742 1743 parsed_url = util.url(raw_url)
1743 1744 if parsed_url.scheme not in capabilities['remote-changegroup']:
1744 1745 raise error.Abort(_('remote-changegroup does not support %s urls') %
1745 1746 parsed_url.scheme)
1746 1747
1747 1748 try:
1748 1749 size = int(inpart.params['size'])
1749 1750 except ValueError:
1750 1751 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1751 1752 % 'size')
1752 1753 except KeyError:
1753 1754 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1754 1755
1755 1756 digests = {}
1756 1757 for typ in inpart.params.get('digests', '').split():
1757 1758 param = 'digest:%s' % typ
1758 1759 try:
1759 1760 value = inpart.params[param]
1760 1761 except KeyError:
1761 1762 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1762 1763 param)
1763 1764 digests[typ] = value
1764 1765
1765 1766 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1766 1767
1767 1768 tr = op.gettransaction()
1768 1769 from . import exchange
1769 1770 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1770 1771 if not isinstance(cg, changegroup.cg1unpacker):
1771 1772 raise error.Abort(_('%s: not a bundle version 1.0') %
1772 1773 util.hidepassword(raw_url))
1773 1774 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1774 1775 if op.reply is not None:
1775 1776 # This is definitely not the final form of this
1776 1777 # return. But one needs to start somewhere.
1777 1778 part = op.reply.newpart('reply:changegroup')
1778 1779 part.addparam(
1779 1780 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1780 1781 part.addparam('return', '%i' % ret, mandatory=False)
1781 1782 try:
1782 1783 real_part.validate()
1783 1784 except error.Abort as e:
1784 1785 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1785 1786 (util.hidepassword(raw_url), str(e)))
1786 1787 assert not inpart.read()
1787 1788
1788 1789 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1789 1790 def handlereplychangegroup(op, inpart):
1790 1791 ret = int(inpart.params['return'])
1791 1792 replyto = int(inpart.params['in-reply-to'])
1792 1793 op.records.add('changegroup', {'return': ret}, replyto)
1793 1794
1794 1795 @parthandler('check:bookmarks')
1795 1796 def handlecheckbookmarks(op, inpart):
1796 1797 """check location of bookmarks
1797 1798
1798 1799 This part is used to detect push races regarding bookmarks. It contains
1799 1800 binary encoded (bookmark, node) tuples. If the local state does not
1800 1801 match the one in the part, a PushRaced exception is raised
1801 1802 """
1802 1803 bookdata = bookmarks.binarydecode(inpart)
1803 1804
1804 1805 msgstandard = ('repository changed while pushing - please try again '
1805 1806 '(bookmark "%s" move from %s to %s)')
1806 1807 msgmissing = ('repository changed while pushing - please try again '
1807 1808 '(bookmark "%s" is missing, expected %s)')
1808 1809 msgexist = ('repository changed while pushing - please try again '
1809 1810 '(bookmark "%s" set on %s, expected missing)')
1810 1811 for book, node in bookdata:
1811 1812 currentnode = op.repo._bookmarks.get(book)
1812 1813 if currentnode != node:
1813 1814 if node is None:
1814 1815 finalmsg = msgexist % (book, nodemod.short(currentnode))
1815 1816 elif currentnode is None:
1816 1817 finalmsg = msgmissing % (book, nodemod.short(node))
1817 1818 else:
1818 1819 finalmsg = msgstandard % (book, nodemod.short(node),
1819 1820 nodemod.short(currentnode))
1820 1821 raise error.PushRaced(finalmsg)
1821 1822
1822 1823 @parthandler('check:heads')
1823 1824 def handlecheckheads(op, inpart):
1824 1825 """check that head of the repo did not change
1825 1826
1826 1827 This is used to detect a push race when using unbundle.
1827 1828 This replaces the "heads" argument of unbundle."""
1828 1829 h = inpart.read(20)
1829 1830 heads = []
1830 1831 while len(h) == 20:
1831 1832 heads.append(h)
1832 1833 h = inpart.read(20)
1833 1834 assert not h
1834 1835 # Trigger a transaction so that we are guaranteed to have the lock now.
1835 1836 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1836 1837 op.gettransaction()
1837 1838 if sorted(heads) != sorted(op.repo.heads()):
1838 1839 raise error.PushRaced('repository changed while pushing - '
1839 1840 'please try again')
1840 1841
1841 1842 @parthandler('check:updated-heads')
1842 1843 def handlecheckupdatedheads(op, inpart):
1843 1844 """check for race on the heads touched by a push
1844 1845
1845 1846 This is similar to 'check:heads' but focuses on the heads actually
1846 1847 updated during the push. If other activity happens on unrelated heads,
1847 1848 it is ignored.
1848 1849 
1849 1850 This allows servers with high traffic to avoid push contention as long
1850 1851 as only unrelated parts of the graph are involved."""
1851 1852 h = inpart.read(20)
1852 1853 heads = []
1853 1854 while len(h) == 20:
1854 1855 heads.append(h)
1855 1856 h = inpart.read(20)
1856 1857 assert not h
1857 1858 # trigger a transaction so that we are guaranteed to have the lock now.
1858 1859 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1859 1860 op.gettransaction()
1860 1861
1861 1862 currentheads = set()
1862 1863 for ls in op.repo.branchmap().itervalues():
1863 1864 currentheads.update(ls)
1864 1865
1865 1866 for h in heads:
1866 1867 if h not in currentheads:
1867 1868 raise error.PushRaced('repository changed while pushing - '
1868 1869 'please try again')
1869 1870
1870 1871 @parthandler('check:phases')
1871 1872 def handlecheckphases(op, inpart):
1872 1873 """check that phase boundaries of the repository did not change
1873 1874
1874 1875 This is used to detect a push race.
1875 1876 """
1876 1877 phasetonodes = phases.binarydecode(inpart)
1877 1878 unfi = op.repo.unfiltered()
1878 1879 cl = unfi.changelog
1879 1880 phasecache = unfi._phasecache
1880 1881 msg = ('repository changed while pushing - please try again '
1881 1882 '(%s is %s expected %s)')
1882 1883 for expectedphase, nodes in enumerate(phasetonodes):
1883 1884 for n in nodes:
1884 1885 actualphase = phasecache.phase(unfi, cl.rev(n))
1885 1886 if actualphase != expectedphase:
1886 1887 finalmsg = msg % (nodemod.short(n),
1887 1888 phases.phasenames[actualphase],
1888 1889 phases.phasenames[expectedphase])
1889 1890 raise error.PushRaced(finalmsg)
1890 1891
1891 1892 @parthandler('output')
1892 1893 def handleoutput(op, inpart):
1893 1894 """forward output captured on the server to the client"""
1894 1895 for line in inpart.read().splitlines():
1895 1896 op.ui.status(_('remote: %s\n') % line)
1896 1897
1897 1898 @parthandler('replycaps')
1898 1899 def handlereplycaps(op, inpart):
1899 1900 """Notify that a reply bundle should be created
1900 1901
1901 1902 The payload contains the capabilities information for the reply"""
1902 1903 caps = decodecaps(inpart.read())
1903 1904 if op.reply is None:
1904 1905 op.reply = bundle20(op.ui, caps)
1905 1906
1906 1907 class AbortFromPart(error.Abort):
1907 1908 """Sub-class of Abort that denotes an error from a bundle2 part."""
1908 1909
1909 1910 @parthandler('error:abort', ('message', 'hint'))
1910 1911 def handleerrorabort(op, inpart):
1911 1912 """Used to transmit abort error over the wire"""
1912 1913 raise AbortFromPart(inpart.params['message'],
1913 1914 hint=inpart.params.get('hint'))
1914 1915
1915 1916 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1916 1917 'in-reply-to'))
1917 1918 def handleerrorpushkey(op, inpart):
1918 1919 """Used to transmit failure of a mandatory pushkey over the wire"""
1919 1920 kwargs = {}
1920 1921 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1921 1922 value = inpart.params.get(name)
1922 1923 if value is not None:
1923 1924 kwargs[name] = value
1924 1925 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1925 1926
1926 1927 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1927 1928 def handleerrorunsupportedcontent(op, inpart):
1928 1929 """Used to transmit unknown content error over the wire"""
1929 1930 kwargs = {}
1930 1931 parttype = inpart.params.get('parttype')
1931 1932 if parttype is not None:
1932 1933 kwargs['parttype'] = parttype
1933 1934 params = inpart.params.get('params')
1934 1935 if params is not None:
1935 1936 kwargs['params'] = params.split('\0')
1936 1937
1937 1938 raise error.BundleUnknownFeatureError(**kwargs)
1938 1939
1939 1940 @parthandler('error:pushraced', ('message',))
1940 1941 def handleerrorpushraced(op, inpart):
1941 1942 """Used to transmit push race error over the wire"""
1942 1943 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1943 1944
1944 1945 @parthandler('listkeys', ('namespace',))
1945 1946 def handlelistkeys(op, inpart):
1946 1947 """retrieve pushkey namespace content stored in a bundle2"""
1947 1948 namespace = inpart.params['namespace']
1948 1949 r = pushkey.decodekeys(inpart.read())
1949 1950 op.records.add('listkeys', (namespace, r))
1950 1951
1951 1952 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1952 1953 def handlepushkey(op, inpart):
1953 1954 """process a pushkey request"""
1954 1955 dec = pushkey.decode
1955 1956 namespace = dec(inpart.params['namespace'])
1956 1957 key = dec(inpart.params['key'])
1957 1958 old = dec(inpart.params['old'])
1958 1959 new = dec(inpart.params['new'])
1959 1960 # Grab the transaction to ensure that we have the lock before performing the
1960 1961 # pushkey.
1961 1962 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1962 1963 op.gettransaction()
1963 1964 ret = op.repo.pushkey(namespace, key, old, new)
1964 1965 record = {'namespace': namespace,
1965 1966 'key': key,
1966 1967 'old': old,
1967 1968 'new': new}
1968 1969 op.records.add('pushkey', record)
1969 1970 if op.reply is not None:
1970 1971 rpart = op.reply.newpart('reply:pushkey')
1971 1972 rpart.addparam(
1972 1973 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1973 1974 rpart.addparam('return', '%i' % ret, mandatory=False)
1974 1975 if inpart.mandatory and not ret:
1975 1976 kwargs = {}
1976 1977 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1977 1978 if key in inpart.params:
1978 1979 kwargs[key] = inpart.params[key]
1979 1980 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1980 1981
1981 1982 @parthandler('bookmarks')
1982 1983 def handlebookmark(op, inpart):
1983 1984 """transmit bookmark information
1984 1985
1985 1986 The part contains binary encoded bookmark information.
1986 1987
1987 1988 The exact behavior of this part can be controlled by the 'bookmarks' mode
1988 1989 on the bundle operation.
1989 1990
1990 1991 When mode is 'apply' (the default) the bookmark information is applied as
1991 1992 is to the unbundling repository. Make sure a 'check:bookmarks' part is
1992 1993 issued earlier to check for push races in such update. This behavior is
1993 1994 suitable for pushing.
1994 1995
1995 1996 When mode is 'records', the information is recorded into the 'bookmarks'
1996 1997 records of the bundle operation. This behavior is suitable for pulling.
1997 1998 """
1998 1999 changes = bookmarks.binarydecode(inpart)
1999 2000
2000 2001 pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
2001 2002 bookmarksmode = op.modes.get('bookmarks', 'apply')
2002 2003
2003 2004 if bookmarksmode == 'apply':
2004 2005 tr = op.gettransaction()
2005 2006 bookstore = op.repo._bookmarks
2006 2007 if pushkeycompat:
2007 2008 allhooks = []
2008 2009 for book, node in changes:
2009 2010 hookargs = tr.hookargs.copy()
2010 2011 hookargs['pushkeycompat'] = '1'
2011 2012 hookargs['namespace'] = 'bookmark'
2012 2013 hookargs['key'] = book
2013 2014 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
2014 2015 hookargs['new'] = nodemod.hex(node if node is not None else '')
2015 2016 allhooks.append(hookargs)
2016 2017
2017 2018 for hookargs in allhooks:
2018 2019 op.repo.hook('prepushkey', throw=True, **hookargs)
2019 2020
2020 2021 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2021 2022
2022 2023 if pushkeycompat:
2023 2024 def runhook():
2024 2025 for hookargs in allhooks:
2025 2026 op.repo.hook('pushkey', **hookargs)
2026 2027 op.repo._afterlock(runhook)
2027 2028
2028 2029 elif bookmarksmode == 'records':
2029 2030 for book, node in changes:
2030 2031 record = {'bookmark': book, 'node': node}
2031 2032 op.records.add('bookmarks', record)
2032 2033 else:
2033 2034 raise error.ProgrammingError('unknown bookmark mode: %s' % bookmarksmode)
2034 2035
2035 2036 @parthandler('phase-heads')
2036 2037 def handlephases(op, inpart):
2037 2038 """apply phases from bundle part to repo"""
2038 2039 headsbyphase = phases.binarydecode(inpart)
2039 2040 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2040 2041
2041 2042 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
2042 2043 def handlepushkeyreply(op, inpart):
2043 2044 """retrieve the result of a pushkey request"""
2044 2045 ret = int(inpart.params['return'])
2045 2046 partid = int(inpart.params['in-reply-to'])
2046 2047 op.records.add('pushkey', {'return': ret}, partid)
2047 2048
2048 2049 @parthandler('obsmarkers')
2049 2050 def handleobsmarker(op, inpart):
2050 2051 """add a stream of obsmarkers to the repo"""
2051 2052 tr = op.gettransaction()
2052 2053 markerdata = inpart.read()
2053 2054 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
2054 2055 op.ui.write(('obsmarker-exchange: %i bytes received\n')
2055 2056 % len(markerdata))
2056 2057 # The mergemarkers call will crash if marker creation is not enabled.
2057 2058 # we want to avoid this if the part is advisory.
2058 2059 if not inpart.mandatory and op.repo.obsstore.readonly:
2059 2060 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
2060 2061 return
2061 2062 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2062 2063 op.repo.invalidatevolatilesets()
2063 2064 if new:
2064 2065 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
2065 2066 op.records.add('obsmarkers', {'new': new})
2066 2067 if op.reply is not None:
2067 2068 rpart = op.reply.newpart('reply:obsmarkers')
2068 2069 rpart.addparam(
2069 2070 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2070 2071 rpart.addparam('new', '%i' % new, mandatory=False)
2071 2072
2072 2073
2073 2074 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
2074 2075 def handleobsmarkerreply(op, inpart):
2075 2076 """retrieve the result of a pushkey request"""
2076 2077 ret = int(inpart.params['new'])
2077 2078 partid = int(inpart.params['in-reply-to'])
2078 2079 op.records.add('obsmarkers', {'new': ret}, partid)
2079 2080
2080 2081 @parthandler('hgtagsfnodes')
2081 2082 def handlehgtagsfnodes(op, inpart):
2082 2083 """Applies .hgtags fnodes cache entries to the local repo.
2083 2084
2084 2085 Payload is pairs of 20 byte changeset nodes and filenodes.
2085 2086 """
2086 2087 # Grab the transaction so we ensure that we have the lock at this point.
2087 2088 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2088 2089 op.gettransaction()
2089 2090 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2090 2091
2091 2092 count = 0
2092 2093 while True:
2093 2094 node = inpart.read(20)
2094 2095 fnode = inpart.read(20)
2095 2096 if len(node) < 20 or len(fnode) < 20:
2096 2097 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2097 2098 break
2098 2099 cache.setfnode(node, fnode)
2099 2100 count += 1
2100 2101
2101 2102 cache.write()
2102 2103 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2103 2104
2104 2105 @parthandler('pushvars')
2105 2106 def bundle2getvars(op, part):
2106 2107 '''unbundle a bundle2 containing shellvars on the server'''
2107 2108 # An option to disable unbundling on server-side for security reasons
2108 2109 if op.ui.configbool('push', 'pushvars.server'):
2109 2110 hookargs = {}
2110 2111 for key, value in part.advisoryparams:
2111 2112 key = key.upper()
2112 2113 # We want pushed variables to have USERVAR_ prepended so we know
2113 2114 # they came from the --pushvar flag.
2114 2115 key = "USERVAR_" + key
2115 2116 hookargs[key] = value
2116 2117 op.addhookargs(hookargs)
2118
2119 @parthandler('stream', ('requirements', 'filecount', 'bytecount', 'version'))
2120 def handlestreambundle(op, part):
2121 """apply a stream clone bundle on the repo"""
2122 version = part.params['version']
2123 if version != 'v2':
2124 raise error.Abort(_('unknown stream bundle version %s') % version)
2125 requirements = part.params['requirements'].split()
2126 filecount = int(part.params['filecount'])
2127 bytecount = int(part.params['bytecount'])
2128
2129 repo = op.repo
2130 if len(repo):
2131 2132 msg = _('cannot apply stream clone to non-empty repository')
2132 raise error.Abort(msg)
2133
2134 repo.ui.debug('applying stream bundle\n')
2135 streamclone.applybundlev2(repo, part, filecount, bytecount,
2136 requirements)
2137
2138 # new requirements = old non-format requirements +
2139 # new format-related remote requirements
2140 # (the requirements carried by the streamed-in repository)
2141 repo.requirements = set(requirements) | (
2142 repo.requirements - repo.supportedformats)
2143 repo._applyopenerreqs()
2144 repo._writerequirements()
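The `@parthandler('stream', ...)` declaration above relies on the registration pattern used throughout this file: the decorator records the handler and its known parameter names so that parts can be dispatched by type during unbundling. A minimal standalone sketch of that pattern (the registry and handler body here are illustrative, not the real `mercurial.bundle2` implementation):

```python
# Registry mapping part type -> handler function, mirroring bundle2.py.
parthandlermapping = {}

def parthandler(parttype, params=()):
    """Register the decorated function as the handler for `parttype`."""
    def _decorator(func):
        lparttype = parttype.lower()  # part types are matched case-insensitively
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)  # parameter names the handler understands
        return func
    return _decorator

@parthandler('stream', ('requirements', 'filecount', 'bytecount', 'version'))
def handlestreambundle(op, part):
    # A real handler would unpack the stream payload; this stub only
    # demonstrates that dispatch reaches the registered function.
    return 'stream part handled'

# Dispatch: look up the handler by (lowercased) part type.
handler = parthandlermapping['stream']
```

Keeping the accepted parameter names on the handler lets the unbundler reject a part carrying a mandatory parameter the handler does not know, instead of silently misapplying it.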