pull: remove inadequate use of operations records to update stepdone...
Boris Feld
r34324:6c7aaf59 default
@@ -1,1924 +1,1923 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all
41 41 stream level parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
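For example, a blob carrying one mandatory parameter with a value and one
advisory parameter without one could look like this (space separated,
urlquoted as needed)::

    Compression=BZ simple
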
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the handler to interpret the
82 82 part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
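For example, one mandatory parameter ('key', 'val') and no advisory ones
would encode as::

    \x01\x00\x03\x03keyval
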
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (exactly as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
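For instance, an 11 byte payload sent as a single chunk would appear on the
wire as::

    <int32: 11><11 bytes of data><int32: 0>
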
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In
144 144 the future, dropping the stream may become an option for channels we do not
145 145 care to preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 preferedchunksize = 4096
183 183
184 184 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
185 185
186 186 def outdebug(ui, message):
187 187 """debug regarding output stream (bundling)"""
188 188 if ui.configbool('devel', 'bundle2.debug'):
189 189 ui.debug('bundle2-output: %s\n' % message)
190 190
191 191 def indebug(ui, message):
192 192 """debug on input stream (unbundling)"""
193 193 if ui.configbool('devel', 'bundle2.debug'):
194 194 ui.debug('bundle2-input: %s\n' % message)
195 195
196 196 def validateparttype(parttype):
197 197 """raise ValueError if a parttype contains invalid character"""
198 198 if _parttypeforbidden.search(parttype):
199 199 raise ValueError(parttype)
200 200
201 201 def _makefpartparamsizes(nbparams):
202 202 """return a struct format to read part parameter sizes
203 203
204 204 The number of parameters is variable, so we need to build that format
205 205 dynamically.
206 206 """
207 207 return '>'+('BB'*nbparams)
208 208
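# A quick illustration of the dynamic format built above: with two
# parameters the result is '>BBBB', which unpacks four unsigned bytes as
# (key1-size, value1-size, key2-size, value2-size):
#
#   _makefpartparamsizes(2) == '>BBBB'
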
209 209 parthandlermapping = {}
210 210
211 211 def parthandler(parttype, params=()):
212 212 """decorator that register a function as a bundle2 part handler
213 213
214 214 eg::
215 215
216 216 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
217 217 def myparttypehandler(...):
218 218 '''process a part of type "my part".'''
219 219 ...
220 220 """
221 221 validateparttype(parttype)
222 222 def _decorator(func):
223 223 lparttype = parttype.lower() # enforce lower case matching.
224 224 assert lparttype not in parthandlermapping
225 225 parthandlermapping[lparttype] = func
226 226 func.params = frozenset(params)
227 227 return func
228 228 return _decorator
229 229
230 230 class unbundlerecords(object):
231 231 """keep record of what happens during and unbundle
232 232
233 233 New records are added using `records.add('cat', obj)`. Where 'cat' is a
234 234 category of record and obj is an arbitrary object.
235 235
236 236 `records['cat']` will return all entries of this category 'cat'.
237 237
238 238 Iterating on the object itself will yield `('category', obj)` tuples
239 239 for all entries.
240 240
241 241 All iteration happens in chronological order.
242 242 """
243 243
244 244 def __init__(self):
245 245 self._categories = {}
246 246 self._sequences = []
247 247 self._replies = {}
248 248
249 249 def add(self, category, entry, inreplyto=None):
250 250 """add a new record of a given category.
251 251
252 252 The entry can then be retrieved in the list returned by
253 253 self['category']."""
254 254 self._categories.setdefault(category, []).append(entry)
255 255 self._sequences.append((category, entry))
256 256 if inreplyto is not None:
257 257 self.getreplies(inreplyto).add(category, entry)
258 258
259 259 def getreplies(self, partid):
260 260 """get the records that are replies to a specific part"""
261 261 return self._replies.setdefault(partid, unbundlerecords())
262 262
263 263 def __getitem__(self, cat):
264 264 return tuple(self._categories.get(cat, ()))
265 265
266 266 def __iter__(self):
267 267 return iter(self._sequences)
268 268
269 269 def __len__(self):
270 270 return len(self._sequences)
271 271
272 272 def __nonzero__(self):
273 273 return bool(self._sequences)
274 274
275 275 __bool__ = __nonzero__
276 276
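# A small usage sketch for unbundlerecords; 'changegroup' is a category
# used elsewhere in this module, the payload dict is illustrative:
#
#   records = unbundlerecords()
#   records.add('changegroup', {'return': 1})
#   records['changegroup']   # -> ({'return': 1},)
#   list(records)            # -> [('changegroup', {'return': 1})]
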
277 277 class bundleoperation(object):
278 278 """an object that represents a single bundling process
279 279
280 280 Its purpose is to carry unbundle-related objects and states.
281 281
282 282 A new object should be created at the beginning of each bundle processing.
283 283 The object is to be returned by the processing function.
284 284
285 285 The object has very little content now; it will ultimately contain:
286 286 * an access to the repo the bundle is applied to,
287 287 * a ui object,
288 288 * a way to retrieve a transaction to add changes to the repo,
289 289 * a way to record the result of processing each part,
290 290 * a way to construct a bundle response when applicable.
291 291 """
292 292
293 293 def __init__(self, repo, transactiongetter, captureoutput=True):
294 294 self.repo = repo
295 295 self.ui = repo.ui
296 296 self.records = unbundlerecords()
297 297 self.reply = None
298 298 self.captureoutput = captureoutput
299 299 self.hookargs = {}
300 300 self._gettransaction = transactiongetter
301 301
302 302 def gettransaction(self):
303 303 transaction = self._gettransaction()
304 304
305 305 if self.hookargs:
306 306 # the ones added to the transaction supersede those added
307 307 # to the operation.
308 308 self.hookargs.update(transaction.hookargs)
309 309 transaction.hookargs = self.hookargs
310 310
311 311 # mark the hookargs as flushed. further attempts to add to
312 312 # hookargs will result in an abort.
313 313 self.hookargs = None
314 314
315 315 return transaction
316 316
317 317 def addhookargs(self, hookargs):
318 318 if self.hookargs is None:
319 319 raise error.ProgrammingError('attempted to add hookargs to '
320 320 'operation after transaction started')
321 321 self.hookargs.update(hookargs)
322 322
323 323 class TransactionUnavailable(RuntimeError):
324 324 pass
325 325
326 326 def _notransaction():
327 327 """default method to get a transaction while processing a bundle
328 328
329 329 Raise an exception to highlight the fact that no transaction was expected
330 330 to be created"""
331 331 raise TransactionUnavailable()
332 332
333 333 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
334 334 # transform me into unbundler.apply() as soon as the freeze is lifted
335 335 if isinstance(unbundler, unbundle20):
336 336 tr.hookargs['bundle2'] = '1'
337 337 if source is not None and 'source' not in tr.hookargs:
338 338 tr.hookargs['source'] = source
339 339 if url is not None and 'url' not in tr.hookargs:
340 340 tr.hookargs['url'] = url
341 341 return processbundle(repo, unbundler, lambda: tr)
342 342 else:
343 343 # the transactiongetter won't be used, but we might as well set it
344 344 op = bundleoperation(repo, lambda: tr)
345 345 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
346 346 return op
347 347
348 348 class partiterator(object):
349 349 def __init__(self, repo, op, unbundler):
350 350 self.repo = repo
351 351 self.op = op
352 352 self.unbundler = unbundler
353 353 self.iterator = None
354 354 self.count = 0
355 355 self.current = None
356 356
357 357 def __enter__(self):
358 358 def func():
359 359 itr = enumerate(self.unbundler.iterparts())
360 360 for count, p in itr:
361 361 self.count = count
362 362 self.current = p
363 363 yield p
364 364 p.seek(0, 2)
365 365 self.current = None
366 366 self.iterator = func()
367 367 return self.iterator
368 368
369 369 def __exit__(self, type, exc, tb):
370 370 if not self.iterator:
371 371 return
372 372
373 373 if exc:
374 374 # If exiting or interrupted, do not attempt to seek the stream in
375 375 # the finally block below. This makes abort faster.
376 376 if (self.current and
377 377 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
378 378 # consume the part content to not corrupt the stream.
379 379 self.current.seek(0, 2)
380 380
381 381 # Any exceptions seeking to the end of the bundle at this point are
382 382 # almost certainly related to the underlying stream being bad.
383 383 # And, chances are that the exception we're handling is related to
384 384 # getting in that bad state. So, we swallow the seeking error and
385 385 # re-raise the original error.
386 386 seekerror = False
387 387 try:
388 388 for part in self.iterator:
389 389 # consume the bundle content
390 390 part.seek(0, 2)
391 391 except Exception:
392 392 seekerror = True
393 393
394 394 # Small hack to let caller code distinguish exceptions from bundle2
395 395 # processing from processing the old format. This is mostly needed
396 396 # to handle different return codes to unbundle according to the type
397 397 # of bundle. We should probably clean up or drop this return code
398 398 # craziness in a future version.
399 399 exc.duringunbundle2 = True
400 400 salvaged = []
401 401 replycaps = None
402 402 if self.op.reply is not None:
403 403 salvaged = self.op.reply.salvageoutput()
404 404 replycaps = self.op.reply.capabilities
405 405 exc._replycaps = replycaps
406 406 exc._bundle2salvagedoutput = salvaged
407 407
408 408 # Re-raising from a variable loses the original stack. So only use
409 409 # that form if we need to.
410 410 if seekerror:
411 411 raise exc
412 412
413 413 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
414 414 self.count)
415 415
416 416 def processbundle(repo, unbundler, transactiongetter=None, op=None):
417 417 """This function process a bundle, apply effect to/from a repo
418 418
419 419 It iterates over each part then searches for and uses the proper handling
420 420 code to process the part. Parts are processed in order.
421 421
422 422 An unknown mandatory part will abort the process.
423 423
424 424 It is temporarily possible to provide a prebuilt bundleoperation to the
425 425 function. This is used to ensure output is properly propagated in case of
426 426 an error during the unbundling. This output capturing part will likely be
427 427 reworked and this ability will probably go away in the process.
428 428 """
429 429 if op is None:
430 430 if transactiongetter is None:
431 431 transactiongetter = _notransaction
432 432 op = bundleoperation(repo, transactiongetter)
433 433 # todo:
434 434 # - replace this with an init function soon.
435 435 # - exception catching
436 436 unbundler.params
437 437 if repo.ui.debugflag:
438 438 msg = ['bundle2-input-bundle:']
439 439 if unbundler.params:
440 440 msg.append(' %i params' % len(unbundler.params))
441 441 if op._gettransaction is None or op._gettransaction is _notransaction:
442 442 msg.append(' no-transaction')
443 443 else:
444 444 msg.append(' with-transaction')
445 445 msg.append('\n')
446 446 repo.ui.debug(''.join(msg))
447 447
448 448 processparts(repo, op, unbundler)
449 449
450 450 return op
451 451
452 452 def processparts(repo, op, unbundler):
453 453 with partiterator(repo, op, unbundler) as parts:
454 454 for part in parts:
455 455 _processpart(op, part)
456 456
457 457 def _processchangegroup(op, cg, tr, source, url, **kwargs):
458 458 ret = cg.apply(op.repo, tr, source, url, **kwargs)
459 459 op.records.add('changegroup', {
460 460 'return': ret,
461 461 })
462 462 return ret
463 463
464 464 def _gethandler(op, part):
465 465 status = 'unknown' # used by debug output
466 466 try:
467 467 handler = parthandlermapping.get(part.type)
468 468 if handler is None:
469 469 status = 'unsupported-type'
470 470 raise error.BundleUnknownFeatureError(parttype=part.type)
471 471 indebug(op.ui, 'found a handler for part %s' % part.type)
472 472 unknownparams = part.mandatorykeys - handler.params
473 473 if unknownparams:
474 474 unknownparams = list(unknownparams)
475 475 unknownparams.sort()
476 476 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
477 477 raise error.BundleUnknownFeatureError(parttype=part.type,
478 478 params=unknownparams)
479 479 status = 'supported'
480 480 except error.BundleUnknownFeatureError as exc:
481 481 if part.mandatory: # mandatory parts
482 482 raise
483 483 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
484 484 return # skip to part processing
485 485 finally:
486 486 if op.ui.debugflag:
487 487 msg = ['bundle2-input-part: "%s"' % part.type]
488 488 if not part.mandatory:
489 489 msg.append(' (advisory)')
490 490 nbmp = len(part.mandatorykeys)
491 491 nbap = len(part.params) - nbmp
492 492 if nbmp or nbap:
493 493 msg.append(' (params:')
494 494 if nbmp:
495 495 msg.append(' %i mandatory' % nbmp)
496 496 if nbap:
497 497 msg.append(' %i advisory' % nbap)
498 498 msg.append(')')
499 499 msg.append(' %s\n' % status)
500 500 op.ui.debug(''.join(msg))
501 501
502 502 return handler
503 503
504 504 def _processpart(op, part):
505 505 """process a single part from a bundle
506 506
507 507 The part is guaranteed to have been fully consumed when the function exits
508 508 (even if an exception is raised)."""
509 509 handler = _gethandler(op, part)
510 510 if handler is None:
511 511 return
512 512
513 513 # handler is called outside the above try block so that we don't
514 514 # risk catching KeyErrors from anything other than the
515 515 # parthandlermapping lookup (any KeyError raised by handler()
516 516 # itself represents a defect of a different variety).
517 517 output = None
518 518 if op.captureoutput and op.reply is not None:
519 519 op.ui.pushbuffer(error=True, subproc=True)
520 520 output = ''
521 521 try:
522 522 handler(op, part)
523 523 finally:
524 524 if output is not None:
525 525 output = op.ui.popbuffer()
526 526 if output:
527 527 outpart = op.reply.newpart('output', data=output,
528 528 mandatory=False)
529 529 outpart.addparam(
530 530 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
531 531
532 532 def decodecaps(blob):
533 533 """decode a bundle2 caps bytes blob into a dictionary
534 534
535 535 The blob is a list of capabilities (one per line)
536 536 Capabilities may have values using a line of the form::
537 537
538 538 capability=value1,value2,value3
539 539
540 540 The values are always a list."""
541 541 caps = {}
542 542 for line in blob.splitlines():
543 543 if not line:
544 544 continue
545 545 if '=' not in line:
546 546 key, vals = line, ()
547 547 else:
548 548 key, vals = line.split('=', 1)
549 549 vals = vals.split(',')
550 550 key = urlreq.unquote(key)
551 551 vals = [urlreq.unquote(v) for v in vals]
552 552 caps[key] = vals
553 553 return caps
554 554
555 555 def encodecaps(caps):
556 556 """encode a bundle2 caps dictionary into a bytes blob"""
557 557 chunks = []
558 558 for ca in sorted(caps):
559 559 vals = caps[ca]
560 560 ca = urlreq.quote(ca)
561 561 vals = [urlreq.quote(v) for v in vals]
562 562 if vals:
563 563 ca = "%s=%s" % (ca, ','.join(vals))
564 564 chunks.append(ca)
565 565 return '\n'.join(chunks)
566 566
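# Round-trip sketch for encodecaps/decodecaps; the capability names are
# illustrative:
#
#   caps = {'HG20': [], 'digests': ['md5', 'sha1']}
#   blob = encodecaps(caps)      # 'HG20\ndigests=md5,sha1'
#   decodecaps(blob) == caps     # True
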
567 567 bundletypes = {
568 568 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
569 569 # since the unification ssh accepts a header but there
570 570 # is no capability signaling it.
571 571 "HG20": (), # special-cased below
572 572 "HG10UN": ("HG10UN", 'UN'),
573 573 "HG10BZ": ("HG10", 'BZ'),
574 574 "HG10GZ": ("HG10GZ", 'GZ'),
575 575 }
576 576
577 577 # hgweb uses this list to communicate its preferred type
578 578 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
579 579
580 580 class bundle20(object):
581 581 """represent an outgoing bundle2 container
582 582
583 583 Use the `addparam` method to add a stream level parameter and `newpart` to
584 584 populate it with parts. Then call `getchunks` to retrieve all the binary
585 585 chunks of data that compose the bundle2 container."""
586 586
587 587 _magicstring = 'HG20'
588 588
589 589 def __init__(self, ui, capabilities=()):
590 590 self.ui = ui
591 591 self._params = []
592 592 self._parts = []
593 593 self.capabilities = dict(capabilities)
594 594 self._compengine = util.compengines.forbundletype('UN')
595 595 self._compopts = None
596 596
597 597 def setcompression(self, alg, compopts=None):
598 598 """setup core part compression to <alg>"""
599 599 if alg in (None, 'UN'):
600 600 return
601 601 assert not any(n.lower() == 'compression' for n, v in self._params)
602 602 self.addparam('Compression', alg)
603 603 self._compengine = util.compengines.forbundletype(alg)
604 604 self._compopts = compopts
605 605
606 606 @property
607 607 def nbparts(self):
608 608 """total number of parts added to the bundler"""
609 609 return len(self._parts)
610 610
611 611 # methods used to define the bundle2 content
612 612 def addparam(self, name, value=None):
613 613 """add a stream level parameter"""
614 614 if not name:
615 615 raise ValueError(r'empty parameter name')
616 616 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
617 617 raise ValueError(r'non letter first character: %s' % name)
618 618 self._params.append((name, value))
619 619
620 620 def addpart(self, part):
621 621 """add a new part to the bundle2 container
622 622
623 623 Parts contains the actual applicative payload."""
624 624 assert part.id is None
625 625 part.id = len(self._parts) # very cheap counter
626 626 self._parts.append(part)
627 627
628 628 def newpart(self, typeid, *args, **kwargs):
629 629 """create a new part and add it to the containers
630 630
631 631 As the part is directly added to the containers. For now, this means
632 632 that any failure to properly initialize the part after calling
633 633 ``newpart`` should result in a failure of the whole bundling process.
634 634
635 635 You can still fall back to manually create and add if you need better
636 636 control."""
637 637 part = bundlepart(typeid, *args, **kwargs)
638 638 self.addpart(part)
639 639 return part
640 640
641 641 # methods used to generate the bundle2 stream
642 642 def getchunks(self):
643 643 if self.ui.debugflag:
644 644 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
645 645 if self._params:
646 646 msg.append(' (%i params)' % len(self._params))
647 647 msg.append(' %i parts total\n' % len(self._parts))
648 648 self.ui.debug(''.join(msg))
649 649 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
650 650 yield self._magicstring
651 651 param = self._paramchunk()
652 652 outdebug(self.ui, 'bundle parameter: %s' % param)
653 653 yield _pack(_fstreamparamsize, len(param))
654 654 if param:
655 655 yield param
656 656 for chunk in self._compengine.compressstream(self._getcorechunk(),
657 657 self._compopts):
658 658 yield chunk
659 659
660 660 def _paramchunk(self):
661 661 """return a encoded version of all stream parameters"""
662 662 blocks = []
663 663 for par, value in self._params:
664 664 par = urlreq.quote(par)
665 665 if value is not None:
666 666 value = urlreq.quote(value)
667 667 par = '%s=%s' % (par, value)
668 668 blocks.append(par)
669 669 return ' '.join(blocks)
670 670
671 671 def _getcorechunk(self):
672 672 """yield chunk for the core part of the bundle
673 673
674 674 (all but headers and parameters)"""
675 675 outdebug(self.ui, 'start of parts')
676 676 for part in self._parts:
677 677 outdebug(self.ui, 'bundle part: "%s"' % part.type)
678 678 for chunk in part.getchunks(ui=self.ui):
679 679 yield chunk
680 680 outdebug(self.ui, 'end of bundle')
681 681 yield _pack(_fpartheadersize, 0)
682 682
683 683
684 684 def salvageoutput(self):
685 685 """return a list with a copy of all output parts in the bundle
686 686
687 687 This is meant to be used during error handling to make sure we preserve
688 688 server output"""
689 689 salvaged = []
690 690 for part in self._parts:
691 691 if part.type.startswith('output'):
692 692 salvaged.append(part.copy())
693 693 return salvaged
694 694
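# Emission sketch for bundle20, assuming a ui object is at hand; the part
# type and payload are illustrative:
#
#   bundler = bundle20(ui)
#   bundler.newpart('output', data='hello', mandatory=False)
#   raw = ''.join(bundler.getchunks())   # starts with the 'HG20' magic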
695 695
696 696 class unpackermixin(object):
697 697 """A mixin to extract bytes and struct data from a stream"""
698 698
699 699 def __init__(self, fp):
700 700 self._fp = fp
701 701
702 702 def _unpack(self, format):
703 703 """unpack this struct format from the stream
704 704
705 705 This method is meant for internal usage by the bundle2 protocol only.
706 706 It directly manipulates the low level stream, including bundle2 level
707 707 instructions.
708 708
709 709 Do not use it to implement higher-level logic or methods."""
710 710 data = self._readexact(struct.calcsize(format))
711 711 return _unpack(format, data)
712 712
713 713 def _readexact(self, size):
714 714 """read exactly <size> bytes from the stream
715 715
716 716 This method is meant for internal usage by the bundle2 protocol only.
717 717 It directly manipulates the low level stream, including bundle2 level
718 718 instructions.
719 719
720 720 Do not use it to implement higher-level logic or methods."""
721 721 return changegroup.readexactly(self._fp, size)
722 722
723 723 def getunbundler(ui, fp, magicstring=None):
724 724 """return a valid unbundler object for a given magicstring"""
725 725 if magicstring is None:
726 726 magicstring = changegroup.readexactly(fp, 4)
727 727 magic, version = magicstring[0:2], magicstring[2:4]
728 728 if magic != 'HG':
729 729 ui.debug(
730 730 "error: invalid magic: %r (version %r), should be 'HG'\n"
731 731 % (magic, version))
732 732 raise error.Abort(_('not a Mercurial bundle'))
733 733 unbundlerclass = formatmap.get(version)
734 734 if unbundlerclass is None:
735 735 raise error.Abort(_('unknown bundle version %s') % version)
736 736 unbundler = unbundlerclass(ui, fp)
737 737 indebug(ui, 'start processing of %s stream' % magicstring)
738 738 return unbundler
739 739
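# Consumption sketch: fp is any file-like object positioned at the start
# of a bundle2 stream:
#
#   unbundler = getunbundler(ui, fp)
#   for part in unbundler.iterparts():
#       part.read()   # consume the payload
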
740 740 class unbundle20(unpackermixin):
741 741 """interpret a bundle2 stream
742 742
743 743 This class is fed with a binary stream and yields parts through its
744 744 `iterparts` method."""
745 745
746 746 _magicstring = 'HG20'
747 747
748 748 def __init__(self, ui, fp):
749 749 """If header is specified, we do not read it out of the stream."""
750 750 self.ui = ui
751 751 self._compengine = util.compengines.forbundletype('UN')
752 752 self._compressed = None
753 753 super(unbundle20, self).__init__(fp)
754 754
755 755 @util.propertycache
756 756 def params(self):
757 757 """dictionary of stream level parameters"""
758 758 indebug(self.ui, 'reading bundle2 stream parameters')
759 759 params = {}
760 760 paramssize = self._unpack(_fstreamparamsize)[0]
761 761 if paramssize < 0:
762 762 raise error.BundleValueError('negative bundle param size: %i'
763 763 % paramssize)
764 764 if paramssize:
765 765 params = self._readexact(paramssize)
766 766 params = self._processallparams(params)
767 767 return params
768 768
769 769 def _processallparams(self, paramsblock):
770 770 """"""
771 771 params = util.sortdict()
772 772 for p in paramsblock.split(' '):
773 773 p = p.split('=', 1)
774 774 p = [urlreq.unquote(i) for i in p]
775 775 if len(p) < 2:
776 776 p.append(None)
777 777 self._processparam(*p)
778 778 params[p[0]] = p[1]
779 779 return params
780 780
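# For instance, the parameter block 'Compression=BZ simple' parses into
# {'Compression': 'BZ', 'simple': None} and, as a side effect, installs
# the bz2 decompression engine via the 'compression' handler registered
# below.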
781 781
782 782 def _processparam(self, name, value):
783 783 """process a parameter, applying its effect if needed
784 784
785 785 Parameters starting with a lower case letter are advisory and will be
786 786 ignored when unknown. Those starting with an upper case letter are
787 787 mandatory; for those, this function will raise an error when unknown.
788 788
789 789 Note: no options are currently supported. Any input will either be
790 790 ignored or fail.
791 791 """
792 792 if not name:
793 793 raise ValueError(r'empty parameter name')
794 794 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
795 795 raise ValueError(r'non letter first character: %s' % name)
796 796 try:
797 797 handler = b2streamparamsmap[name.lower()]
798 798 except KeyError:
799 799 if name[0:1].islower():
800 800 indebug(self.ui, "ignoring unknown parameter %s" % name)
801 801 else:
802 802 raise error.BundleUnknownFeatureError(params=(name,))
803 803 else:
804 804 handler(self, name, value)
805 805
806 806 def _forwardchunks(self):
807 807 """utility to transfer a bundle2 as binary
808 808
809 809 This is made necessary by the fact that the 'getbundle' command over
810 810 'ssh' has no way to know when the reply ends; it relies on the bundle
811 811 being interpreted to find its end. This is terrible and we are sorry,
812 812 but we needed to move forward to get general delta enabled.
813 813 """
814 814 yield self._magicstring
815 815 assert 'params' not in vars(self)
816 816 paramssize = self._unpack(_fstreamparamsize)[0]
817 817 if paramssize < 0:
818 818 raise error.BundleValueError('negative bundle param size: %i'
819 819 % paramssize)
820 820 yield _pack(_fstreamparamsize, paramssize)
821 821 if paramssize:
822 822 params = self._readexact(paramssize)
823 823 self._processallparams(params)
824 824 yield params
825 825 assert self._compengine.bundletype == 'UN'
826 826 # From there, payload might need to be decompressed
827 827 self._fp = self._compengine.decompressorreader(self._fp)
828 828 emptycount = 0
829 829 while emptycount < 2:
830 830 # so we can brainlessly loop
831 831 assert _fpartheadersize == _fpayloadsize
832 832 size = self._unpack(_fpartheadersize)[0]
833 833 yield _pack(_fpartheadersize, size)
834 834 if size:
835 835 emptycount = 0
836 836 else:
837 837 emptycount += 1
838 838 continue
839 839 if size == flaginterrupt:
840 840 continue
841 841 elif size < 0:
842 842 raise error.BundleValueError('negative chunk size: %i' % size)
843 843 yield self._readexact(size)
844 844
845 845
846 846 def iterparts(self):
847 847 """yield all parts contained in the stream"""
848 848 # make sure params have been loaded
849 849 self.params
850 850 # From there, payload needs to be decompressed
851 851 self._fp = self._compengine.decompressorreader(self._fp)
852 852 indebug(self.ui, 'start extraction of bundle2 parts')
853 853 headerblock = self._readpartheader()
854 854 while headerblock is not None:
855 855 part = unbundlepart(self.ui, headerblock, self._fp)
856 856 yield part
857 857 # Seek to the end of the part to force its consumption so the next
858 858 # part can be read. But then seek back to the beginning so the
859 859 # code consuming this generator has a part that starts at 0.
860 860 part.seek(0, 2)
861 861 part.seek(0)
862 862 headerblock = self._readpartheader()
863 863 indebug(self.ui, 'end of bundle2 stream')
864 864
865 865 def _readpartheader(self):
866 866 """reads a part header size and return the bytes blob
867 867
868 868 returns None if empty"""
869 869 headersize = self._unpack(_fpartheadersize)[0]
870 870 if headersize < 0:
871 871 raise error.BundleValueError('negative part header size: %i'
872 872 % headersize)
873 873 indebug(self.ui, 'part header size: %i' % headersize)
874 874 if headersize:
875 875 return self._readexact(headersize)
876 876 return None
877 877
878 878 def compressed(self):
879 879 self.params # load params
880 880 return self._compressed
881 881
882 882 def close(self):
883 883 """close underlying file"""
884 884 if util.safehasattr(self._fp, 'close'):
885 885 return self._fp.close()
886 886
887 887 formatmap = {'20': unbundle20}
888 888
889 889 b2streamparamsmap = {}
890 890
891 891 def b2streamparamhandler(name):
892 892 """register a handler for a stream level parameter"""
893 893 def decorator(func):
894 894 assert name not in b2streamparamsmap
895 895 b2streamparamsmap[name] = func
896 896 return func
897 897 return decorator
898 898
899 899 @b2streamparamhandler('compression')
900 900 def processcompression(unbundler, param, value):
901 901 """read compression parameter and install payload decompression"""
902 902 if value not in util.compengines.supportedbundletypes:
903 903 raise error.BundleUnknownFeatureError(params=(param,),
904 904 values=(value,))
905 905 unbundler._compengine = util.compengines.forbundletype(value)
906 906 if value is not None:
907 907 unbundler._compressed = True
908 908
909 909 class bundlepart(object):
910 910 """A bundle2 part contains application level payload
911 911
912 912 The part `type` is used to route the part to the application level
913 913 handler.
914 914
915 915 The part payload is contained in ``part.data``. It could be raw bytes or a
916 916 generator of byte chunks.
917 917
918 918 You can add parameters to the part using the ``addparam`` method.
919 919 Parameters can be either mandatory (default) or advisory. Remote side
920 920 should be able to safely ignore the advisory ones.
921 921
922 922 Neither data nor parameters can be modified after the generation has begun.
923 923 """
924 924
925 925 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
926 926 data='', mandatory=True):
927 927 validateparttype(parttype)
928 928 self.id = None
929 929 self.type = parttype
930 930 self._data = data
931 931 self._mandatoryparams = list(mandatoryparams)
932 932 self._advisoryparams = list(advisoryparams)
933 933 # checking for duplicated entries
934 934 self._seenparams = set()
935 935 for pname, __ in self._mandatoryparams + self._advisoryparams:
936 936 if pname in self._seenparams:
937 937 raise error.ProgrammingError('duplicated params: %s' % pname)
938 938 self._seenparams.add(pname)
939 939 # status of the part's generation:
940 940 # - None: not started,
941 941 # - False: currently being generated,
942 942 # - True: generation done.
943 943 self._generated = None
944 944 self.mandatory = mandatory
945 945
946 946 def __repr__(self):
947 947 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
948 948 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
949 949 % (cls, id(self), self.id, self.type, self.mandatory))
950 950
951 951 def copy(self):
952 952 """return a copy of the part
953 953
954 954 The new part has the very same content but no partid assigned yet.
955 955 Parts with generated data cannot be copied."""
956 956 assert not util.safehasattr(self.data, 'next')
957 957 return self.__class__(self.type, self._mandatoryparams,
958 958 self._advisoryparams, self._data, self.mandatory)
959 959
960 960 # methods used to define the part content
961 961 @property
962 962 def data(self):
963 963 return self._data
964 964
965 965 @data.setter
966 966 def data(self, data):
967 967 if self._generated is not None:
968 968 raise error.ReadOnlyPartError('part is being generated')
969 969 self._data = data
970 970
971 971 @property
972 972 def mandatoryparams(self):
973 973 # make it an immutable tuple to force people through ``addparam``
974 974 return tuple(self._mandatoryparams)
975 975
976 976 @property
977 977 def advisoryparams(self):
978 978 # make it an immutable tuple to force people through ``addparam``
979 979 return tuple(self._advisoryparams)
980 980
981 981 def addparam(self, name, value='', mandatory=True):
982 982 """add a parameter to the part
983 983
984 984 If 'mandatory' is set to True, the remote handler must claim support
985 985 for this parameter or the unbundling will be aborted.
986 986
987 987 The 'name' and 'value' cannot exceed 255 bytes each.
988 988 """
989 989 if self._generated is not None:
990 990 raise error.ReadOnlyPartError('part is being generated')
991 991 if name in self._seenparams:
992 992 raise ValueError('duplicated params: %s' % name)
993 993 self._seenparams.add(name)
994 994 params = self._advisoryparams
995 995 if mandatory:
996 996 params = self._mandatoryparams
997 997 params.append((name, value))
998 998
999 999 # methods used to generate the bundle2 stream
1000 1000 def getchunks(self, ui):
1001 1001 if self._generated is not None:
1002 1002 raise error.ProgrammingError('part can only be consumed once')
1003 1003 self._generated = False
1004 1004
1005 1005 if ui.debugflag:
1006 1006 msg = ['bundle2-output-part: "%s"' % self.type]
1007 1007 if not self.mandatory:
1008 1008 msg.append(' (advisory)')
1009 1009 nbmp = len(self.mandatoryparams)
1010 1010 nbap = len(self.advisoryparams)
1011 1011 if nbmp or nbap:
1012 1012 msg.append(' (params:')
1013 1013 if nbmp:
1014 1014 msg.append(' %i mandatory' % nbmp)
1015 1015 if nbap:
1016 1016 msg.append(' %i advisory' % nbap)
1017 1017 msg.append(')')
1018 1018 if not self.data:
1019 1019 msg.append(' empty payload')
1020 1020 elif (util.safehasattr(self.data, 'next')
1021 1021 or util.safehasattr(self.data, '__next__')):
1022 1022 msg.append(' streamed payload')
1023 1023 else:
1024 1024 msg.append(' %i bytes payload' % len(self.data))
1025 1025 msg.append('\n')
1026 1026 ui.debug(''.join(msg))
1027 1027
1028 1028 #### header
1029 1029 if self.mandatory:
1030 1030 parttype = self.type.upper()
1031 1031 else:
1032 1032 parttype = self.type.lower()
1033 1033 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1034 1034 ## parttype
1035 1035 header = [_pack(_fparttypesize, len(parttype)),
1036 1036 parttype, _pack(_fpartid, self.id),
1037 1037 ]
1038 1038 ## parameters
1039 1039 # count
1040 1040 manpar = self.mandatoryparams
1041 1041 advpar = self.advisoryparams
1042 1042 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1043 1043 # size
1044 1044 parsizes = []
1045 1045 for key, value in manpar:
1046 1046 parsizes.append(len(key))
1047 1047 parsizes.append(len(value))
1048 1048 for key, value in advpar:
1049 1049 parsizes.append(len(key))
1050 1050 parsizes.append(len(value))
1051 1051 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1052 1052 header.append(paramsizes)
1053 1053 # key, value
1054 1054 for key, value in manpar:
1055 1055 header.append(key)
1056 1056 header.append(value)
1057 1057 for key, value in advpar:
1058 1058 header.append(key)
1059 1059 header.append(value)
1060 1060 ## finalize header
1061 1061 try:
1062 1062 headerchunk = ''.join(header)
1063 1063 except TypeError:
1064 1064 raise TypeError(r'Found a non-bytes trying to '
1065 1065 r'build bundle part header: %r' % header)
1066 1066 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1067 1067 yield _pack(_fpartheadersize, len(headerchunk))
1068 1068 yield headerchunk
1069 1069 ## payload
1070 1070 try:
1071 1071 for chunk in self._payloadchunks():
1072 1072 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1073 1073 yield _pack(_fpayloadsize, len(chunk))
1074 1074 yield chunk
1075 1075 except GeneratorExit:
1076 1076 # GeneratorExit means that nobody is listening for our
1077 1077 # results anyway, so just bail quickly rather than trying
1078 1078 # to produce an error part.
1079 1079 ui.debug('bundle2-generatorexit\n')
1080 1080 raise
1081 1081 except BaseException as exc:
1082 1082 bexc = util.forcebytestr(exc)
1083 1083 # backup exception data for later
1084 1084 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1085 1085 % bexc)
1086 1086 tb = sys.exc_info()[2]
1087 1087 msg = 'unexpected error: %s' % bexc
1088 1088 interpart = bundlepart('error:abort', [('message', msg)],
1089 1089 mandatory=False)
1090 1090 interpart.id = 0
1091 1091 yield _pack(_fpayloadsize, -1)
1092 1092 for chunk in interpart.getchunks(ui=ui):
1093 1093 yield chunk
1094 1094 outdebug(ui, 'closing payload chunk')
1095 1095 # abort current part payload
1096 1096 yield _pack(_fpayloadsize, 0)
1097 1097 pycompat.raisewithtb(exc, tb)
1098 1098 # end of payload
1099 1099 outdebug(ui, 'closing payload chunk')
1100 1100 yield _pack(_fpayloadsize, 0)
1101 1101 self._generated = True
1102 1102
1103 1103 def _payloadchunks(self):
1104 1104 """yield chunks of a the part payload
1105 1105
1106 1106 Exists to handle the different methods to provide data to a part."""
1107 1107 # we only support fixed size data now.
1108 1108 # This will be improved in the future.
1109 1109 if (util.safehasattr(self.data, 'next')
1110 1110 or util.safehasattr(self.data, '__next__')):
1111 1111 buff = util.chunkbuffer(self.data)
1112 1112 chunk = buff.read(preferedchunksize)
1113 1113 while chunk:
1114 1114 yield chunk
1115 1115 chunk = buff.read(preferedchunksize)
1116 1116 elif len(self.data):
1117 1117 yield self.data
1118 1118
1119 1119
1120 1120 flaginterrupt = -1
1121 1121
1122 1122 class interrupthandler(unpackermixin):
1123 1123 """read one part and process it with restricted capability
1124 1124
1125 1125 This allows transmitting exceptions raised on the producer side during part
1126 1126 iteration while the consumer is reading a part.
1127 1127
1128 1128 Parts processed in this manner only have access to a ui object."""
1129 1129
1130 1130 def __init__(self, ui, fp):
1131 1131 super(interrupthandler, self).__init__(fp)
1132 1132 self.ui = ui
1133 1133
1134 1134 def _readpartheader(self):
1135 1135 """reads a part header size and return the bytes blob
1136 1136
1137 1137 returns None if empty"""
1138 1138 headersize = self._unpack(_fpartheadersize)[0]
1139 1139 if headersize < 0:
1140 1140 raise error.BundleValueError('negative part header size: %i'
1141 1141 % headersize)
1142 1142 indebug(self.ui, 'part header size: %i\n' % headersize)
1143 1143 if headersize:
1144 1144 return self._readexact(headersize)
1145 1145 return None
1146 1146
1147 1147 def __call__(self):
1148 1148
1149 1149 self.ui.debug('bundle2-input-stream-interrupt:'
1150 1150 ' opening out of band context\n')
1151 1151 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1152 1152 headerblock = self._readpartheader()
1153 1153 if headerblock is None:
1154 1154 indebug(self.ui, 'no part found during interruption.')
1155 1155 return
1156 1156 part = unbundlepart(self.ui, headerblock, self._fp)
1157 1157 op = interruptoperation(self.ui)
1158 1158 hardabort = False
1159 1159 try:
1160 1160 _processpart(op, part)
1161 1161 except (SystemExit, KeyboardInterrupt):
1162 1162 hardabort = True
1163 1163 raise
1164 1164 finally:
1165 1165 if not hardabort:
1166 1166 part.seek(0, 2)
1167 1167 self.ui.debug('bundle2-input-stream-interrupt:'
1168 1168 ' closing out of band context\n')
1169 1169
1170 1170 class interruptoperation(object):
1171 1171 """A limited operation to be use by part handler during interruption
1172 1172
1173 1173 It only have access to an ui object.
1174 1174 """
1175 1175
1176 1176 def __init__(self, ui):
1177 1177 self.ui = ui
1178 1178 self.reply = None
1179 1179 self.captureoutput = False
1180 1180
1181 1181 @property
1182 1182 def repo(self):
1183 1183 raise error.ProgrammingError('no repo access from stream interruption')
1184 1184
1185 1185 def gettransaction(self):
1186 1186 raise TransactionUnavailable('no repo access from stream interruption')
1187 1187
1188 1188 class unbundlepart(unpackermixin):
1189 1189 """a bundle part read from a bundle"""
1190 1190
1191 1191 def __init__(self, ui, header, fp):
1192 1192 super(unbundlepart, self).__init__(fp)
1193 1193 self._seekable = (util.safehasattr(fp, 'seek') and
1194 1194 util.safehasattr(fp, 'tell'))
1195 1195 self.ui = ui
1196 1196 # unbundle state attr
1197 1197 self._headerdata = header
1198 1198 self._headeroffset = 0
1199 1199 self._initialized = False
1200 1200 self.consumed = False
1201 1201 # part data
1202 1202 self.id = None
1203 1203 self.type = None
1204 1204 self.mandatoryparams = None
1205 1205 self.advisoryparams = None
1206 1206 self.params = None
1207 1207 self.mandatorykeys = ()
1208 1208 self._payloadstream = None
1209 1209 self._readheader()
1210 1210 self._mandatory = None
1211 1211 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1212 1212 self._pos = 0
1213 1213
1214 1214 def _fromheader(self, size):
1215 1215 """return the next <size> byte from the header"""
1216 1216 offset = self._headeroffset
1217 1217 data = self._headerdata[offset:(offset + size)]
1218 1218 self._headeroffset = offset + size
1219 1219 return data
1220 1220
1221 1221 def _unpackheader(self, format):
1222 1222 """read given format from header
1223 1223
1224 1224 This automatically computes the size of the format to read."""
1225 1225 data = self._fromheader(struct.calcsize(format))
1226 1226 return _unpack(format, data)
1227 1227
1228 1228 def _initparams(self, mandatoryparams, advisoryparams):
1229 1229 """internal function to setup all logic related parameters"""
1230 1230 # make it read only to prevent people touching it by mistake.
1231 1231 self.mandatoryparams = tuple(mandatoryparams)
1232 1232 self.advisoryparams = tuple(advisoryparams)
1233 1233 # user friendly UI
1234 1234 self.params = util.sortdict(self.mandatoryparams)
1235 1235 self.params.update(self.advisoryparams)
1236 1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1237 1237
1238 1238 def _payloadchunks(self, chunknum=0):
1239 1239 '''seek to specified chunk and start yielding data'''
1240 1240 if len(self._chunkindex) == 0:
1241 1241 assert chunknum == 0, 'Must start with chunk 0'
1242 1242 self._chunkindex.append((0, self._tellfp()))
1243 1243 else:
1244 1244 assert chunknum < len(self._chunkindex), \
1245 1245 'Unknown chunk %d' % chunknum
1246 1246 self._seekfp(self._chunkindex[chunknum][1])
1247 1247
1248 1248 pos = self._chunkindex[chunknum][0]
1249 1249 payloadsize = self._unpack(_fpayloadsize)[0]
1250 1250 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1251 1251 while payloadsize:
1252 1252 if payloadsize == flaginterrupt:
1253 1253 # interruption detection, the handler will now read a
1254 1254 # single part and process it.
1255 1255 interrupthandler(self.ui, self._fp)()
1256 1256 elif payloadsize < 0:
1257 1257 msg = 'negative payload chunk size: %i' % payloadsize
1258 1258 raise error.BundleValueError(msg)
1259 1259 else:
1260 1260 result = self._readexact(payloadsize)
1261 1261 chunknum += 1
1262 1262 pos += payloadsize
1263 1263 if chunknum == len(self._chunkindex):
1264 1264 self._chunkindex.append((pos, self._tellfp()))
1265 1265 yield result
1266 1266 payloadsize = self._unpack(_fpayloadsize)[0]
1267 1267 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1268 1268
1269 1269 def _findchunk(self, pos):
1270 1270 '''for a given payload position, return a chunk number and offset'''
1271 1271 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1272 1272 if ppos == pos:
1273 1273 return chunk, 0
1274 1274 elif ppos > pos:
1275 1275 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1276 1276 raise ValueError('Unknown chunk')
1277 1277
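# Example: with self._chunkindex == [(0, f0), (4096, f1)] (f0 and f1 being
# file offsets), payload position 100 falls inside the first chunk, so
# _findchunk(100) returns (0, 100): chunk 0, offset 100 within it.
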
1278 1278 def _readheader(self):
1279 1279 """read the header and setup the object"""
1280 1280 typesize = self._unpackheader(_fparttypesize)[0]
1281 1281 self.type = self._fromheader(typesize)
1282 1282 indebug(self.ui, 'part type: "%s"' % self.type)
1283 1283 self.id = self._unpackheader(_fpartid)[0]
1284 1284 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1285 1285 # extract mandatory bit from type
1286 1286 self.mandatory = (self.type != self.type.lower())
1287 1287 self.type = self.type.lower()
1288 1288 ## reading parameters
1289 1289 # param count
1290 1290 mancount, advcount = self._unpackheader(_fpartparamcount)
1291 1291 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1292 1292 # param size
1293 1293 fparamsizes = _makefpartparamsizes(mancount + advcount)
1294 1294 paramsizes = self._unpackheader(fparamsizes)
1295 1295 # make it a list of couple again
1296 1296 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1297 1297 # split mandatory from advisory
1298 1298 mansizes = paramsizes[:mancount]
1299 1299 advsizes = paramsizes[mancount:]
1300 1300 # retrieve param value
1301 1301 manparams = []
1302 1302 for key, value in mansizes:
1303 1303 manparams.append((self._fromheader(key), self._fromheader(value)))
1304 1304 advparams = []
1305 1305 for key, value in advsizes:
1306 1306 advparams.append((self._fromheader(key), self._fromheader(value)))
1307 1307 self._initparams(manparams, advparams)
1308 1308 ## part payload
1309 1309 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1310 1310 # the header has been fully read; mark the part as initialized
1311 1311 self._initialized = True
1312 1312
1313 1313 def read(self, size=None):
1314 1314 """read payload data"""
1315 1315 if not self._initialized:
1316 1316 self._readheader()
1317 1317 if size is None:
1318 1318 data = self._payloadstream.read()
1319 1319 else:
1320 1320 data = self._payloadstream.read(size)
1321 1321 self._pos += len(data)
1322 1322 if size is None or len(data) < size:
1323 1323 if not self.consumed and self._pos:
1324 1324 self.ui.debug('bundle2-input-part: total payload size %i\n'
1325 1325 % self._pos)
1326 1326 self.consumed = True
1327 1327 return data
1328 1328
1329 1329 def tell(self):
1330 1330 return self._pos
1331 1331
1332 1332 def seek(self, offset, whence=0):
1333 1333 if whence == 0:
1334 1334 newpos = offset
1335 1335 elif whence == 1:
1336 1336 newpos = self._pos + offset
1337 1337 elif whence == 2:
1338 1338 if not self.consumed:
1339 1339 self.read()
1340 1340 newpos = self._chunkindex[-1][0] - offset
1341 1341 else:
1342 1342 raise ValueError('Unknown whence value: %r' % (whence,))
1343 1343
1344 1344 if newpos > self._chunkindex[-1][0] and not self.consumed:
1345 1345 self.read()
1346 1346 if not 0 <= newpos <= self._chunkindex[-1][0]:
1347 1347 raise ValueError('Offset out of range')
1348 1348
1349 1349 if self._pos != newpos:
1350 1350 chunk, internaloffset = self._findchunk(newpos)
1351 1351 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1352 1352 adjust = self.read(internaloffset)
1353 1353 if len(adjust) != internaloffset:
1354 1354 raise error.Abort(_('Seek failed\n'))
1355 1355 self._pos = newpos
1356 1356
1357 1357 def _seekfp(self, offset, whence=0):
1358 1358 """move the underlying file pointer
1359 1359
1360 1360 This method is meant for internal usage by the bundle2 protocol only.
1361 1361 They directly manipulate the low level stream including bundle2 level
1362 1362 instruction.
1363 1363
1364 1364 Do not use it to implement higher-level logic or methods."""
1365 1365 if self._seekable:
1366 1366 return self._fp.seek(offset, whence)
1367 1367 else:
1368 1368 raise NotImplementedError(_('File pointer is not seekable'))
1369 1369
1370 1370 def _tellfp(self):
1371 1371 """return the file offset, or None if file is not seekable
1372 1372
1373 1373 This method is meant for internal usage by the bundle2 protocol only.
1374 1374 They directly manipulate the low level stream including bundle2 level
1375 1375 instruction.
1376 1376
1377 1377 Do not use it to implement higher-level logic or methods."""
1378 1378 if self._seekable:
1379 1379 try:
1380 1380 return self._fp.tell()
1381 1381 except IOError as e:
1382 1382 if e.errno == errno.ESPIPE:
1383 1383 self._seekable = False
1384 1384 else:
1385 1385 raise
1386 1386 return None
1387 1387
1388 1388 # These are only the static capabilities.
1389 1389 # Check the 'getrepocaps' function for the rest.
1390 1390 capabilities = {'HG20': (),
1391 1391 'error': ('abort', 'unsupportedcontent', 'pushraced',
1392 1392 'pushkey'),
1393 1393 'listkeys': (),
1394 1394 'pushkey': (),
1395 1395 'digests': tuple(sorted(util.DIGESTS.keys())),
1396 1396 'remote-changegroup': ('http', 'https'),
1397 1397 'hgtagsfnodes': (),
1398 1398 'phases': ('heads',),
1399 1399 }
1400 1400
1401 1401 def getrepocaps(repo, allowpushback=False):
1402 1402 """return the bundle2 capabilities for a given repo
1403 1403
1404 1404 Exists to allow extensions (like evolution) to mutate the capabilities.
1405 1405 """
1406 1406 caps = capabilities.copy()
1407 1407 caps['changegroup'] = tuple(sorted(
1408 1408 changegroup.supportedincomingversions(repo)))
1409 1409 if obsolete.isenabled(repo, obsolete.exchangeopt):
1410 1410 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1411 1411 caps['obsmarkers'] = supportedformat
1412 1412 if allowpushback:
1413 1413 caps['pushback'] = ()
1414 1414 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1415 1415 if cpmode == 'check-related':
1416 1416 caps['checkheads'] = ('related',)
1417 1417 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1418 1418 caps.pop('phases')
1419 1419 return caps
1420 1420
1421 1421 def bundle2caps(remote):
1422 1422 """return the bundle capabilities of a peer as dict"""
1423 1423 raw = remote.capable('bundle2')
1424 1424 if not raw and raw != '':
1425 1425 return {}
1426 1426 capsblob = urlreq.unquote(remote.capable('bundle2'))
1427 1427 return decodecaps(capsblob)
1428 1428
1429 1429 def obsmarkersversion(caps):
1430 1430 """extract the list of supported obsmarkers versions from a bundle2caps dict
1431 1431 """
1432 1432 obscaps = caps.get('obsmarkers', ())
1433 1433 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1434 1434
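# For example, with illustrative capability values:
#
#   obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]
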
1435 1435 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1436 1436 vfs=None, compression=None, compopts=None):
1437 1437 if bundletype.startswith('HG10'):
1438 1438 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1439 1439 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1440 1440 compression=compression, compopts=compopts)
1441 1441 elif not bundletype.startswith('HG20'):
1442 1442 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1443 1443
1444 1444 caps = {}
1445 1445 if 'obsolescence' in opts:
1446 1446 caps['obsmarkers'] = ('V1',)
1447 1447 bundle = bundle20(ui, caps)
1448 1448 bundle.setcompression(compression, compopts)
1449 1449 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1450 1450 chunkiter = bundle.getchunks()
1451 1451
1452 1452 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1453 1453
1454 1454 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1455 1455 # We should eventually reconcile this logic with the one behind
1456 1456 # 'exchange.getbundle2partsgenerator'.
1457 1457 #
1458 1458 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1459 1459 # different right now. So we keep them separated for now for the sake of
1460 1460 # simplicity.
1461 1461
1462 1462 # we always want a changegroup in such bundle
1463 1463 cgversion = opts.get('cg.version')
1464 1464 if cgversion is None:
1465 1465 cgversion = changegroup.safeversion(repo)
1466 1466 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1467 1467 part = bundler.newpart('changegroup', data=cg.getchunks())
1468 1468 part.addparam('version', cg.version)
1469 1469 if 'clcount' in cg.extras:
1470 1470 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1471 1471 mandatory=False)
1472 1472 if opts.get('phases') and repo.revs('%ln and secret()',
1473 1473 outgoing.missingheads):
1474 1474 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1475 1475
1476 1476 addparttagsfnodescache(repo, bundler, outgoing)
1477 1477
1478 1478 if opts.get('obsolescence', False):
1479 1479 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1480 1480 buildobsmarkerspart(bundler, obsmarkers)
1481 1481
1482 1482 if opts.get('phases', False):
1483 1483 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1484 1484 phasedata = phases.binaryencode(headsbyphase)
1485 1485 bundler.newpart('phase-heads', data=phasedata)
1486 1486
1487 1487 def addparttagsfnodescache(repo, bundler, outgoing):
1488 1488 # we include the tags fnode cache for the bundle changeset
1489 1489 # (as an optional part)
1490 1490 cache = tags.hgtagsfnodescache(repo.unfiltered())
1491 1491 chunks = []
1492 1492
1493 1493 # .hgtags fnodes are only relevant for head changesets. While we could
1494 1494 # transfer values for all known nodes, there will likely be little to
1495 1495 # no benefit.
1496 1496 #
1497 1497 # We don't bother using a generator to produce output data because
1498 1498 # a) we only have 40 bytes per head and even esoteric numbers of heads
1499 1499 # consume little memory (1M heads is 40MB) b) we don't want to send the
1500 1500 # part if we don't have entries and knowing if we have entries requires
1501 1501 # cache lookups.
1502 1502 for node in outgoing.missingheads:
1503 1503 # Don't compute missing, as this may slow down serving.
1504 1504 fnode = cache.getfnode(node, computemissing=False)
1505 1505 if fnode is not None:
1506 1506 chunks.extend([node, fnode])
1507 1507
1508 1508 if chunks:
1509 1509 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1510 1510
1511 1511 def buildobsmarkerspart(bundler, markers):
1512 1512 """add an obsmarker part to the bundler with <markers>
1513 1513
1514 1514 No part is created if markers is empty.
1515 1515 Raises ValueError if the bundler doesn't support any known obsmarker format.
1516 1516 """
1517 1517 if not markers:
1518 1518 return None
1519 1519
1520 1520 remoteversions = obsmarkersversion(bundler.capabilities)
1521 1521 version = obsolete.commonversion(remoteversions)
1522 1522 if version is None:
1523 1523 raise ValueError('bundler does not support common obsmarker format')
1524 1524 stream = obsolete.encodemarkers(markers, True, version=version)
1525 1525 return bundler.newpart('obsmarkers', data=stream)
1526 1526
1527 1527 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1528 1528 compopts=None):
1529 1529 """Write a bundle file and return its filename.
1530 1530
1531 1531 Existing files will not be overwritten.
1532 1532 If no filename is specified, a temporary file is created.
1533 1533 bz2 compression can be turned off.
1534 1534 The bundle file will be deleted in case of errors.
1535 1535 """
1536 1536
1537 1537 if bundletype == "HG20":
1538 1538 bundle = bundle20(ui)
1539 1539 bundle.setcompression(compression, compopts)
1540 1540 part = bundle.newpart('changegroup', data=cg.getchunks())
1541 1541 part.addparam('version', cg.version)
1542 1542 if 'clcount' in cg.extras:
1543 1543 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1544 1544 mandatory=False)
1545 1545 chunkiter = bundle.getchunks()
1546 1546 else:
1547 1547 # compression argument is only for the bundle2 case
1548 1548 assert compression is None
1549 1549 if cg.version != '01':
1550 1550             raise error.Abort(_('old bundle types only support v1 '
1551 1551 'changegroups'))
1552 1552 header, comp = bundletypes[bundletype]
1553 1553 if comp not in util.compengines.supportedbundletypes:
1554 1554 raise error.Abort(_('unknown stream compression type: %s')
1555 1555 % comp)
1556 1556 compengine = util.compengines.forbundletype(comp)
1557 1557 def chunkiter():
1558 1558 yield header
1559 1559 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1560 1560 yield chunk
1561 1561 chunkiter = chunkiter()
1562 1562
1563 1563 # parse the changegroup data, otherwise we will block
1564 1564 # in case of sshrepo because we don't know the end of the stream
1565 1565 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1566 1566
1567 1567 def combinechangegroupresults(op):
1568 1568 """logic to combine 0 or more addchangegroup results into one"""
1569 1569 results = [r.get('return', 0)
1570 1570 for r in op.records['changegroup']]
1571 1571 changedheads = 0
1572 1572 result = 1
1573 1573 for ret in results:
1574 1574 # If any changegroup result is 0, return 0
1575 1575 if ret == 0:
1576 1576 result = 0
1577 1577 break
1578 1578 if ret < -1:
1579 1579 changedheads += ret + 1
1580 1580 elif ret > 1:
1581 1581 changedheads += ret - 1
1582 1582 if changedheads > 0:
1583 1583 result = 1 + changedheads
1584 1584 elif changedheads < 0:
1585 1585 result = -1 + changedheads
1586 1586 return result
1587 1587
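# Editor's worked example of the combination rule above (values are
# hypothetical): ret > 1 means "ret - 1 heads added", ret < -1 means
# "ret + 1 heads removed", 1 means "unchanged", and any single 0 (HTTP
# error) short-circuits the whole result to 0.
#
#     results      changedheads          combined result
#     [3, -2]      (+2) + (-1) = +1      1 + 1  =  2
#     [1, 1]       0                     1
#     [-3]         -2                    -1 + (-2) = -3
#     [2, 0, 5]    (short-circuit)       0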
1588 1588 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1589 1589 'targetphase'))
1590 1590 def handlechangegroup(op, inpart):
1591 1591 """apply a changegroup part on the repo
1592 1592
1593 1593     This is a very early implementation that will be massively reworked
1594 1594     before being inflicted on any end-user.
1595 1595 """
1596 1596 tr = op.gettransaction()
1597 1597 unpackerversion = inpart.params.get('version', '01')
1598 1598 # We should raise an appropriate exception here
1599 1599 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1600 1600     # the source and url passed here are overwritten by the ones contained
1601 1601     # in the transaction.hookargs argument, so 'bundle2' is a placeholder
1602 1602 nbchangesets = None
1603 1603 if 'nbchanges' in inpart.params:
1604 1604 nbchangesets = int(inpart.params.get('nbchanges'))
1605 1605 if ('treemanifest' in inpart.params and
1606 1606 'treemanifest' not in op.repo.requirements):
1607 1607 if len(op.repo.changelog) != 0:
1608 1608 raise error.Abort(_(
1609 1609 "bundle contains tree manifests, but local repo is "
1610 1610 "non-empty and does not use tree manifests"))
1611 1611 op.repo.requirements.add('treemanifest')
1612 1612 op.repo._applyopenerreqs()
1613 1613 op.repo._writerequirements()
1614 1614 extrakwargs = {}
1615 1615 targetphase = inpart.params.get('targetphase')
1616 1616 if targetphase is not None:
1617 1617 extrakwargs['targetphase'] = int(targetphase)
1618 1618 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1619 1619 expectedtotal=nbchangesets, **extrakwargs)
1620 1620 if op.reply is not None:
1621 1621 # This is definitely not the final form of this
1622 1622         # return. But one needs to start somewhere.
1623 1623 part = op.reply.newpart('reply:changegroup', mandatory=False)
1624 1624 part.addparam(
1625 1625 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1626 1626 part.addparam('return', '%i' % ret, mandatory=False)
1627 1627 assert not inpart.read()
1628 1628
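# Editor's sketch of the part parameters consumed by handlechangegroup()
# above (values hypothetical): 'version' selects the unbundler,
# 'nbchanges' sizes the progress output, and 'targetphase' forces a
# phase (e.g. 2 == secret) on the added changesets.
#
#     inpart.params == {'version': '02',
#                       'nbchanges': '3',
#                       'targetphase': '2'}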
1629 1629 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1630 1630 ['digest:%s' % k for k in util.DIGESTS.keys()])
1631 1631 @parthandler('remote-changegroup', _remotechangegroupparams)
1632 1632 def handleremotechangegroup(op, inpart):
1633 1633     """apply a bundle10 on the repo, given a url and validation information
1634 1634
1635 1635     All the information about the remote bundle to import is given as
1636 1636 parameters. The parameters include:
1637 1637 - url: the url to the bundle10.
1638 1638 - size: the bundle10 file size. It is used to validate what was
1639 1639 retrieved by the client matches the server knowledge about the bundle.
1640 1640 - digests: a space separated list of the digest types provided as
1641 1641 parameters.
1642 1642 - digest:<digest-type>: the hexadecimal representation of the digest with
1643 1643 that name. Like the size, it is used to validate what was retrieved by
1644 1644 the client matches what the server knows about the bundle.
1645 1645
1646 1646 When multiple digest types are given, all of them are checked.
1647 1647 """
1648 1648 try:
1649 1649 raw_url = inpart.params['url']
1650 1650 except KeyError:
1651 1651 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1652 1652 parsed_url = util.url(raw_url)
1653 1653 if parsed_url.scheme not in capabilities['remote-changegroup']:
1654 1654 raise error.Abort(_('remote-changegroup does not support %s urls') %
1655 1655 parsed_url.scheme)
1656 1656
1657 1657 try:
1658 1658 size = int(inpart.params['size'])
1659 1659 except ValueError:
1660 1660 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1661 1661 % 'size')
1662 1662 except KeyError:
1663 1663 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1664 1664
1665 1665 digests = {}
1666 1666 for typ in inpart.params.get('digests', '').split():
1667 1667 param = 'digest:%s' % typ
1668 1668 try:
1669 1669 value = inpart.params[param]
1670 1670 except KeyError:
1671 1671 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1672 1672 param)
1673 1673 digests[typ] = value
1674 1674
1675 1675 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1676 1676
1677 1677 tr = op.gettransaction()
1678 1678 from . import exchange
1679 1679 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1680 1680 if not isinstance(cg, changegroup.cg1unpacker):
1681 1681 raise error.Abort(_('%s: not a bundle version 1.0') %
1682 1682 util.hidepassword(raw_url))
1683 1683 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1684 1684 if op.reply is not None:
1685 1685 # This is definitely not the final form of this
1686 1686         # return. But one needs to start somewhere.
1687 1687 part = op.reply.newpart('reply:changegroup')
1688 1688 part.addparam(
1689 1689 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1690 1690 part.addparam('return', '%i' % ret, mandatory=False)
1691 1691 try:
1692 1692 real_part.validate()
1693 1693 except error.Abort as e:
1694 1694 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1695 1695 (util.hidepassword(raw_url), str(e)))
1696 1696 assert not inpart.read()
1697 1697
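# Editor's sketch of a well-formed 'remote-changegroup' part (URL and
# digest are hypothetical): each type listed in 'digests' must come with
# a matching 'digest:<type>' parameter; both size and digests are
# checked against the downloaded bundle10.
#
#     inpart.params == {'url': 'https://example.com/bundle.hg',
#                       'size': '123456',
#                       'digests': 'sha1',
#                       'digest:sha1': '<40 hex characters>'}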
1698 1698 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1699 1699 def handlereplychangegroup(op, inpart):
1700 1700 ret = int(inpart.params['return'])
1701 1701 replyto = int(inpart.params['in-reply-to'])
1702 1702 op.records.add('changegroup', {'return': ret}, replyto)
1703 1703
1704 1704 @parthandler('check:heads')
1705 1705 def handlecheckheads(op, inpart):
1706 1706     """check that the heads of the repo did not change
1707 1707
1708 1708 This is used to detect a push race when using unbundle.
1709 1709 This replaces the "heads" argument of unbundle."""
1710 1710 h = inpart.read(20)
1711 1711 heads = []
1712 1712 while len(h) == 20:
1713 1713 heads.append(h)
1714 1714 h = inpart.read(20)
1715 1715 assert not h
1716 1716 # Trigger a transaction so that we are guaranteed to have the lock now.
1717 1717 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1718 1718 op.gettransaction()
1719 1719 if sorted(heads) != sorted(op.repo.heads()):
1720 1720 raise error.PushRaced('repository changed while pushing - '
1721 1721 'please try again')
1722 1722
1723 1723 @parthandler('check:updated-heads')
1724 1724 def handlecheckupdatedheads(op, inpart):
1725 1725 """check for race on the heads touched by a push
1726 1726
1727 1727     This is similar to 'check:heads' but focuses on the heads actually updated
1728 1728     during the push. If other activity happens on unrelated heads, it is
1729 1729     ignored.
1730 1730 
1731 1731     This allows servers with high traffic to avoid push contention as long as
1732 1732     only unrelated parts of the graph are involved."""
1733 1733 h = inpart.read(20)
1734 1734 heads = []
1735 1735 while len(h) == 20:
1736 1736 heads.append(h)
1737 1737 h = inpart.read(20)
1738 1738 assert not h
1739 1739 # trigger a transaction so that we are guaranteed to have the lock now.
1740 1740 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1741 1741 op.gettransaction()
1742 1742
1743 1743 currentheads = set()
1744 1744 for ls in op.repo.branchmap().itervalues():
1745 1745 currentheads.update(ls)
1746 1746
1747 1747 for h in heads:
1748 1748 if h not in currentheads:
1749 1749 raise error.PushRaced('repository changed while pushing - '
1750 1750 'please try again')
1751 1751
1752 1752 @parthandler('output')
1753 1753 def handleoutput(op, inpart):
1754 1754 """forward output captured on the server to the client"""
1755 1755 for line in inpart.read().splitlines():
1756 1756 op.ui.status(_('remote: %s\n') % line)
1757 1757
1758 1758 @parthandler('replycaps')
1759 1759 def handlereplycaps(op, inpart):
1760 1760 """Notify that a reply bundle should be created
1761 1761
1762 1762 The payload contains the capabilities information for the reply"""
1763 1763 caps = decodecaps(inpart.read())
1764 1764 if op.reply is None:
1765 1765 op.reply = bundle20(op.ui, caps)
1766 1766
1767 1767 class AbortFromPart(error.Abort):
1768 1768 """Sub-class of Abort that denotes an error from a bundle2 part."""
1769 1769
1770 1770 @parthandler('error:abort', ('message', 'hint'))
1771 1771 def handleerrorabort(op, inpart):
1772 1772 """Used to transmit abort error over the wire"""
1773 1773 raise AbortFromPart(inpart.params['message'],
1774 1774 hint=inpart.params.get('hint'))
1775 1775
1776 1776 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1777 1777 'in-reply-to'))
1778 1778 def handleerrorpushkey(op, inpart):
1779 1779 """Used to transmit failure of a mandatory pushkey over the wire"""
1780 1780 kwargs = {}
1781 1781 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1782 1782 value = inpart.params.get(name)
1783 1783 if value is not None:
1784 1784 kwargs[name] = value
1785 1785 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1786 1786
1787 1787 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1788 1788 def handleerrorunsupportedcontent(op, inpart):
1789 1789 """Used to transmit unknown content error over the wire"""
1790 1790 kwargs = {}
1791 1791 parttype = inpart.params.get('parttype')
1792 1792 if parttype is not None:
1793 1793 kwargs['parttype'] = parttype
1794 1794 params = inpart.params.get('params')
1795 1795 if params is not None:
1796 1796 kwargs['params'] = params.split('\0')
1797 1797
1798 1798 raise error.BundleUnknownFeatureError(**kwargs)
1799 1799
1800 1800 @parthandler('error:pushraced', ('message',))
1801 1801 def handleerrorpushraced(op, inpart):
1802 1802 """Used to transmit push race error over the wire"""
1803 1803 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1804 1804
1805 1805 @parthandler('listkeys', ('namespace',))
1806 1806 def handlelistkeys(op, inpart):
1807 1807 """retrieve pushkey namespace content stored in a bundle2"""
1808 1808 namespace = inpart.params['namespace']
1809 1809 r = pushkey.decodekeys(inpart.read())
1810 1810 op.records.add('listkeys', (namespace, r))
1811 1811
1812 1812 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1813 1813 def handlepushkey(op, inpart):
1814 1814 """process a pushkey request"""
1815 1815 dec = pushkey.decode
1816 1816 namespace = dec(inpart.params['namespace'])
1817 1817 key = dec(inpart.params['key'])
1818 1818 old = dec(inpart.params['old'])
1819 1819 new = dec(inpart.params['new'])
1820 1820 # Grab the transaction to ensure that we have the lock before performing the
1821 1821 # pushkey.
1822 1822 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1823 1823 op.gettransaction()
1824 1824 ret = op.repo.pushkey(namespace, key, old, new)
1825 1825 record = {'namespace': namespace,
1826 1826 'key': key,
1827 1827 'old': old,
1828 1828 'new': new}
1829 1829 op.records.add('pushkey', record)
1830 1830 if op.reply is not None:
1831 1831 rpart = op.reply.newpart('reply:pushkey')
1832 1832 rpart.addparam(
1833 1833 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1834 1834 rpart.addparam('return', '%i' % ret, mandatory=False)
1835 1835 if inpart.mandatory and not ret:
1836 1836 kwargs = {}
1837 1837 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1838 1838 if key in inpart.params:
1839 1839 kwargs[key] = inpart.params[key]
1840 1840 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1841 1841
1842 1842 @parthandler('phase-heads')
1843 1843 def handlephases(op, inpart):
1844 1844 """apply phases from bundle part to repo"""
1845 1845 headsbyphase = phases.binarydecode(inpart)
1846 1846 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1847 op.records.add('phase-heads', {})
1848 1847
1849 1848 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1850 1849 def handlepushkeyreply(op, inpart):
1851 1850 """retrieve the result of a pushkey request"""
1852 1851 ret = int(inpart.params['return'])
1853 1852 partid = int(inpart.params['in-reply-to'])
1854 1853 op.records.add('pushkey', {'return': ret}, partid)
1855 1854
1856 1855 @parthandler('obsmarkers')
1857 1856 def handleobsmarker(op, inpart):
1858 1857 """add a stream of obsmarkers to the repo"""
1859 1858 tr = op.gettransaction()
1860 1859 markerdata = inpart.read()
1861 1860 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1862 1861 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1863 1862 % len(markerdata))
1864 1863 # The mergemarkers call will crash if marker creation is not enabled.
1865 1864 # we want to avoid this if the part is advisory.
1866 1865 if not inpart.mandatory and op.repo.obsstore.readonly:
1867 1866         op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1868 1867 return
1869 1868 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1870 1869 op.repo.invalidatevolatilesets()
1871 1870 if new:
1872 1871 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1873 1872 op.records.add('obsmarkers', {'new': new})
1874 1873 if op.reply is not None:
1875 1874 rpart = op.reply.newpart('reply:obsmarkers')
1876 1875 rpart.addparam(
1877 1876 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1878 1877 rpart.addparam('new', '%i' % new, mandatory=False)
1879 1878
1880 1879
1881 1880 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1882 1881 def handleobsmarkerreply(op, inpart):
1883 1882     """retrieve the result of an obsmarkers request"""
1884 1883 ret = int(inpart.params['new'])
1885 1884 partid = int(inpart.params['in-reply-to'])
1886 1885 op.records.add('obsmarkers', {'new': ret}, partid)
1887 1886
1888 1887 @parthandler('hgtagsfnodes')
1889 1888 def handlehgtagsfnodes(op, inpart):
1890 1889 """Applies .hgtags fnodes cache entries to the local repo.
1891 1890
1892 1891 Payload is pairs of 20 byte changeset nodes and filenodes.
1893 1892 """
1894 1893 # Grab the transaction so we ensure that we have the lock at this point.
1895 1894 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1896 1895 op.gettransaction()
1897 1896 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1898 1897
1899 1898 count = 0
1900 1899 while True:
1901 1900 node = inpart.read(20)
1902 1901 fnode = inpart.read(20)
1903 1902 if len(node) < 20 or len(fnode) < 20:
1904 1903 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1905 1904 break
1906 1905 cache.setfnode(node, fnode)
1907 1906 count += 1
1908 1907
1909 1908 cache.write()
1910 1909 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1911 1910
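# Editor's sketch of the payload layout handled above: a flat stream of
# 40-byte records, each a 20-byte changeset node followed by the 20-byte
# .hgtags filenode for that changeset.
#
#     payload = node1 + fnode1 + node2 + fnode2 + ...    (len % 40 == 0)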
1912 1911 @parthandler('pushvars')
1913 1912 def bundle2getvars(op, part):
1914 1913 '''unbundle a bundle2 containing shellvars on the server'''
1915 1914 # An option to disable unbundling on server-side for security reasons
1916 1915 if op.ui.configbool('push', 'pushvars.server'):
1917 1916 hookargs = {}
1918 1917 for key, value in part.advisoryparams:
1919 1918 key = key.upper()
1920 1919 # We want pushed variables to have USERVAR_ prepended so we know
1921 1920 # they came from the --pushvar flag.
1922 1921 key = "USERVAR_" + key
1923 1922 hookargs[key] = value
1924 1923 op.addhookargs(hookargs)
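# Editor's usage sketch (variable name hypothetical): a client-side
# '--pushvar DEBUG=1' arrives here as the advisory param ('DEBUG', '1')
# and, when 'push.pushvars.server' is enabled, is handed to server-side
# hooks as USERVAR_DEBUG='1' via op.addhookargs().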
@@ -1,2064 +1,2060 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import errno
12 12 import hashlib
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 hex,
17 17 nullid,
18 18 )
19 19 from . import (
20 20 bookmarks as bookmod,
21 21 bundle2,
22 22 changegroup,
23 23 discovery,
24 24 error,
25 25 lock as lockmod,
26 26 obsolete,
27 27 phases,
28 28 pushkey,
29 29 pycompat,
30 30 scmutil,
31 31 sslutil,
32 32 streamclone,
33 33 url as urlmod,
34 34 util,
35 35 )
36 36
37 37 urlerr = util.urlerr
38 38 urlreq = util.urlreq
39 39
40 40 # Maps bundle version human names to changegroup versions.
41 41 _bundlespeccgversions = {'v1': '01',
42 42 'v2': '02',
43 43 'packed1': 's1',
44 44 'bundle2': '02', #legacy
45 45 }
46 46
47 47 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
48 48 _bundlespecv1compengines = {'gzip', 'bzip2', 'none'}
49 49
50 50 def parsebundlespec(repo, spec, strict=True, externalnames=False):
51 51 """Parse a bundle string specification into parts.
52 52
53 53 Bundle specifications denote a well-defined bundle/exchange format.
54 54 The content of a given specification should not change over time in
55 55 order to ensure that bundles produced by a newer version of Mercurial are
56 56 readable from an older version.
57 57
58 58 The string currently has the form:
59 59
60 60 <compression>-<type>[;<parameter0>[;<parameter1>]]
61 61
62 62 Where <compression> is one of the supported compression formats
63 63 and <type> is (currently) a version string. A ";" can follow the type and
64 64 all text afterwards is interpreted as URI encoded, ";" delimited key=value
65 65 pairs.
66 66
67 67 If ``strict`` is True (the default) <compression> is required. Otherwise,
68 68 it is optional.
69 69
70 70 If ``externalnames`` is False (the default), the human-centric names will
71 71 be converted to their internal representation.
72 72
73 73 Returns a 3-tuple of (compression, version, parameters). Compression will
74 74 be ``None`` if not in strict mode and a compression isn't defined.
75 75
76 76 An ``InvalidBundleSpecification`` is raised when the specification is
77 77 not syntactically well formed.
78 78
79 79 An ``UnsupportedBundleSpecification`` is raised when the compression or
80 80 bundle type/version is not recognized.
81 81
82 82 Note: this function will likely eventually return a more complex data
83 83 structure, including bundle2 part information.
84 84 """
85 85 def parseparams(s):
86 86 if ';' not in s:
87 87 return s, {}
88 88
89 89 params = {}
90 90 version, paramstr = s.split(';', 1)
91 91
92 92 for p in paramstr.split(';'):
93 93 if '=' not in p:
94 94 raise error.InvalidBundleSpecification(
95 95 _('invalid bundle specification: '
96 96 'missing "=" in parameter: %s') % p)
97 97
98 98 key, value = p.split('=', 1)
99 99 key = urlreq.unquote(key)
100 100 value = urlreq.unquote(value)
101 101 params[key] = value
102 102
103 103 return version, params
104 104
105 105
106 106 if strict and '-' not in spec:
107 107 raise error.InvalidBundleSpecification(
108 108 _('invalid bundle specification; '
109 109 'must be prefixed with compression: %s') % spec)
110 110
111 111 if '-' in spec:
112 112 compression, version = spec.split('-', 1)
113 113
114 114 if compression not in util.compengines.supportedbundlenames:
115 115 raise error.UnsupportedBundleSpecification(
116 116 _('%s compression is not supported') % compression)
117 117
118 118 version, params = parseparams(version)
119 119
120 120 if version not in _bundlespeccgversions:
121 121 raise error.UnsupportedBundleSpecification(
122 122 _('%s is not a recognized bundle version') % version)
123 123 else:
124 124 # Value could be just the compression or just the version, in which
125 125 # case some defaults are assumed (but only when not in strict mode).
126 126 assert not strict
127 127
128 128 spec, params = parseparams(spec)
129 129
130 130 if spec in util.compengines.supportedbundlenames:
131 131 compression = spec
132 132 version = 'v1'
133 133 # Generaldelta repos require v2.
134 134 if 'generaldelta' in repo.requirements:
135 135 version = 'v2'
136 136 # Modern compression engines require v2.
137 137 if compression not in _bundlespecv1compengines:
138 138 version = 'v2'
139 139 elif spec in _bundlespeccgversions:
140 140 if spec == 'packed1':
141 141 compression = 'none'
142 142 else:
143 143 compression = 'bzip2'
144 144 version = spec
145 145 else:
146 146 raise error.UnsupportedBundleSpecification(
147 147 _('%s is not a recognized bundle specification') % spec)
148 148
149 149 # Bundle version 1 only supports a known set of compression engines.
150 150 if version == 'v1' and compression not in _bundlespecv1compengines:
151 151 raise error.UnsupportedBundleSpecification(
152 152 _('compression engine %s is not supported on v1 bundles') %
153 153 compression)
154 154
155 155 # The specification for packed1 can optionally declare the data formats
156 156 # required to apply it. If we see this metadata, compare against what the
157 157 # repo supports and error if the bundle isn't compatible.
158 158 if version == 'packed1' and 'requirements' in params:
159 159 requirements = set(params['requirements'].split(','))
160 160 missingreqs = requirements - repo.supportedformats
161 161 if missingreqs:
162 162 raise error.UnsupportedBundleSpecification(
163 163 _('missing support for repository features: %s') %
164 164 ', '.join(sorted(missingreqs)))
165 165
166 166 if not externalnames:
167 167 engine = util.compengines.forbundlename(compression)
168 168 compression = engine.bundletype()[1]
169 169 version = _bundlespeccgversions[version]
170 170 return compression, version, params
171 171
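# Editor's usage sketch for parsebundlespec() above (return values follow
# the internal-name mapping in the function; treat as illustrative):
#
#     parsebundlespec(repo, 'gzip-v2')
#         ->  ('GZ', '02', {})
#     parsebundlespec(repo, 'none-packed1;requirements=generaldelta')
#         ->  ('UN', 's1', {'requirements': 'generaldelta'})
#     parsebundlespec(repo, 'v2', strict=False)
#         ->  ('BZ', '02', {})      # compression defaulted to bzip2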
172 172 def readbundle(ui, fh, fname, vfs=None):
173 173 header = changegroup.readexactly(fh, 4)
174 174
175 175 alg = None
176 176 if not fname:
177 177 fname = "stream"
178 178 if not header.startswith('HG') and header.startswith('\0'):
179 179 fh = changegroup.headerlessfixup(fh, header)
180 180 header = "HG10"
181 181 alg = 'UN'
182 182 elif vfs:
183 183 fname = vfs.join(fname)
184 184
185 185 magic, version = header[0:2], header[2:4]
186 186
187 187 if magic != 'HG':
188 188 raise error.Abort(_('%s: not a Mercurial bundle') % fname)
189 189 if version == '10':
190 190 if alg is None:
191 191 alg = changegroup.readexactly(fh, 2)
192 192 return changegroup.cg1unpacker(fh, alg)
193 193 elif version.startswith('2'):
194 194 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
195 195 elif version == 'S1':
196 196 return streamclone.streamcloneapplier(fh)
197 197 else:
198 198 raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
199 199
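# Editor's summary of the 4-byte headers dispatched on above (format
# facts taken from the code itself):
#
#     'HG10' + 2-byte alg ('UN'/'BZ'/'GZ')  ->  changegroup.cg1unpacker
#     'HG2x' (e.g. 'HG20')                  ->  bundle2 unbundler
#     'HGS1'                                ->  streamclone applier
#     leading '\0' (headerless)             ->  fixed up as HG10 + 'UN'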
200 200 def getbundlespec(ui, fh):
201 201 """Infer the bundlespec from a bundle file handle.
202 202
203 203 The input file handle is seeked and the original seek position is not
204 204 restored.
205 205 """
206 206 def speccompression(alg):
207 207 try:
208 208 return util.compengines.forbundletype(alg).bundletype()[0]
209 209 except KeyError:
210 210 return None
211 211
212 212 b = readbundle(ui, fh, None)
213 213 if isinstance(b, changegroup.cg1unpacker):
214 214 alg = b._type
215 215 if alg == '_truncatedBZ':
216 216 alg = 'BZ'
217 217 comp = speccompression(alg)
218 218 if not comp:
219 219 raise error.Abort(_('unknown compression algorithm: %s') % alg)
220 220 return '%s-v1' % comp
221 221 elif isinstance(b, bundle2.unbundle20):
222 222 if 'Compression' in b.params:
223 223 comp = speccompression(b.params['Compression'])
224 224 if not comp:
225 225 raise error.Abort(_('unknown compression algorithm: %s') % comp)
226 226 else:
227 227 comp = 'none'
228 228
229 229 version = None
230 230 for part in b.iterparts():
231 231 if part.type == 'changegroup':
232 232 version = part.params['version']
233 233 if version in ('01', '02'):
234 234 version = 'v2'
235 235 else:
236 236 raise error.Abort(_('changegroup version %s does not have '
237 237 'a known bundlespec') % version,
238 238 hint=_('try upgrading your Mercurial '
239 239 'client'))
240 240
241 241 if not version:
242 242 raise error.Abort(_('could not identify changegroup version in '
243 243 'bundle'))
244 244
245 245 return '%s-%s' % (comp, version)
246 246 elif isinstance(b, streamclone.streamcloneapplier):
247 247 requirements = streamclone.readbundle1header(fh)[2]
248 248 params = 'requirements=%s' % ','.join(sorted(requirements))
249 249 return 'none-packed1;%s' % urlreq.quote(params)
250 250 else:
251 251 raise error.Abort(_('unknown bundle type: %s') % b)
252 252
253 253 def _computeoutgoing(repo, heads, common):
254 254 """Computes which revs are outgoing given a set of common
255 255 and a set of heads.
256 256
257 257 This is a separate function so extensions can have access to
258 258 the logic.
259 259
260 260 Returns a discovery.outgoing object.
261 261 """
262 262 cl = repo.changelog
263 263 if common:
264 264 hasnode = cl.hasnode
265 265 common = [n for n in common if hasnode(n)]
266 266 else:
267 267 common = [nullid]
268 268 if not heads:
269 269 heads = cl.heads()
270 270 return discovery.outgoing(repo, common, heads)
271 271
272 272 def _forcebundle1(op):
273 273 """return true if a pull/push must use bundle1
274 274
275 275 This function is used to allow testing of the older bundle version"""
276 276 ui = op.repo.ui
277 277 forcebundle1 = False
278 278     # The goal of this config is to allow developers to choose the bundle
279 279     # version used during exchange. This is especially handy during tests.
280 280     # Value is a list of bundle versions to pick from; the highest version
281 281     # should be used.
282 282 #
283 283 # developer config: devel.legacy.exchange
284 284 exchange = ui.configlist('devel', 'legacy.exchange')
285 285 forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
286 286 return forcebundle1 or not op.remote.capable('bundle2')
287 287
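# Editor's sketch of the developer knob read above (hgrc syntax, test
# use only): with this setting 'bundle1' is in the list and 'bundle2'
# is not, so _forcebundle1() returns True and the legacy path is used.
#
#     [devel]
#     legacy.exchange = bundle1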
288 288 class pushoperation(object):
289 289     """An object that represents a single push operation
290 290
291 291 Its purpose is to carry push related state and very common operations.
292 292
293 293 A new pushoperation should be created at the beginning of each push and
294 294 discarded afterward.
295 295 """
296 296
297 297 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
298 298 bookmarks=(), pushvars=None):
299 299 # repo we push from
300 300 self.repo = repo
301 301 self.ui = repo.ui
302 302 # repo we push to
303 303 self.remote = remote
304 304 # force option provided
305 305 self.force = force
306 306 # revs to be pushed (None is "all")
307 307 self.revs = revs
308 308         # bookmarks explicitly pushed
309 309 self.bookmarks = bookmarks
310 310 # allow push of new branch
311 311 self.newbranch = newbranch
312 312         # steps already performed
313 313         # (used to check what steps have already been performed through bundle2)
314 314 self.stepsdone = set()
315 315 # Integer version of the changegroup push result
316 316 # - None means nothing to push
317 317 # - 0 means HTTP error
318 318 # - 1 means we pushed and remote head count is unchanged *or*
319 319 # we have outgoing changesets but refused to push
320 320 # - other values as described by addchangegroup()
321 321 self.cgresult = None
322 322 # Boolean value for the bookmark push
323 323 self.bkresult = None
324 324 # discover.outgoing object (contains common and outgoing data)
325 325 self.outgoing = None
326 326 # all remote topological heads before the push
327 327 self.remoteheads = None
328 328 # Details of the remote branch pre and post push
329 329 #
330 330 # mapping: {'branch': ([remoteheads],
331 331 # [newheads],
332 332 # [unsyncedheads],
333 333 # [discardedheads])}
334 334 # - branch: the branch name
335 335 # - remoteheads: the list of remote heads known locally
336 336 # None if the branch is new
337 337 # - newheads: the new remote heads (known locally) with outgoing pushed
338 338 # - unsyncedheads: the list of remote heads unknown locally.
339 339 # - discardedheads: the list of remote heads made obsolete by the push
340 340 self.pushbranchmap = None
341 341 # testable as a boolean indicating if any nodes are missing locally.
342 342 self.incoming = None
343 343         # phase changes that must be pushed alongside the changesets
344 344 self.outdatedphases = None
345 345         # phase changes that must be pushed if the changeset push fails
346 346 self.fallbackoutdatedphases = None
347 347 # outgoing obsmarkers
348 348 self.outobsmarkers = set()
349 349 # outgoing bookmarks
350 350 self.outbookmarks = []
351 351 # transaction manager
352 352 self.trmanager = None
353 353 # map { pushkey partid -> callback handling failure}
354 354 # used to handle exception from mandatory pushkey part failure
355 355 self.pkfailcb = {}
356 356 # an iterable of pushvars or None
357 357 self.pushvars = pushvars
358 358
359 359 @util.propertycache
360 360 def futureheads(self):
361 361 """future remote heads if the changeset push succeeds"""
362 362 return self.outgoing.missingheads
363 363
364 364 @util.propertycache
365 365 def fallbackheads(self):
366 366 """future remote heads if the changeset push fails"""
367 367 if self.revs is None:
368 368             # no target to push, all common heads are relevant
369 369 return self.outgoing.commonheads
370 370 unfi = self.repo.unfiltered()
371 371 # I want cheads = heads(::missingheads and ::commonheads)
372 372         # (missingheads is revs with secret changesets filtered out)
373 373 #
374 374 # This can be expressed as:
375 375 # cheads = ( (missingheads and ::commonheads)
376 376         #              + (commonheads and ::missingheads))
377 377 # )
378 378 #
379 379 # while trying to push we already computed the following:
380 380 # common = (::commonheads)
381 381 # missing = ((commonheads::missingheads) - commonheads)
382 382 #
383 383 # We can pick:
384 384 # * missingheads part of common (::commonheads)
385 385 common = self.outgoing.common
386 386 nm = self.repo.changelog.nodemap
387 387 cheads = [node for node in self.revs if nm[node] in common]
388 388 # and
389 389 # * commonheads parents on missing
390 390 revset = unfi.set('%ln and parents(roots(%ln))',
391 391 self.outgoing.commonheads,
392 392 self.outgoing.missing)
393 393 cheads.extend(c.node() for c in revset)
394 394 return cheads
395 395
396 396 @property
397 397 def commonheads(self):
398 398 """set of all common heads after changeset bundle push"""
399 399 if self.cgresult:
400 400 return self.futureheads
401 401 else:
402 402 return self.fallbackheads
403 403
404 404 # mapping of message used when pushing bookmark
405 405 bookmsgmap = {'update': (_("updating bookmark %s\n"),
406 406 _('updating bookmark %s failed!\n')),
407 407 'export': (_("exporting bookmark %s\n"),
408 408 _('exporting bookmark %s failed!\n')),
409 409 'delete': (_("deleting remote bookmark %s\n"),
410 410 _('deleting remote bookmark %s failed!\n')),
411 411 }
412 412
413 413
414 414 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
415 415 opargs=None):
416 416 '''Push outgoing changesets (limited by revs) from a local
417 417 repository to remote. Return an integer:
418 418 - None means nothing to push
419 419 - 0 means HTTP error
420 420 - 1 means we pushed and remote head count is unchanged *or*
421 421 we have outgoing changesets but refused to push
422 422 - other values as described by addchangegroup()
423 423 '''
424 424 if opargs is None:
425 425 opargs = {}
426 426 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
427 427 **pycompat.strkwargs(opargs))
428 428 if pushop.remote.local():
429 429 missing = (set(pushop.repo.requirements)
430 430 - pushop.remote.local().supported)
431 431 if missing:
432 432 msg = _("required features are not"
433 433 " supported in the destination:"
434 434 " %s") % (', '.join(sorted(missing)))
435 435 raise error.Abort(msg)
436 436
437 437 if not pushop.remote.canpush():
438 438 raise error.Abort(_("destination does not support push"))
439 439
440 440 if not pushop.remote.capable('unbundle'):
441 441 raise error.Abort(_('cannot push: destination does not support the '
442 442 'unbundle wire protocol command'))
443 443
444 444 # get lock as we might write phase data
445 445 wlock = lock = None
446 446 try:
447 447 # bundle2 push may receive a reply bundle touching bookmarks or other
448 448 # things requiring the wlock. Take it now to ensure proper ordering.
449 449 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
450 450 if (not _forcebundle1(pushop)) and maypushback:
451 451 wlock = pushop.repo.wlock()
452 452 lock = pushop.repo.lock()
453 453 pushop.trmanager = transactionmanager(pushop.repo,
454 454 'push-response',
455 455 pushop.remote.url())
456 456 except IOError as err:
457 457 if err.errno != errno.EACCES:
458 458 raise
459 459 # source repo cannot be locked.
460 460 # We do not abort the push, but just disable the local phase
461 461 # synchronisation.
462 462 msg = 'cannot lock source repository: %s\n' % err
463 463 pushop.ui.debug(msg)
464 464
465 465 with wlock or util.nullcontextmanager(), \
466 466 lock or util.nullcontextmanager(), \
467 467 pushop.trmanager or util.nullcontextmanager():
468 468 pushop.repo.checkpush(pushop)
469 469 _pushdiscovery(pushop)
470 470 if not _forcebundle1(pushop):
471 471 _pushbundle2(pushop)
472 472 _pushchangeset(pushop)
473 473 _pushsyncphase(pushop)
474 474 _pushobsolete(pushop)
475 475 _pushbookmark(pushop)
476 476
477 477 return pushop
478 478
479 479 # list of steps to perform discovery before push
480 480 pushdiscoveryorder = []
481 481
482 482 # Mapping between step name and function
483 483 #
484 484 # This exists to help extensions wrap steps if necessary
485 485 pushdiscoverymapping = {}
486 486
487 487 def pushdiscovery(stepname):
488 488 """decorator for function performing discovery before push
489 489
490 490 The function is added to the step -> function mapping and appended to the
491 491     list of steps. Beware that decorated functions will be added in order (this
492 492     may matter).
493 493 
494 494     You can only use this decorator for a new step; if you want to wrap a step
495 495 from an extension, change the pushdiscovery dictionary directly."""
496 496 def dec(func):
497 497 assert stepname not in pushdiscoverymapping
498 498 pushdiscoverymapping[stepname] = func
499 499 pushdiscoveryorder.append(stepname)
500 500 return func
501 501 return dec
502 502
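# Editor's sketch of registering an extra discovery step (step name and
# body are hypothetical): decorated steps run in registration order,
# after the built-in ones.
#
#     @pushdiscovery('mydata')
#     def _pushdiscoverymydata(pushop):
#         pushop.ui.debug('discovering mydata\n')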
503 503 def _pushdiscovery(pushop):
504 504 """Run all discovery steps"""
505 505 for stepname in pushdiscoveryorder:
506 506 step = pushdiscoverymapping[stepname]
507 507 step(pushop)
508 508
509 509 @pushdiscovery('changeset')
510 510 def _pushdiscoverychangeset(pushop):
511 511 """discover the changeset that need to be pushed"""
512 512 fci = discovery.findcommonincoming
513 513 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
514 514 common, inc, remoteheads = commoninc
515 515 fco = discovery.findcommonoutgoing
516 516 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
517 517 commoninc=commoninc, force=pushop.force)
518 518 pushop.outgoing = outgoing
519 519 pushop.remoteheads = remoteheads
520 520 pushop.incoming = inc
521 521
522 522 @pushdiscovery('phase')
523 523 def _pushdiscoveryphase(pushop):
524 524 """discover the phase that needs to be pushed
525 525
526 526 (computed for both success and failure case for changesets push)"""
527 527 outgoing = pushop.outgoing
528 528 unfi = pushop.repo.unfiltered()
529 529 remotephases = pushop.remote.listkeys('phases')
530 530 publishing = remotephases.get('publishing', False)
531 531 if (pushop.ui.configbool('ui', '_usedassubrepo')
532 532 and remotephases # server supports phases
533 533 and not pushop.outgoing.missing # no changesets to be pushed
534 534 and publishing):
535 535 # When:
536 536 # - this is a subrepo push
537 537         # - and remote supports phases
538 538         # - and no changesets are to be pushed
539 539         # - and remote is publishing
540 540         # We may be in issue 3871 case!
541 541         # We drop the possible phase synchronisation, normally done
542 542         # as a courtesy, which could publish changesets that are
543 543         # possibly still draft locally.
544 544 remotephases = {'publishing': 'True'}
545 545 ana = phases.analyzeremotephases(pushop.repo,
546 546 pushop.fallbackheads,
547 547 remotephases)
548 548 pheads, droots = ana
549 549 extracond = ''
550 550 if not publishing:
551 551 extracond = ' and public()'
552 552 revset = 'heads((%%ln::%%ln) %s)' % extracond
553 553     # Get the list of all revs draft on remote but public here.
554 554     # XXX Beware that the revset breaks if droots is not strictly
555 555     # XXX roots; we may want to ensure it is, but that is costly
556 556 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
557 557 if not outgoing.missing:
558 558 future = fallback
559 559 else:
560 560 # adds changeset we are going to push as draft
561 561 #
562 562             # should not be necessary for a publishing server, but because of an
563 563 # issue fixed in xxxxx we have to do it anyway.
564 564 fdroots = list(unfi.set('roots(%ln + %ln::)',
565 565 outgoing.missing, droots))
566 566 fdroots = [f.node() for f in fdroots]
567 567 future = list(unfi.set(revset, fdroots, pushop.futureheads))
568 568 pushop.outdatedphases = future
569 569 pushop.fallbackoutdatedphases = fallback
570 570
571 571 @pushdiscovery('obsmarker')
572 572 def _pushdiscoveryobsmarkers(pushop):
573 573 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
574 574 and pushop.repo.obsstore
575 575 and 'obsolete' in pushop.remote.listkeys('namespaces')):
576 576 repo = pushop.repo
577 577         # very naive computation that can be quite expensive on big repos.
578 578         # However, evolution is currently slow on them anyway.
579 579 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
580 580 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
581 581
582 582 @pushdiscovery('bookmarks')
583 583 def _pushdiscoverybookmarks(pushop):
584 584 ui = pushop.ui
585 585 repo = pushop.repo.unfiltered()
586 586 remote = pushop.remote
587 587 ui.debug("checking for updated bookmarks\n")
588 588 ancestors = ()
589 589 if pushop.revs:
590 590 revnums = map(repo.changelog.rev, pushop.revs)
591 591 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
592 592 remotebookmark = remote.listkeys('bookmarks')
593 593
594 594 explicit = set([repo._bookmarks.expandname(bookmark)
595 595 for bookmark in pushop.bookmarks])
596 596
597 597 remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
598 598 comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)
599 599
600 600 def safehex(x):
601 601 if x is None:
602 602 return x
603 603 return hex(x)
604 604
605 605 def hexifycompbookmarks(bookmarks):
606 606 for b, scid, dcid in bookmarks:
607 607 yield b, safehex(scid), safehex(dcid)
608 608
609 609 comp = [hexifycompbookmarks(marks) for marks in comp]
610 610 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
611 611
612 612 for b, scid, dcid in advsrc:
613 613 if b in explicit:
614 614 explicit.remove(b)
615 615 if not ancestors or repo[scid].rev() in ancestors:
616 616 pushop.outbookmarks.append((b, dcid, scid))
617 617 # search added bookmark
618 618 for b, scid, dcid in addsrc:
619 619 if b in explicit:
620 620 explicit.remove(b)
621 621 pushop.outbookmarks.append((b, '', scid))
622 622 # search for overwritten bookmark
623 623 for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
624 624 if b in explicit:
625 625 explicit.remove(b)
626 626 pushop.outbookmarks.append((b, dcid, scid))
627 627 # search for bookmark to delete
628 628 for b, scid, dcid in adddst:
629 629 if b in explicit:
630 630 explicit.remove(b)
631 631 # treat as "deleted locally"
632 632 pushop.outbookmarks.append((b, dcid, ''))
633 633 # identical bookmarks shouldn't get reported
634 634 for b, scid, dcid in same:
635 635 if b in explicit:
636 636 explicit.remove(b)
637 637
638 638 if explicit:
639 639 explicit = sorted(explicit)
640 640 # we should probably list all of them
641 641 ui.warn(_('bookmark %s does not exist on the local '
642 642 'or remote repository!\n') % explicit[0])
643 643 pushop.bkresult = 2
644 644
645 645 pushop.outbookmarks.sort()
646 646
647 647 def _pushcheckoutgoing(pushop):
648 648 outgoing = pushop.outgoing
649 649 unfi = pushop.repo.unfiltered()
650 650 if not outgoing.missing:
651 651 # nothing to push
652 652 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
653 653 return False
654 654 # something to push
655 655 if not pushop.force:
656 656 # if repo.obsstore == False --> no obsolete
657 657 # then, save the iteration
658 658 if unfi.obsstore:
659 659             # these messages are here for 80 char limit reasons
660 660 mso = _("push includes obsolete changeset: %s!")
661 661 mspd = _("push includes phase-divergent changeset: %s!")
662 662 mscd = _("push includes content-divergent changeset: %s!")
663 663 mst = {"orphan": _("push includes orphan changeset: %s!"),
664 664 "phase-divergent": mspd,
665 665 "content-divergent": mscd}
666 666             # If we are to push and there is at least one
667 667             # obsolete or unstable changeset in missing, at
668 668             # least one of the missingheads will be obsolete or
669 669             # unstable. So checking heads only is ok.
670 670 for node in outgoing.missingheads:
671 671 ctx = unfi[node]
672 672 if ctx.obsolete():
673 673 raise error.Abort(mso % ctx)
674 674 elif ctx.isunstable():
675 675 # TODO print more than one instability in the abort
676 676 # message
677 677 raise error.Abort(mst[ctx.instabilities()[0]] % ctx)
678 678
679 679 discovery.checkheads(pushop)
680 680 return True
681 681
682 682 # List of names of steps to perform for an outgoing bundle2, order matters.
683 683 b2partsgenorder = []
684 684
685 685 # Mapping between step name and function
686 686 #
687 687 # This exists to help extensions wrap steps if necessary
688 688 b2partsgenmapping = {}
689 689
690 690 def b2partsgenerator(stepname, idx=None):
691 691 """decorator for function generating bundle2 part
692 692
693 693 The function is added to the step -> function mapping and appended to the
694 694 list of steps. Beware that decorated functions will be added in order
695 695 (this may matter).
696 696
697 697     You can only use this decorator for new steps; if you want to wrap a step
698 698     from an extension, change the b2partsgenmapping dictionary directly."""
699 699 def dec(func):
700 700 assert stepname not in b2partsgenmapping
701 701 b2partsgenmapping[stepname] = func
702 702 if idx is None:
703 703 b2partsgenorder.append(stepname)
704 704 else:
705 705 b2partsgenorder.insert(idx, stepname)
706 706 return func
707 707 return dec
708 708
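# Editor's sketch (part name and body are hypothetical): passing idx=0
# prepends the step, so its part is generated before 'changeset':
#
#     @b2partsgenerator('mypart', idx=0)
#     def _pushb2mypart(pushop, bundler):
#         bundler.newpart('mypart', data='...', mandatory=False)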
709 709 def _pushb2ctxcheckheads(pushop, bundler):
710 710 """Generate race condition checking parts
711 711
712 712 Exists as an independent function to aid extensions
713 713 """
714 714     # * 'force' does not check for push races,
715 715     # * if we don't push anything, there is nothing to check.
716 716 if not pushop.force and pushop.outgoing.missingheads:
717 717 allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
718 718 emptyremote = pushop.pushbranchmap is None
719 719 if not allowunrelated or emptyremote:
720 720 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
721 721 else:
722 722 affected = set()
723 723 for branch, heads in pushop.pushbranchmap.iteritems():
724 724 remoteheads, newheads, unsyncedheads, discardedheads = heads
725 725 if remoteheads is not None:
726 726 remote = set(remoteheads)
727 727 affected |= set(discardedheads) & remote
728 728 affected |= remote - set(newheads)
729 729 if affected:
730 730 data = iter(sorted(affected))
731 731 bundler.newpart('check:updated-heads', data=data)
732 732
733 733 @b2partsgenerator('changeset')
734 734 def _pushb2ctx(pushop, bundler):
735 735 """handle changegroup push through bundle2
736 736
737 737 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
738 738 """
739 739 if 'changesets' in pushop.stepsdone:
740 740 return
741 741 pushop.stepsdone.add('changesets')
742 742 # Send known heads to the server for race detection.
743 743 if not _pushcheckoutgoing(pushop):
744 744 return
745 745 pushop.repo.prepushoutgoinghooks(pushop)
746 746
747 747 _pushb2ctxcheckheads(pushop, bundler)
748 748
749 749 b2caps = bundle2.bundle2caps(pushop.remote)
750 750 version = '01'
751 751 cgversions = b2caps.get('changegroup')
752 752 if cgversions: # 3.1 and 3.2 ship with an empty value
753 753 cgversions = [v for v in cgversions
754 754 if v in changegroup.supportedoutgoingversions(
755 755 pushop.repo)]
756 756 if not cgversions:
757 757 raise ValueError(_('no common changegroup version'))
758 758 version = max(cgversions)
759 759 cgstream = changegroup.makestream(pushop.repo, pushop.outgoing, version,
760 760 'push')
761 761 cgpart = bundler.newpart('changegroup', data=cgstream)
762 762 if cgversions:
763 763 cgpart.addparam('version', version)
764 764 if 'treemanifest' in pushop.repo.requirements:
765 765 cgpart.addparam('treemanifest', '1')
766 766 def handlereply(op):
767 767 """extract addchangegroup returns from server reply"""
768 768 cgreplies = op.records.getreplies(cgpart.id)
769 769 assert len(cgreplies['changegroup']) == 1
770 770 pushop.cgresult = cgreplies['changegroup'][0]['return']
771 771 return handlereply
772 772
773 773 @b2partsgenerator('phase')
774 774 def _pushb2phases(pushop, bundler):
775 775 """handle phase push through bundle2"""
776 776 if 'phases' in pushop.stepsdone:
777 777 return
778 778 b2caps = bundle2.bundle2caps(pushop.remote)
779 779 if not 'pushkey' in b2caps:
780 780 return
781 781 pushop.stepsdone.add('phases')
782 782 part2node = []
783 783
784 784 def handlefailure(pushop, exc):
785 785 targetid = int(exc.partid)
786 786 for partid, node in part2node:
787 787 if partid == targetid:
788 788 raise error.Abort(_('updating %s to public failed') % node)
789 789
790 790 enc = pushkey.encode
791 791 for newremotehead in pushop.outdatedphases:
792 792 part = bundler.newpart('pushkey')
793 793 part.addparam('namespace', enc('phases'))
794 794 part.addparam('key', enc(newremotehead.hex()))
795 795 part.addparam('old', enc('%d' % phases.draft))
796 796 part.addparam('new', enc('%d' % phases.public))
797 797 part2node.append((part.id, newremotehead))
798 798 pushop.pkfailcb[part.id] = handlefailure
799 799
800 800 def handlereply(op):
801 801 for partid, node in part2node:
802 802 partrep = op.records.getreplies(partid)
803 803 results = partrep['pushkey']
804 804 assert len(results) <= 1
805 805 msg = None
806 806 if not results:
807 807 msg = _('server ignored update of %s to public!\n') % node
808 808 elif not int(results[0]['return']):
809 809 msg = _('updating %s to public failed!\n') % node
810 810 if msg is not None:
811 811 pushop.ui.warn(msg)
812 812 return handlereply
813 813
814 814 @b2partsgenerator('obsmarkers')
815 815 def _pushb2obsmarkers(pushop, bundler):
816 816 if 'obsmarkers' in pushop.stepsdone:
817 817 return
818 818 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
819 819 if obsolete.commonversion(remoteversions) is None:
820 820 return
821 821 pushop.stepsdone.add('obsmarkers')
822 822 if pushop.outobsmarkers:
823 823 markers = sorted(pushop.outobsmarkers)
824 824 bundle2.buildobsmarkerspart(bundler, markers)
825 825
826 826 @b2partsgenerator('bookmarks')
827 827 def _pushb2bookmarks(pushop, bundler):
828 828 """handle bookmark push through bundle2"""
829 829 if 'bookmarks' in pushop.stepsdone:
830 830 return
831 831 b2caps = bundle2.bundle2caps(pushop.remote)
832 832 if 'pushkey' not in b2caps:
833 833 return
834 834 pushop.stepsdone.add('bookmarks')
835 835 part2book = []
836 836 enc = pushkey.encode
837 837
838 838 def handlefailure(pushop, exc):
839 839 targetid = int(exc.partid)
840 840 for partid, book, action in part2book:
841 841 if partid == targetid:
842 842 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
843 843         # we should not be called for parts we did not generate
844 844 assert False
845 845
846 846 for book, old, new in pushop.outbookmarks:
847 847 part = bundler.newpart('pushkey')
848 848 part.addparam('namespace', enc('bookmarks'))
849 849 part.addparam('key', enc(book))
850 850 part.addparam('old', enc(old))
851 851 part.addparam('new', enc(new))
852 852 action = 'update'
853 853 if not old:
854 854 action = 'export'
855 855 elif not new:
856 856 action = 'delete'
857 857 part2book.append((part.id, book, action))
858 858 pushop.pkfailcb[part.id] = handlefailure
859 859
860 860 def handlereply(op):
861 861 ui = pushop.ui
862 862 for partid, book, action in part2book:
863 863 partrep = op.records.getreplies(partid)
864 864 results = partrep['pushkey']
865 865 assert len(results) <= 1
866 866 if not results:
867 867 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
868 868 else:
869 869 ret = int(results[0]['return'])
870 870 if ret:
871 871 ui.status(bookmsgmap[action][0] % book)
872 872 else:
873 873 ui.warn(bookmsgmap[action][1] % book)
874 874 if pushop.bkresult is not None:
875 875 pushop.bkresult = 1
876 876 return handlereply
877 877
878 878 @b2partsgenerator('pushvars', idx=0)
879 879 def _getbundlesendvars(pushop, bundler):
880 880 '''send shellvars via bundle2'''
881 881 pushvars = pushop.pushvars
882 882 if pushvars:
883 883 shellvars = {}
884 884 for raw in pushvars:
885 885 if '=' not in raw:
886 886 msg = ("unable to parse variable '%s', should follow "
887 887 "'KEY=VALUE' or 'KEY=' format")
888 888 raise error.Abort(msg % raw)
889 889 k, v = raw.split('=', 1)
890 890 shellvars[k] = v
891 891
892 892 part = bundler.newpart('pushvars')
893 893
894 894 for key, value in shellvars.iteritems():
895 895 part.addparam(key, value, mandatory=False)
896 896
897 897 def _pushbundle2(pushop):
898 898 """push data to the remote using bundle2
899 899
900 900 The only currently supported type of data is changegroup but this will
901 901 evolve in the future."""
902 902 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
903 903 pushback = (pushop.trmanager
904 904 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
905 905
906 906 # create reply capability
907 907 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
908 908 allowpushback=pushback))
909 909 bundler.newpart('replycaps', data=capsblob)
910 910 replyhandlers = []
911 911 for partgenname in b2partsgenorder:
912 912 partgen = b2partsgenmapping[partgenname]
913 913 ret = partgen(pushop, bundler)
914 914 if callable(ret):
915 915 replyhandlers.append(ret)
916 916 # do not push if nothing to push
917 917 if bundler.nbparts <= 1:
918 918 return
919 919 stream = util.chunkbuffer(bundler.getchunks())
920 920 try:
921 921 try:
922 922 reply = pushop.remote.unbundle(
923 923 stream, ['force'], pushop.remote.url())
924 924 except error.BundleValueError as exc:
925 925 raise error.Abort(_('missing support for %s') % exc)
926 926 try:
927 927 trgetter = None
928 928 if pushback:
929 929 trgetter = pushop.trmanager.transaction
930 930 op = bundle2.processbundle(pushop.repo, reply, trgetter)
931 931 except error.BundleValueError as exc:
932 932 raise error.Abort(_('missing support for %s') % exc)
933 933 except bundle2.AbortFromPart as exc:
934 934 pushop.ui.status(_('remote: %s\n') % exc)
935 935 if exc.hint is not None:
936 936 pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
937 937 raise error.Abort(_('push failed on remote'))
938 938 except error.PushkeyFailed as exc:
939 939 partid = int(exc.partid)
940 940 if partid not in pushop.pkfailcb:
941 941 raise
942 942 pushop.pkfailcb[partid](pushop, exc)
943 943 for rephand in replyhandlers:
944 944 rephand(op)
945 945
946 946 def _pushchangeset(pushop):
947 947 """Make the actual push of changeset bundle to remote repo"""
948 948 if 'changesets' in pushop.stepsdone:
949 949 return
950 950 pushop.stepsdone.add('changesets')
951 951 if not _pushcheckoutgoing(pushop):
952 952 return
953 953
954 954 # Should have verified this in push().
955 955 assert pushop.remote.capable('unbundle')
956 956
957 957 pushop.repo.prepushoutgoinghooks(pushop)
958 958 outgoing = pushop.outgoing
959 959 # TODO: get bundlecaps from remote
960 960 bundlecaps = None
961 961 # create a changegroup from local
962 962 if pushop.revs is None and not (outgoing.excluded
963 963 or pushop.repo.changelog.filteredrevs):
964 964 # push everything,
965 965 # use the fast path, no race possible on push
966 966 cg = changegroup.makechangegroup(pushop.repo, outgoing, '01', 'push',
967 967 fastpath=True, bundlecaps=bundlecaps)
968 968 else:
969 969 cg = changegroup.makechangegroup(pushop.repo, outgoing, '01',
970 970 'push', bundlecaps=bundlecaps)
971 971
972 972 # apply changegroup to remote
973 973 # local repo finds heads on server, finds out what
974 974 # revs it must push. once revs transferred, if server
975 975 # finds it has different heads (someone else won
976 976 # commit/push race), server aborts.
977 977 if pushop.force:
978 978 remoteheads = ['force']
979 979 else:
980 980 remoteheads = pushop.remoteheads
981 981 # ssh: return remote's addchangegroup()
982 982 # http: return remote's addchangegroup() or 0 for error
983 983 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
984 984 pushop.repo.url())
985 985
986 986 def _pushsyncphase(pushop):
987 987 """synchronise phase information locally and remotely"""
988 988 cheads = pushop.commonheads
989 989 # even when we don't push, exchanging phase data is useful
990 990 remotephases = pushop.remote.listkeys('phases')
991 991 if (pushop.ui.configbool('ui', '_usedassubrepo')
992 992 and remotephases # server supports phases
993 993 and pushop.cgresult is None # nothing was pushed
994 994 and remotephases.get('publishing', False)):
995 995 # When:
996 996 # - this is a subrepo push
997 997 # - and the remote supports phases
998 998 # - and no changeset was pushed
999 999 # - and the remote is publishing
1000 1000 # we may be in the issue 3871 case!
1001 1001 # We drop the courtesy phase synchronisation that would
1002 1002 # otherwise publish changesets that are possibly still draft
1003 1003 # on the remote.
1004 1004 remotephases = {'publishing': 'True'}
1005 1005 if not remotephases: # old server, or public-only reply from a non-publishing one
1006 1006 _localphasemove(pushop, cheads)
1007 1007 # don't push any phase data as there is nothing to push
1008 1008 else:
1009 1009 ana = phases.analyzeremotephases(pushop.repo, cheads,
1010 1010 remotephases)
1011 1011 pheads, droots = ana
1012 1012 ### Apply remote phase on local
1013 1013 if remotephases.get('publishing', False):
1014 1014 _localphasemove(pushop, cheads)
1015 1015 else: # publish = False
1016 1016 _localphasemove(pushop, pheads)
1017 1017 _localphasemove(pushop, cheads, phases.draft)
1018 1018 ### Apply local phase on remote
1019 1019
1020 1020 if pushop.cgresult:
1021 1021 if 'phases' in pushop.stepsdone:
1022 1022 # phases already pushed through bundle2
1023 1023 return
1024 1024 outdated = pushop.outdatedphases
1025 1025 else:
1026 1026 outdated = pushop.fallbackoutdatedphases
1027 1027
1028 1028 pushop.stepsdone.add('phases')
1029 1029
1030 1030 # filter heads already turned public by the push
1031 1031 outdated = [c for c in outdated if c.node() not in pheads]
1032 1032 # fallback to independent pushkey command
1033 1033 for newremotehead in outdated:
1034 1034 r = pushop.remote.pushkey('phases',
1035 1035 newremotehead.hex(),
1036 1036 str(phases.draft),
1037 1037 str(phases.public))
1038 1038 if not r:
1039 1039 pushop.ui.warn(_('updating %s to public failed!\n')
1040 1040 % newremotehead)
1041 1041
1042 1042 def _localphasemove(pushop, nodes, phase=phases.public):
1043 1043 """move <nodes> to <phase> in the local source repo"""
1044 1044 if pushop.trmanager:
1045 1045 phases.advanceboundary(pushop.repo,
1046 1046 pushop.trmanager.transaction(),
1047 1047 phase,
1048 1048 nodes)
1049 1049 else:
1050 1050 # repo is not locked, do not change any phases!
1051 1051 # Informs the user that phases should have been moved when
1052 1052 # applicable.
1053 1053 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
1054 1054 phasestr = phases.phasenames[phase]
1055 1055 if actualmoves:
1056 1056 pushop.ui.status(_('cannot lock source repo, skipping '
1057 1057 'local %s phase update\n') % phasestr)
1058 1058
1059 1059 def _pushobsolete(pushop):
1060 1060 """utility function to push obsolete markers to a remote"""
1061 1061 if 'obsmarkers' in pushop.stepsdone:
1062 1062 return
1063 1063 repo = pushop.repo
1064 1064 remote = pushop.remote
1065 1065 pushop.stepsdone.add('obsmarkers')
1066 1066 if pushop.outobsmarkers:
1067 1067 pushop.ui.debug('try to push obsolete markers to remote\n')
1068 1068 rslts = []
1069 1069 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
1070 1070 for key in sorted(remotedata, reverse=True):
1071 1071 # reverse sort to ensure we end with dump0
1072 1072 data = remotedata[key]
1073 1073 rslts.append(remote.pushkey('obsolete', key, '', data))
1074 1074 if [r for r in rslts if not r]:
1075 1075 msg = _('failed to push some obsolete markers!\n')
1076 1076 repo.ui.warn(msg)
1077 1077
1078 1078 def _pushbookmark(pushop):
1079 1079 """Update bookmark position on remote"""
1080 1080 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
1081 1081 return
1082 1082 pushop.stepsdone.add('bookmarks')
1083 1083 ui = pushop.ui
1084 1084 remote = pushop.remote
1085 1085
1086 1086 for b, old, new in pushop.outbookmarks:
1087 1087 action = 'update'
1088 1088 if not old:
1089 1089 action = 'export'
1090 1090 elif not new:
1091 1091 action = 'delete'
1092 1092 if remote.pushkey('bookmarks', b, old, new):
1093 1093 ui.status(bookmsgmap[action][0] % b)
1094 1094 else:
1095 1095 ui.warn(bookmsgmap[action][1] % b)
1096 1096 # discovery may have set the value from an invalid entry
1097 1097 if pushop.bkresult is not None:
1098 1098 pushop.bkresult = 1
1099 1099
1100 1100 class pulloperation(object):
1101 1101 """A object that represent a single pull operation
1102 1102
1103 1103 It purpose is to carry pull related state and very common operation.
1104 1104
1105 1105 A new should be created at the beginning of each pull and discarded
1106 1106 afterward.
1107 1107 """
1108 1108
1109 1109 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
1110 1110 remotebookmarks=None, streamclonerequested=None):
1111 1111 # repo we pull into
1112 1112 self.repo = repo
1113 1113 # repo we pull from
1114 1114 self.remote = remote
1115 1115 # revision we try to pull (None is "all")
1116 1116 self.heads = heads
1117 1117 # bookmarks pulled explicitly
1118 1118 self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
1119 1119 for bookmark in bookmarks]
1120 1120 # do we force pull?
1121 1121 self.force = force
1122 1122 # whether a streaming clone was requested
1123 1123 self.streamclonerequested = streamclonerequested
1124 1124 # transaction manager
1125 1125 self.trmanager = None
1126 1126 # set of common changesets between local and remote before pull
1127 1127 self.common = None
1128 1128 # set of pulled heads
1129 1129 self.rheads = None
1130 1130 # list of missing changesets to fetch remotely
1131 1131 self.fetch = None
1132 1132 # remote bookmarks data
1133 1133 self.remotebookmarks = remotebookmarks
1134 1134 # result of changegroup pulling (used as return code by pull)
1135 1135 self.cgresult = None
1136 1136 # set of steps already done
1137 1137 self.stepsdone = set()
1138 1138 # Whether we attempted a clone from pre-generated bundles.
1139 1139 self.clonebundleattempted = False
1140 1140
1141 1141 @util.propertycache
1142 1142 def pulledsubset(self):
1143 1143 """heads of the set of changeset target by the pull"""
1144 1144 # compute target subset
1145 1145 if self.heads is None:
1146 1146 # We pulled everything possible
1147 1147 # sync on everything common
1148 1148 c = set(self.common)
1149 1149 ret = list(self.common)
1150 1150 for n in self.rheads:
1151 1151 if n not in c:
1152 1152 ret.append(n)
1153 1153 return ret
1154 1154 else:
1155 1155 # We pulled a specific subset
1156 1156 # sync on this subset
1157 1157 return self.heads
1158 1158
1159 1159 @util.propertycache
1160 1160 def canusebundle2(self):
1161 1161 return not _forcebundle1(self)
1162 1162
1163 1163 @util.propertycache
1164 1164 def remotebundle2caps(self):
1165 1165 return bundle2.bundle2caps(self.remote)
1166 1166
1167 1167 def gettransaction(self):
1168 1168 # deprecated; talk to trmanager directly
1169 1169 return self.trmanager.transaction()
1170 1170
1171 1171 class transactionmanager(util.transactional):
1172 1172 """An object to manage the life cycle of a transaction
1173 1173
1174 1174 It creates the transaction on demand and calls the appropriate hooks when
1175 1175 closing the transaction."""
1176 1176 def __init__(self, repo, source, url):
1177 1177 self.repo = repo
1178 1178 self.source = source
1179 1179 self.url = url
1180 1180 self._tr = None
1181 1181
1182 1182 def transaction(self):
1183 1183 """Return an open transaction object, constructing if necessary"""
1184 1184 if not self._tr:
1185 1185 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
1186 1186 self._tr = self.repo.transaction(trname)
1187 1187 self._tr.hookargs['source'] = self.source
1188 1188 self._tr.hookargs['url'] = self.url
1189 1189 return self._tr
1190 1190
1191 1191 def close(self):
1192 1192 """close transaction if created"""
1193 1193 if self._tr is not None:
1194 1194 self._tr.close()
1195 1195
1196 1196 def release(self):
1197 1197 """release transaction if created"""
1198 1198 if self._tr is not None:
1199 1199 self._tr.release()
1200 1200
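# A minimal usage sketch, relying on the context manager protocol that
# util.transactional provides (close() on success, release() otherwise);
# the URL below is hypothetical:
def _exampletrmanager(repo):
    with transactionmanager(repo, 'pull', 'https://example.com/repo') as tm:
        tm.transaction()  # created lazily; closed when the block exits
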
1201 1201 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
1202 1202 streamclonerequested=None):
1203 1203 """Fetch repository data from a remote.
1204 1204
1205 1205 This is the main function used to retrieve data from a remote repository.
1206 1206
1207 1207 ``repo`` is the local repository to clone into.
1208 1208 ``remote`` is a peer instance.
1209 1209 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
1210 1210 default) means to pull everything from the remote.
1211 1211 ``bookmarks`` is an iterable of bookmarks requested to be pulled. By
1212 1212 default, all remote bookmarks are pulled.
1213 1213 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
1214 1214 initialization.
1215 1215 ``streamclonerequested`` is a boolean indicating whether a "streaming
1216 1216 clone" is requested. A "streaming clone" is essentially a raw file copy
1217 1217 of revlogs from the server. This only works when the local repository is
1218 1218 empty. The default value of ``None`` means to respect the server
1219 1219 configuration for preferring stream clones.
1220 1220
1221 1221 Returns the ``pulloperation`` created for this pull.
1222 1222 """
1223 1223 if opargs is None:
1224 1224 opargs = {}
1225 1225 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
1226 1226 streamclonerequested=streamclonerequested, **opargs)
1227 1227
1228 1228 peerlocal = pullop.remote.local()
1229 1229 if peerlocal:
1230 1230 missing = set(peerlocal.requirements) - pullop.repo.supported
1231 1231 if missing:
1232 1232 msg = _("required features are not"
1233 1233 " supported in the destination:"
1234 1234 " %s") % (', '.join(sorted(missing)))
1235 1235 raise error.Abort(msg)
1236 1236
1237 1237 wlock = lock = None
1238 1238 try:
1239 1239 wlock = pullop.repo.wlock()
1240 1240 lock = pullop.repo.lock()
1241 1241 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
1242 1242 streamclone.maybeperformlegacystreamclone(pullop)
1243 1243 # This should ideally be in _pullbundle2(). However, it needs to run
1244 1244 # before discovery to avoid extra work.
1245 1245 _maybeapplyclonebundle(pullop)
1246 1246 _pulldiscovery(pullop)
1247 1247 if pullop.canusebundle2:
1248 1248 _pullbundle2(pullop)
1249 1249 _pullchangeset(pullop)
1250 1250 _pullphase(pullop)
1251 1251 _pullbookmarks(pullop)
1252 1252 _pullobsolete(pullop)
1253 1253 pullop.trmanager.close()
1254 1254 finally:
1255 1255 lockmod.release(pullop.trmanager, lock, wlock)
1256 1256
1257 1257 return pullop
1258 1258
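# A minimal sketch of driving pull() directly from embedding code; the
# local path and remote URL are hypothetical, and real callers normally
# go through the higher-level commands instead:
def _examplepull(ui):
    from mercurial import hg
    repo = hg.repository(ui, '/path/to/local')
    other = hg.peer(ui, {}, 'https://example.com/repo')
    pullop = pull(repo, other)  # pull everything the remote has
    return pullop.cgresult      # changegroup result of the operation
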
1259 1259 # list of steps to perform discovery before pull
1260 1260 pulldiscoveryorder = []
1261 1261
1262 1262 # Mapping between step name and function
1263 1263 #
1264 1264 # This exists to help extensions wrap steps if necessary
1265 1265 pulldiscoverymapping = {}
1266 1266
1267 1267 def pulldiscovery(stepname):
1268 1268 """decorator for function performing discovery before pull
1269 1269
1270 1270 The function is added to the step -> function mapping and appended to the
1271 1271 list of steps. Beware that decorated functions will be added in order (this
1272 1272 may matter).
1273 1273
1274 1274 You can only use this decorator for a new step; if you want to wrap a step
1275 1275 from an extension, change the pulldiscoverymapping dictionary directly."""
1276 1276 def dec(func):
1277 1277 assert stepname not in pulldiscoverymapping
1278 1278 pulldiscoverymapping[stepname] = func
1279 1279 pulldiscoveryorder.append(stepname)
1280 1280 return func
1281 1281 return dec
1282 1282
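# A minimal sketch of a registration (the step name and the attribute
# stored on the pulloperation are hypothetical); an extension would do
# the equivalent through mercurial.exchange.pulldiscovery at load time:
@pulldiscovery('example:streamcheck')
def _pulldiscoverystreamcheck(pullop):
    # remember whether the remote could serve a stream clone
    pullop.examplestream = pullop.remote.capable('stream')
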
1283 1283 def _pulldiscovery(pullop):
1284 1284 """Run all discovery steps"""
1285 1285 for stepname in pulldiscoveryorder:
1286 1286 step = pulldiscoverymapping[stepname]
1287 1287 step(pullop)
1288 1288
1289 1289 @pulldiscovery('b1:bookmarks')
1290 1290 def _pullbookmarkbundle1(pullop):
1291 1291 """fetch bookmark data in bundle1 case
1292 1292
1293 1293 If not using bundle2, we have to fetch bookmarks before changeset
1294 1294 discovery to reduce the chance and impact of race conditions."""
1295 1295 if pullop.remotebookmarks is not None:
1296 1296 return
1297 1297 if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
1298 1298 # all known bundle2 servers now support listkeys, but let's be nice to
1299 1299 # new implementations.
1300 1300 return
1301 1301 pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
1302 1302
1303 1303
1304 1304 @pulldiscovery('changegroup')
1305 1305 def _pulldiscoverychangegroup(pullop):
1306 1306 """discovery phase for the pull
1307 1307
1308 1308 Currently handles changeset discovery only; will change to handle all
1309 1309 discovery at some point.
1310 1310 tmp = discovery.findcommonincoming(pullop.repo,
1311 1311 pullop.remote,
1312 1312 heads=pullop.heads,
1313 1313 force=pullop.force)
1314 1314 common, fetch, rheads = tmp
1315 1315 nm = pullop.repo.unfiltered().changelog.nodemap
1316 1316 if fetch and rheads:
1317 1317 # If a remote head is filtered locally, put it back in common.
1318 1318 #
1319 1319 # This is a hackish solution to catch most of the "common but locally
1320 1320 # hidden" situations. We do not perform discovery on the unfiltered
1321 1321 # repository because it ends up doing a pathological number of round
1322 1322 # trips for a huge number of changesets we do not care about.
1323 1323 #
1324 1324 # If a set of such "common but filtered" changesets exists on the server
1325 1325 # but does not include a remote head, we'll not be able to detect it.
1326 1326 scommon = set(common)
1327 1327 for n in rheads:
1328 1328 if n in nm:
1329 1329 if n not in scommon:
1330 1330 common.append(n)
1331 1331 if set(rheads).issubset(set(common)):
1332 1332 fetch = []
1333 1333 pullop.common = common
1334 1334 pullop.fetch = fetch
1335 1335 pullop.rheads = rheads
1336 1336
1337 1337 def _pullbundle2(pullop):
1338 1338 """pull data using bundle2
1339 1339
1340 1340 For now, the only supported data are changegroups."""
1341 1341 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1342 1342
1343 1343 # At the moment we don't do stream clones over bundle2. If that is
1344 1344 # implemented then here's where the check for that will go.
1345 1345 streaming = False
1346 1346
1347 1347 # pulling changegroup
1348 1348 pullop.stepsdone.add('changegroup')
1349 1349
1350 1350 kwargs['common'] = pullop.common
1351 1351 kwargs['heads'] = pullop.heads or pullop.rheads
1352 1352 kwargs['cg'] = pullop.fetch
1353 1353
1354 1354 ui = pullop.repo.ui
1355 1355 legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
1356 1356 if (not legacyphase and 'heads' in pullop.remotebundle2caps.get('phases', ())):
1357 1357 kwargs['phases'] = True
1358 1358 pullop.stepsdone.add('phases')
1359 1359
1360 1360 if 'listkeys' in pullop.remotebundle2caps:
1361 1361 if 'phases' not in pullop.stepsdone:
1362 1362 kwargs['listkeys'] = ['phases']
1363 1363 if pullop.remotebookmarks is None:
1364 1364 # make sure to always include bookmark data when migrating
1365 1365 # `hg incoming --bundle` to using this function.
1366 1366 kwargs.setdefault('listkeys', []).append('bookmarks')
1367 1367
1368 1368 # If this is a full pull / clone and the server supports the clone bundles
1369 1369 # feature, tell the server whether we attempted a clone bundle. The
1370 1370 # presence of this flag indicates the client supports clone bundles. This
1371 1371 # will enable the server to treat clients that support clone bundles
1372 1372 # differently from those that don't.
1373 1373 if (pullop.remote.capable('clonebundles')
1374 1374 and pullop.heads is None and list(pullop.common) == [nullid]):
1375 1375 kwargs['cbattempted'] = pullop.clonebundleattempted
1376 1376
1377 1377 if streaming:
1378 1378 pullop.repo.ui.status(_('streaming all changes\n'))
1379 1379 elif not pullop.fetch:
1380 1380 pullop.repo.ui.status(_("no changes found\n"))
1381 1381 pullop.cgresult = 0
1382 1382 else:
1383 1383 if pullop.heads is None and list(pullop.common) == [nullid]:
1384 1384 pullop.repo.ui.status(_("requesting all changes\n"))
1385 1385 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1386 1386 remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
1387 1387 if obsolete.commonversion(remoteversions) is not None:
1388 1388 kwargs['obsmarkers'] = True
1389 1389 pullop.stepsdone.add('obsmarkers')
1390 1390 _pullbundle2extraprepare(pullop, kwargs)
1391 1391 bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
1392 1392 try:
1393 1393 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
1394 1394 except bundle2.AbortFromPart as exc:
1395 1395 pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
1396 1396 raise error.Abort(_('pull failed on remote'), hint=exc.hint)
1397 1397 except error.BundleValueError as exc:
1398 1398 raise error.Abort(_('missing support for %s') % exc)
1399 1399
1400 1400 if pullop.fetch:
1401 1401 pullop.cgresult = bundle2.combinechangegroupresults(op)
1402 1402
1403 # If the bundle had a phase-heads part, then phase exchange is already done
1404 if op.records['phase-heads']:
1405 pullop.stepsdone.add('phases')
1406
1407 1403 # processing phases change
1408 1404 for namespace, value in op.records['listkeys']:
1409 1405 if namespace == 'phases':
1410 1406 _pullapplyphases(pullop, value)
1411 1407
1412 1408 # processing bookmark update
1413 1409 for namespace, value in op.records['listkeys']:
1414 1410 if namespace == 'bookmarks':
1415 1411 pullop.remotebookmarks = value
1416 1412
1417 1413 # bookmark data were either already there or pulled in the bundle
1418 1414 if pullop.remotebookmarks is not None:
1419 1415 _pullbookmarks(pullop)
1420 1416
1421 1417 def _pullbundle2extraprepare(pullop, kwargs):
1422 1418 """hook function so that extensions can extend the getbundle call"""
1423 1419 pass
1424 1420
1425 1421 def _pullchangeset(pullop):
1426 1422 """pull changeset from unbundle into the local repo"""
1427 1423 # We delay opening the transaction as late as possible so we
1428 1424 # don't open a transaction for nothing; otherwise you would break
1429 1425 # future useful rollback calls
1430 1426 if 'changegroup' in pullop.stepsdone:
1431 1427 return
1432 1428 pullop.stepsdone.add('changegroup')
1433 1429 if not pullop.fetch:
1434 1430 pullop.repo.ui.status(_("no changes found\n"))
1435 1431 pullop.cgresult = 0
1436 1432 return
1437 1433 tr = pullop.gettransaction()
1438 1434 if pullop.heads is None and list(pullop.common) == [nullid]:
1439 1435 pullop.repo.ui.status(_("requesting all changes\n"))
1440 1436 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1441 1437 # issue1320, avoid a race if remote changed after discovery
1442 1438 pullop.heads = pullop.rheads
1443 1439
1444 1440 if pullop.remote.capable('getbundle'):
1445 1441 # TODO: get bundlecaps from remote
1446 1442 cg = pullop.remote.getbundle('pull', common=pullop.common,
1447 1443 heads=pullop.heads or pullop.rheads)
1448 1444 elif pullop.heads is None:
1449 1445 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1450 1446 elif not pullop.remote.capable('changegroupsubset'):
1451 1447 raise error.Abort(_("partial pull cannot be done because "
1452 1448 "other repository doesn't support "
1453 1449 "changegroupsubset."))
1454 1450 else:
1455 1451 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1456 1452 bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
1457 1453 pullop.remote.url())
1458 1454 pullop.cgresult = bundle2.combinechangegroupresults(bundleop)
1459 1455
1460 1456 def _pullphase(pullop):
1461 1457 # Get remote phases data from remote
1462 1458 if 'phases' in pullop.stepsdone:
1463 1459 return
1464 1460 remotephases = pullop.remote.listkeys('phases')
1465 1461 _pullapplyphases(pullop, remotephases)
1466 1462
1467 1463 def _pullapplyphases(pullop, remotephases):
1468 1464 """apply phase movement from observed remote state"""
1469 1465 if 'phases' in pullop.stepsdone:
1470 1466 return
1471 1467 pullop.stepsdone.add('phases')
1472 1468 publishing = bool(remotephases.get('publishing', False))
1473 1469 if remotephases and not publishing:
1474 1470 # remote is new and non-publishing
1475 1471 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1476 1472 pullop.pulledsubset,
1477 1473 remotephases)
1478 1474 dheads = pullop.pulledsubset
1479 1475 else:
1480 1476 # Remote is old or publishing; all common changesets
1481 1477 # should be seen as public
1482 1478 pheads = pullop.pulledsubset
1483 1479 dheads = []
1484 1480 unfi = pullop.repo.unfiltered()
1485 1481 phase = unfi._phasecache.phase
1486 1482 rev = unfi.changelog.nodemap.get
1487 1483 public = phases.public
1488 1484 draft = phases.draft
1489 1485
1490 1486 # exclude changesets already public locally and update the others
1491 1487 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1492 1488 if pheads:
1493 1489 tr = pullop.gettransaction()
1494 1490 phases.advanceboundary(pullop.repo, tr, public, pheads)
1495 1491
1496 1492 # exclude changesets already draft locally and update the others
1497 1493 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1498 1494 if dheads:
1499 1495 tr = pullop.gettransaction()
1500 1496 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1501 1497
1502 1498 def _pullbookmarks(pullop):
1503 1499 """process the remote bookmark information to update the local one"""
1504 1500 if 'bookmarks' in pullop.stepsdone:
1505 1501 return
1506 1502 pullop.stepsdone.add('bookmarks')
1507 1503 repo = pullop.repo
1508 1504 remotebookmarks = pullop.remotebookmarks
1509 1505 remotebookmarks = bookmod.unhexlifybookmarks(remotebookmarks)
1510 1506 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1511 1507 pullop.remote.url(),
1512 1508 pullop.gettransaction,
1513 1509 explicit=pullop.explicitbookmarks)
1514 1510
1515 1511 def _pullobsolete(pullop):
1516 1512 """utility function to pull obsolete markers from a remote
1517 1513
1518 1514 The `gettransaction` function returns the pull transaction, creating
1519 1515 one if necessary. We return the transaction to inform the calling code that
1520 1516 a new transaction has been created (when applicable).
1521 1517
1522 1518 Exists mostly to allow overriding for experimentation purposes"""
1523 1519 if 'obsmarkers' in pullop.stepsdone:
1524 1520 return
1525 1521 pullop.stepsdone.add('obsmarkers')
1526 1522 tr = None
1527 1523 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1528 1524 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1529 1525 remoteobs = pullop.remote.listkeys('obsolete')
1530 1526 if 'dump0' in remoteobs:
1531 1527 tr = pullop.gettransaction()
1532 1528 markers = []
1533 1529 for key in sorted(remoteobs, reverse=True):
1534 1530 if key.startswith('dump'):
1535 1531 data = util.b85decode(remoteobs[key])
1536 1532 version, newmarks = obsolete._readmarkers(data)
1537 1533 markers += newmarks
1538 1534 if markers:
1539 1535 pullop.repo.obsstore.add(tr, markers)
1540 1536 pullop.repo.invalidatevolatilesets()
1541 1537 return tr
1542 1538
1543 1539 def caps20to10(repo):
1544 1540 """return a set with appropriate options to use bundle20 during getbundle"""
1545 1541 caps = {'HG20'}
1546 1542 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1547 1543 caps.add('bundle2=' + urlreq.quote(capsblob))
1548 1544 return caps
1549 1545
1550 1546 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1551 1547 getbundle2partsorder = []
1552 1548
1553 1549 # Mapping between step name and function
1554 1550 #
1555 1551 # This exists to help extensions wrap steps if necessary
1556 1552 getbundle2partsmapping = {}
1557 1553
1558 1554 def getbundle2partsgenerator(stepname, idx=None):
1559 1555 """decorator for function generating bundle2 part for getbundle
1560 1556
1561 1557 The function is added to the step -> function mapping and appended to the
1562 1558 list of steps. Beware that decorated functions will be added in order
1563 1559 (this may matter).
1564 1560
1565 1561 You can only use this decorator for new steps; if you want to wrap a step
1566 1562 from an extension, change the getbundle2partsmapping dictionary directly."""
1567 1563 def dec(func):
1568 1564 assert stepname not in getbundle2partsmapping
1569 1565 getbundle2partsmapping[stepname] = func
1570 1566 if idx is None:
1571 1567 getbundle2partsorder.append(stepname)
1572 1568 else:
1573 1569 getbundle2partsorder.insert(idx, stepname)
1574 1570 return func
1575 1571 return dec
1576 1572
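# A minimal sketch of registering an additional part generator with the
# decorator above; the part name and payload are hypothetical:
@getbundle2partsgenerator('example:ping')
def _getbundleexamplepart(bundler, repo, source, bundlecaps=None,
                          b2caps=None, **kwargs):
    # advisory part: clients that do not know the type simply skip it
    bundler.newpart('example:ping', data='pong', mandatory=False)
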
1577 1573 def bundle2requested(bundlecaps):
1578 1574 if bundlecaps is not None:
1579 1575 return any(cap.startswith('HG2') for cap in bundlecaps)
1580 1576 return False
1581 1577
1582 1578 def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
1583 1579 **kwargs):
1584 1580 """Return chunks constituting a bundle's raw data.
1585 1581
1586 1582 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1587 1583 passed.
1588 1584
1589 1585 Returns an iterator over raw chunks (of varying sizes).
1590 1586 """
1591 1587 kwargs = pycompat.byteskwargs(kwargs)
1592 1588 usebundle2 = bundle2requested(bundlecaps)
1593 1589 # bundle10 case
1594 1590 if not usebundle2:
1595 1591 if bundlecaps and not kwargs.get('cg', True):
1596 1592 raise ValueError(_('request for bundle10 must include changegroup'))
1597 1593
1598 1594 if kwargs:
1599 1595 raise ValueError(_('unsupported getbundle arguments: %s')
1600 1596 % ', '.join(sorted(kwargs.keys())))
1601 1597 outgoing = _computeoutgoing(repo, heads, common)
1602 1598 return changegroup.makestream(repo, outgoing, '01', source,
1603 1599 bundlecaps=bundlecaps)
1604 1600
1605 1601 # bundle20 case
1606 1602 b2caps = {}
1607 1603 for bcaps in bundlecaps:
1608 1604 if bcaps.startswith('bundle2='):
1609 1605 blob = urlreq.unquote(bcaps[len('bundle2='):])
1610 1606 b2caps.update(bundle2.decodecaps(blob))
1611 1607 bundler = bundle2.bundle20(repo.ui, b2caps)
1612 1608
1613 1609 kwargs['heads'] = heads
1614 1610 kwargs['common'] = common
1615 1611
1616 1612 for name in getbundle2partsorder:
1617 1613 func = getbundle2partsmapping[name]
1618 1614 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1619 1615 **pycompat.strkwargs(kwargs))
1620 1616
1621 1617 return bundler.getchunks()
1622 1618
1623 1619 @getbundle2partsgenerator('changegroup')
1624 1620 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1625 1621 b2caps=None, heads=None, common=None, **kwargs):
1626 1622 """add a changegroup part to the requested bundle"""
1627 1623 cgstream = None
1628 1624 if kwargs.get('cg', True):
1629 1625 # build changegroup bundle here.
1630 1626 version = '01'
1631 1627 cgversions = b2caps.get('changegroup')
1632 1628 if cgversions: # 3.1 and 3.2 ship with an empty value
1633 1629 cgversions = [v for v in cgversions
1634 1630 if v in changegroup.supportedoutgoingversions(repo)]
1635 1631 if not cgversions:
1636 1632 raise ValueError(_('no common changegroup version'))
1637 1633 version = max(cgversions)
1638 1634 outgoing = _computeoutgoing(repo, heads, common)
1639 1635 if outgoing.missing:
1640 1636 cgstream = changegroup.makestream(repo, outgoing, version, source,
1641 1637 bundlecaps=bundlecaps)
1642 1638
1643 1639 if cgstream:
1644 1640 part = bundler.newpart('changegroup', data=cgstream)
1645 1641 if cgversions:
1646 1642 part.addparam('version', version)
1647 1643 part.addparam('nbchanges', '%d' % len(outgoing.missing),
1648 1644 mandatory=False)
1649 1645 if 'treemanifest' in repo.requirements:
1650 1646 part.addparam('treemanifest', '1')
1651 1647
1652 1648 @getbundle2partsgenerator('listkeys')
1653 1649 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1654 1650 b2caps=None, **kwargs):
1655 1651 """add parts containing listkeys namespaces to the requested bundle"""
1656 1652 listkeys = kwargs.get('listkeys', ())
1657 1653 for namespace in listkeys:
1658 1654 part = bundler.newpart('listkeys')
1659 1655 part.addparam('namespace', namespace)
1660 1656 keys = repo.listkeys(namespace).items()
1661 1657 part.data = pushkey.encodekeys(keys)
1662 1658
1663 1659 @getbundle2partsgenerator('obsmarkers')
1664 1660 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1665 1661 b2caps=None, heads=None, **kwargs):
1666 1662 """add an obsolescence markers part to the requested bundle"""
1667 1663 if kwargs.get('obsmarkers', False):
1668 1664 if heads is None:
1669 1665 heads = repo.heads()
1670 1666 subset = [c.node() for c in repo.set('::%ln', heads)]
1671 1667 markers = repo.obsstore.relevantmarkers(subset)
1672 1668 markers = sorted(markers)
1673 1669 bundle2.buildobsmarkerspart(bundler, markers)
1674 1670
1675 1671 @getbundle2partsgenerator('phases')
1676 1672 def _getbundlephasespart(bundler, repo, source, bundlecaps=None,
1677 1673 b2caps=None, heads=None, **kwargs):
1678 1674 """add phase heads part to the requested bundle"""
1679 1675 if kwargs.get('phases', False):
1680 1676 if 'heads' not in b2caps.get('phases', ()):
1681 1677 raise ValueError(_('no common phases exchange method'))
1682 1678 if heads is None:
1683 1679 heads = repo.heads()
1684 1680
1685 1681 headsbyphase = collections.defaultdict(set)
1686 1682 if repo.publishing():
1687 1683 headsbyphase[phases.public] = heads
1688 1684 else:
1689 1685 # find the appropriate heads to move
1690 1686
1691 1687 phase = repo._phasecache.phase
1692 1688 node = repo.changelog.node
1693 1689 rev = repo.changelog.rev
1694 1690 for h in heads:
1695 1691 headsbyphase[phase(repo, rev(h))].add(h)
1696 1692 seenphases = list(headsbyphase.keys())
1697 1693
1698 1694 # We do not handle anything but public and draft phases for now
1699 1695 if seenphases:
1700 1696 assert max(seenphases) <= phases.draft
1701 1697
1702 1698 # if client is pulling non-public changesets, we need to find
1703 1699 # intermediate public heads.
1704 1700 draftheads = headsbyphase.get(phases.draft, set())
1705 1701 if draftheads:
1706 1702 publicheads = headsbyphase.get(phases.public, set())
1707 1703
1708 1704 revset = 'heads(only(%ln, %ln) and public())'
1709 1705 extraheads = repo.revs(revset, draftheads, publicheads)
1710 1706 for r in extraheads:
1711 1707 headsbyphase[phases.public].add(node(r))
1712 1708
1713 1709 # transform data into the format used by the encoding function
1714 1710 phasemapping = []
1715 1711 for phase in phases.allphases:
1716 1712 phasemapping.append(sorted(headsbyphase[phase]))
1717 1713
1718 1714 # generate the actual part
1719 1715 phasedata = phases.binaryencode(phasemapping)
1720 1716 bundler.newpart('phase-heads', data=phasedata)
1721 1717
1722 1718 @getbundle2partsgenerator('hgtagsfnodes')
1723 1719 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1724 1720 b2caps=None, heads=None, common=None,
1725 1721 **kwargs):
1726 1722 """Transfer the .hgtags filenodes mapping.
1727 1723
1728 1724 Only values for heads in this bundle will be transferred.
1729 1725
1730 1726 The part data consists of pairs of 20 byte changeset node and .hgtags
1731 1727 filenodes raw values.
1732 1728 """
1733 1729 # Don't send unless:
1734 1730 # - changesets are being exchanged,
1735 1731 # - the client supports it.
1736 1732 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1737 1733 return
1738 1734
1739 1735 outgoing = _computeoutgoing(repo, heads, common)
1740 1736 bundle2.addparttagsfnodescache(repo, bundler, outgoing)
1741 1737
1742 1738 def _getbookmarks(repo, **kwargs):
1743 1739 """Returns bookmark to node mapping.
1744 1740
1745 1741 This function is primarily used to generate `bookmarks` bundle2 part.
1746 1742 It is a separate function in order to make it easy to wrap it
1747 1743 in extensions. Passing `kwargs` to the function makes it easy to
1748 1744 add new parameters in extensions.
1749 1745 """
1750 1746
1751 1747 return dict(bookmod.listbinbookmarks(repo))
1752 1748
1753 1749 def check_heads(repo, their_heads, context):
1754 1750 """check if the heads of a repo have been modified
1755 1751
1756 1752 Used by peer for unbundling.
1757 1753 """
1758 1754 heads = repo.heads()
1759 1755 heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
1760 1756 if not (their_heads == ['force'] or their_heads == heads or
1761 1757 their_heads == ['hashed', heads_hash]):
1762 1758 # someone else committed/pushed/unbundled while we
1763 1759 # were transferring data
1764 1760 raise error.PushRaced('repository changed while %s - '
1765 1761 'please try again' % context)
1766 1762
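# A sketch of how the 'hashed' form accepted above is produced on the
# sending side: a sha1 over the sorted binary heads observed before the
# bundle was built (the helper name is hypothetical):
def _examplehashedheads(repo):
    heads = repo.heads()
    return ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]
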
1767 1763 def unbundle(repo, cg, heads, source, url):
1768 1764 """Apply a bundle to a repo.
1769 1765
1770 1766 This function makes sure the repo is locked during the application and has
1771 1767 a mechanism to check that no push race occurred between the creation of the
1772 1768 bundle and its application.
1773 1769
1774 1770 If the push was raced, a PushRaced exception is raised."""
1775 1771 r = 0
1776 1772 # need a transaction when processing a bundle2 stream
1777 1773 # [wlock, lock, tr] - needs to be an array so nested functions can modify it
1778 1774 lockandtr = [None, None, None]
1779 1775 recordout = None
1780 1776 # quick fix for output mismatch with bundle2 in 3.4
1781 1777 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
1782 1778 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1783 1779 captureoutput = True
1784 1780 try:
1785 1781 # note: outside bundle1, 'heads' is expected to be empty and this
1786 1782 # 'check_heads' call will be a no-op
1787 1783 check_heads(repo, heads, 'uploading changes')
1788 1784 # push can proceed
1789 1785 if not isinstance(cg, bundle2.unbundle20):
1790 1786 # legacy case: bundle1 (changegroup 01)
1791 1787 txnname = "\n".join([source, util.hidepassword(url)])
1792 1788 with repo.lock(), repo.transaction(txnname) as tr:
1793 1789 op = bundle2.applybundle(repo, cg, tr, source, url)
1794 1790 r = bundle2.combinechangegroupresults(op)
1795 1791 else:
1796 1792 r = None
1797 1793 try:
1798 1794 def gettransaction():
1799 1795 if not lockandtr[2]:
1800 1796 lockandtr[0] = repo.wlock()
1801 1797 lockandtr[1] = repo.lock()
1802 1798 lockandtr[2] = repo.transaction(source)
1803 1799 lockandtr[2].hookargs['source'] = source
1804 1800 lockandtr[2].hookargs['url'] = url
1805 1801 lockandtr[2].hookargs['bundle2'] = '1'
1806 1802 return lockandtr[2]
1807 1803
1808 1804 # Do greedy locking by default until we're satisfied with lazy
1809 1805 # locking.
1810 1806 if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
1811 1807 gettransaction()
1812 1808
1813 1809 op = bundle2.bundleoperation(repo, gettransaction,
1814 1810 captureoutput=captureoutput)
1815 1811 try:
1816 1812 op = bundle2.processbundle(repo, cg, op=op)
1817 1813 finally:
1818 1814 r = op.reply
1819 1815 if captureoutput and r is not None:
1820 1816 repo.ui.pushbuffer(error=True, subproc=True)
1821 1817 def recordout(output):
1822 1818 r.newpart('output', data=output, mandatory=False)
1823 1819 if lockandtr[2] is not None:
1824 1820 lockandtr[2].close()
1825 1821 except BaseException as exc:
1826 1822 exc.duringunbundle2 = True
1827 1823 if captureoutput and r is not None:
1828 1824 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1829 1825 def recordout(output):
1830 1826 part = bundle2.bundlepart('output', data=output,
1831 1827 mandatory=False)
1832 1828 parts.append(part)
1833 1829 raise
1834 1830 finally:
1835 1831 lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
1836 1832 if recordout is not None:
1837 1833 recordout(repo.ui.popbuffer())
1838 1834 return r
1839 1835
1840 1836 def _maybeapplyclonebundle(pullop):
1841 1837 """Apply a clone bundle from a remote, if possible."""
1842 1838
1843 1839 repo = pullop.repo
1844 1840 remote = pullop.remote
1845 1841
1846 1842 if not repo.ui.configbool('ui', 'clonebundles'):
1847 1843 return
1848 1844
1849 1845 # Only run if local repo is empty.
1850 1846 if len(repo):
1851 1847 return
1852 1848
1853 1849 if pullop.heads:
1854 1850 return
1855 1851
1856 1852 if not remote.capable('clonebundles'):
1857 1853 return
1858 1854
1859 1855 res = remote._call('clonebundles')
1860 1856
1861 1857 # If we call the wire protocol command, that's good enough to record the
1862 1858 # attempt.
1863 1859 pullop.clonebundleattempted = True
1864 1860
1865 1861 entries = parseclonebundlesmanifest(repo, res)
1866 1862 if not entries:
1867 1863 repo.ui.note(_('no clone bundles available on remote; '
1868 1864 'falling back to regular clone\n'))
1869 1865 return
1870 1866
1871 1867 entries = filterclonebundleentries(repo, entries)
1872 1868 if not entries:
1873 1869 # There is a thundering herd concern here. However, if a server
1874 1870 # operator doesn't advertise bundles appropriate for its clients,
1875 1871 # they deserve what's coming. Furthermore, from a client's
1876 1872 # perspective, no automatic fallback would mean not being able to
1877 1873 # clone!
1878 1874 repo.ui.warn(_('no compatible clone bundles available on server; '
1879 1875 'falling back to regular clone\n'))
1880 1876 repo.ui.warn(_('(you may want to report this to the server '
1881 1877 'operator)\n'))
1882 1878 return
1883 1879
1884 1880 entries = sortclonebundleentries(repo.ui, entries)
1885 1881
1886 1882 url = entries[0]['URL']
1887 1883 repo.ui.status(_('applying clone bundle from %s\n') % url)
1888 1884 if trypullbundlefromurl(repo.ui, repo, url):
1889 1885 repo.ui.status(_('finished applying clone bundle\n'))
1890 1886 # Bundle failed.
1891 1887 #
1892 1888 # We abort by default to avoid the thundering herd of
1893 1889 # clients flooding a server that was expecting expensive
1894 1890 # clone load to be offloaded.
1895 1891 elif repo.ui.configbool('ui', 'clonebundlefallback'):
1896 1892 repo.ui.warn(_('falling back to normal clone\n'))
1897 1893 else:
1898 1894 raise error.Abort(_('error applying bundle'),
1899 1895 hint=_('if this error persists, consider contacting '
1900 1896 'the server operator or disable clone '
1901 1897 'bundles via '
1902 1898 '"--config ui.clonebundles=false"'))
1903 1899
1904 1900 def parseclonebundlesmanifest(repo, s):
1905 1901 """Parses the raw text of a clone bundles manifest.
1906 1902
1907 1903 Returns a list of dicts. The dicts have a ``URL`` key corresponding
1908 1904 to the URL and other keys are the attributes for the entry.
1909 1905 """
1910 1906 m = []
1911 1907 for line in s.splitlines():
1912 1908 fields = line.split()
1913 1909 if not fields:
1914 1910 continue
1915 1911 attrs = {'URL': fields[0]}
1916 1912 for rawattr in fields[1:]:
1917 1913 key, value = rawattr.split('=', 1)
1918 1914 key = urlreq.unquote(key)
1919 1915 value = urlreq.unquote(value)
1920 1916 attrs[key] = value
1921 1917
1922 1918 # Parse BUNDLESPEC into components. This makes client-side
1923 1919 # preferences easier to specify since you can prefer a single
1924 1920 # component of the BUNDLESPEC.
1925 1921 if key == 'BUNDLESPEC':
1926 1922 try:
1927 1923 comp, version, params = parsebundlespec(repo, value,
1928 1924 externalnames=True)
1929 1925 attrs['COMPRESSION'] = comp
1930 1926 attrs['VERSION'] = version
1931 1927 except error.InvalidBundleSpecification:
1932 1928 pass
1933 1929 except error.UnsupportedBundleSpecification:
1934 1930 pass
1935 1931
1936 1932 m.append(attrs)
1937 1933
1938 1934 return m
1939 1935
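# A worked sketch of the format above; the URL and bundlespec are
# hypothetical. Attributes after the URL are urlquoted key=value pairs,
# and a valid BUNDLESPEC is additionally split into its components:
def _examplemanifest(repo):
    text = 'https://example.com/full.hg BUNDLESPEC=gzip-v2\n'
    entries = parseclonebundlesmanifest(repo, text)
    # entries == [{'URL': 'https://example.com/full.hg',
    #              'BUNDLESPEC': 'gzip-v2',
    #              'COMPRESSION': 'gzip', 'VERSION': 'v2'}]
    return entries
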
1940 1936 def filterclonebundleentries(repo, entries):
1941 1937 """Remove incompatible clone bundle manifest entries.
1942 1938
1943 1939 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
1944 1940 and returns a new list consisting of only the entries that this client
1945 1941 should be able to apply.
1946 1942
1947 1943 There is no guarantee we'll be able to apply all returned entries because
1948 1944 the metadata we use to filter on may be missing or wrong.
1949 1945 """
1950 1946 newentries = []
1951 1947 for entry in entries:
1952 1948 spec = entry.get('BUNDLESPEC')
1953 1949 if spec:
1954 1950 try:
1955 1951 parsebundlespec(repo, spec, strict=True)
1956 1952 except error.InvalidBundleSpecification as e:
1957 1953 repo.ui.debug(str(e) + '\n')
1958 1954 continue
1959 1955 except error.UnsupportedBundleSpecification as e:
1960 1956 repo.ui.debug('filtering %s because unsupported bundle '
1961 1957 'spec: %s\n' % (entry['URL'], str(e)))
1962 1958 continue
1963 1959
1964 1960 if 'REQUIRESNI' in entry and not sslutil.hassni:
1965 1961 repo.ui.debug('filtering %s because SNI not supported\n' %
1966 1962 entry['URL'])
1967 1963 continue
1968 1964
1969 1965 newentries.append(entry)
1970 1966
1971 1967 return newentries
1972 1968
1973 1969 class clonebundleentry(object):
1974 1970 """Represents an item in a clone bundles manifest.
1975 1971
1976 1972 This rich class is needed to support sorting since sorted() in Python 3
1977 1973 doesn't support ``cmp`` and our comparison is complex enough that ``key=``
1978 1974 won't work.
1979 1975 """
1980 1976
1981 1977 def __init__(self, value, prefers):
1982 1978 self.value = value
1983 1979 self.prefers = prefers
1984 1980
1985 1981 def _cmp(self, other):
1986 1982 for prefkey, prefvalue in self.prefers:
1987 1983 avalue = self.value.get(prefkey)
1988 1984 bvalue = other.value.get(prefkey)
1989 1985
1990 1986 # Special case: b is missing the attribute and a matches exactly.
1991 1987 if avalue is not None and bvalue is None and avalue == prefvalue:
1992 1988 return -1
1993 1989
1994 1990 # Special case: a is missing the attribute and b matches exactly.
1995 1991 if bvalue is not None and avalue is None and bvalue == prefvalue:
1996 1992 return 1
1997 1993
1998 1994 # We can't compare unless attribute present on both.
1999 1995 if avalue is None or bvalue is None:
2000 1996 continue
2001 1997
2002 1998 # Same values should fall back to next attribute.
2003 1999 if avalue == bvalue:
2004 2000 continue
2005 2001
2006 2002 # Exact matches come first.
2007 2003 if avalue == prefvalue:
2008 2004 return -1
2009 2005 if bvalue == prefvalue:
2010 2006 return 1
2011 2007
2012 2008 # Fall back to next attribute.
2013 2009 continue
2014 2010
2015 2011 # If we got here we couldn't sort by attributes and prefers. Fall
2016 2012 # back to index order.
2017 2013 return 0
2018 2014
2019 2015 def __lt__(self, other):
2020 2016 return self._cmp(other) < 0
2021 2017
2022 2018 def __gt__(self, other):
2023 2019 return self._cmp(other) > 0
2024 2020
2025 2021 def __eq__(self, other):
2026 2022 return self._cmp(other) == 0
2027 2023
2028 2024 def __le__(self, other):
2029 2025 return self._cmp(other) <= 0
2030 2026
2031 2027 def __ge__(self, other):
2032 2028 return self._cmp(other) >= 0
2033 2029
2034 2030 def __ne__(self, other):
2035 2031 return self._cmp(other) != 0
2036 2032
2037 2033 def sortclonebundleentries(ui, entries):
2038 2034 prefers = ui.configlist('ui', 'clonebundleprefers')
2039 2035 if not prefers:
2040 2036 return list(entries)
2041 2037
2042 2038 prefers = [p.split('=', 1) for p in prefers]
2043 2039
2044 2040 items = sorted(clonebundleentry(v, prefers) for v in entries)
2045 2041 return [i.value for i in items]
2046 2042
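# A small sketch of the preference machinery above; the entries are
# hypothetical. With ui.clonebundleprefers=COMPRESSION=zstd the second
# entry sorts first; with no preferences, manifest order is preserved:
def _examplesortentries(ui):
    entries = [{'URL': 'https://example.com/gzip.hg', 'COMPRESSION': 'gzip'},
               {'URL': 'https://example.com/zstd.hg', 'COMPRESSION': 'zstd'}]
    return sortclonebundleentries(ui, entries)
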
2047 2043 def trypullbundlefromurl(ui, repo, url):
2048 2044 """Attempt to apply a bundle from a URL."""
2049 2045 with repo.lock(), repo.transaction('bundleurl') as tr:
2050 2046 try:
2051 2047 fh = urlmod.open(ui, url)
2052 2048 cg = readbundle(ui, fh, 'stream')
2053 2049
2054 2050 if isinstance(cg, streamclone.streamcloneapplier):
2055 2051 cg.apply(repo)
2056 2052 else:
2057 2053 bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
2058 2054 return True
2059 2055 except urlerr.httperror as e:
2060 2056 ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
2061 2057 except urlerr.urlerror as e:
2062 2058 ui.warn(_('error fetching bundle: %s\n') % e.reason)
2063 2059
2064 2060 return False