exchange: don't attempt phase exchange if phase-heads was in bundle...
Martin von Zweigbergk, r33887:13dc7f29 default
@@ -1,1894 +1,1895 @@
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37 
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple, and we want to discourage
56 56 overly complex usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76 
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79 
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 The payload is a series of `<chunksize><chunkdata>`.
121 121 
122 122 `chunksize` is an int32; `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136 
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
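
# Illustration only, not part of the original module: a minimal sketch of the
# stream-level header described in the module docstring, built from the format
# constants above. The parameter name and value are hypothetical.
def _examplestreamheader():
    """return the opening bytes of a hypothetical HG20 stream"""
    params = 'somekey=somevalue'  # one advisory parameter, urlquoted form
    # magic string, then int32 size of the parameter blob, then the blob
    return 'HG20' + _pack(_fstreamparamsize, len(params)) + params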
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
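# Illustration only: with two parameters, the dynamic format built above
# expands to '>BBBB', i.e. two (key size, value size) byte pairs matching the
# <param-sizes> field described in the module docstring.
assert _makefpartparamsizes(2) == '>BBBB'
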
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that register a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
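# Illustration only (hypothetical usage): records accumulate (category, entry)
# pairs in order and expose them per category, as described above.
def _exampleunbundlerecords():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: hello')
    assert records['changegroup'] == ({'return': 1},)
    assert list(records) == [('changegroup', {'return': 1}),
                             ('output', 'remote: hello')]
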
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 def processbundle(repo, unbundler, transactiongetter=None, op=None):
351 351 """This function process a bundle, apply effect to/from a repo
352 352
353 353 It iterates over each part then searches for and uses the proper handling
354 354 code to process the part. Parts are processed in order.
355 355
356 356 An unknown mandatory part will abort the process.
357 357
358 358 It is temporarily possible to provide a prebuilt bundleoperation to the
359 359 function. This is used to ensure output is properly propagated in case of
360 360 an error during the unbundling. This output capturing part will likely be
361 361 reworked and this ability will probably go away in the process.
362 362 """
363 363 if op is None:
364 364 if transactiongetter is None:
365 365 transactiongetter = _notransaction
366 366 op = bundleoperation(repo, transactiongetter)
367 367 # todo:
368 368 # - replace this with an init function soon.
369 369 # - exception catching
370 370 unbundler.params
371 371 if repo.ui.debugflag:
372 372 msg = ['bundle2-input-bundle:']
373 373 if unbundler.params:
374 374 msg.append(' %i params' % len(unbundler.params))
375 375 if op._gettransaction is None or op._gettransaction is _notransaction:
376 376 msg.append(' no-transaction')
377 377 else:
378 378 msg.append(' with-transaction')
379 379 msg.append('\n')
380 380 repo.ui.debug(''.join(msg))
381 381 iterparts = enumerate(unbundler.iterparts())
382 382 part = None
383 383 nbpart = 0
384 384 try:
385 385 for nbpart, part in iterparts:
386 386 _processpart(op, part)
387 387 except Exception as exc:
388 388 # Any exceptions seeking to the end of the bundle at this point are
389 389 # almost certainly related to the underlying stream being bad.
390 390 # And, chances are that the exception we're handling is related to
391 391 # getting in that bad state. So, we swallow the seeking error and
392 392 # re-raise the original error.
393 393 seekerror = False
394 394 try:
395 395 for nbpart, part in iterparts:
396 396 # consume the bundle content
397 397 part.seek(0, 2)
398 398 except Exception:
399 399 seekerror = True
400 400
401 401 # Small hack to let caller code distinguish exceptions from bundle2
402 402 # processing from processing the old format. This is mostly
403 403 # needed to handle different return codes to unbundle according to the
404 404 # type of bundle. We should probably clean up or drop this return code
405 405 # craziness in a future version.
406 406 exc.duringunbundle2 = True
407 407 salvaged = []
408 408 replycaps = None
409 409 if op.reply is not None:
410 410 salvaged = op.reply.salvageoutput()
411 411 replycaps = op.reply.capabilities
412 412 exc._replycaps = replycaps
413 413 exc._bundle2salvagedoutput = salvaged
414 414
415 415 # Re-raising from a variable loses the original stack. So only use
416 416 # that form if we need to.
417 417 if seekerror:
418 418 raise exc
419 419 else:
420 420 raise
421 421 finally:
422 422 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
423 423
424 424 return op
425 425
426 426 def _processchangegroup(op, cg, tr, source, url, **kwargs):
427 427 ret = cg.apply(op.repo, tr, source, url, **kwargs)
428 428 op.records.add('changegroup', {
429 429 'return': ret,
430 430 })
431 431 return ret
432 432
433 433 def _processpart(op, part):
434 434 """process a single part from a bundle
435 435
436 436 The part is guaranteed to have been fully consumed when the function exits
437 437 (even if an exception is raised)."""
438 438 status = 'unknown' # used by debug output
439 439 hardabort = False
440 440 try:
441 441 try:
442 442 handler = parthandlermapping.get(part.type)
443 443 if handler is None:
444 444 status = 'unsupported-type'
445 445 raise error.BundleUnknownFeatureError(parttype=part.type)
446 446 indebug(op.ui, 'found a handler for part %r' % part.type)
447 447 unknownparams = part.mandatorykeys - handler.params
448 448 if unknownparams:
449 449 unknownparams = list(unknownparams)
450 450 unknownparams.sort()
451 451 status = 'unsupported-params (%s)' % unknownparams
452 452 raise error.BundleUnknownFeatureError(parttype=part.type,
453 453 params=unknownparams)
454 454 status = 'supported'
455 455 except error.BundleUnknownFeatureError as exc:
456 456 if part.mandatory: # mandatory parts
457 457 raise
458 458 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
459 459 return # skip to part processing
460 460 finally:
461 461 if op.ui.debugflag:
462 462 msg = ['bundle2-input-part: "%s"' % part.type]
463 463 if not part.mandatory:
464 464 msg.append(' (advisory)')
465 465 nbmp = len(part.mandatorykeys)
466 466 nbap = len(part.params) - nbmp
467 467 if nbmp or nbap:
468 468 msg.append(' (params:')
469 469 if nbmp:
470 470 msg.append(' %i mandatory' % nbmp)
471 471 if nbap:
472 472 msg.append(' %i advisory' % nbap)
473 473 msg.append(')')
474 474 msg.append(' %s\n' % status)
475 475 op.ui.debug(''.join(msg))
476 476
477 477 # handler is called outside the above try block so that we don't
478 478 # risk catching KeyErrors from anything other than the
479 479 # parthandlermapping lookup (any KeyError raised by handler()
480 480 # itself represents a defect of a different variety).
481 481 output = None
482 482 if op.captureoutput and op.reply is not None:
483 483 op.ui.pushbuffer(error=True, subproc=True)
484 484 output = ''
485 485 try:
486 486 handler(op, part)
487 487 finally:
488 488 if output is not None:
489 489 output = op.ui.popbuffer()
490 490 if output:
491 491 outpart = op.reply.newpart('output', data=output,
492 492 mandatory=False)
493 493 outpart.addparam(
494 494 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
495 495 # If exiting or interrupted, do not attempt to seek the stream in the
496 496 # finally block below. This makes abort faster.
497 497 except (SystemExit, KeyboardInterrupt):
498 498 hardabort = True
499 499 raise
500 500 finally:
501 501 # consume the part content to not corrupt the stream.
502 502 if not hardabort:
503 503 part.seek(0, 2)
504 504
505 505
506 506 def decodecaps(blob):
507 507 """decode a bundle2 caps bytes blob into a dictionary
508 508
509 509 The blob is a list of capabilities (one per line).
510 510 Capabilities may have values using a line of the form::
511 511
512 512 capability=value1,value2,value3
513 513
514 514 The values are always a list."""
515 515 caps = {}
516 516 for line in blob.splitlines():
517 517 if not line:
518 518 continue
519 519 if '=' not in line:
520 520 key, vals = line, ()
521 521 else:
522 522 key, vals = line.split('=', 1)
523 523 vals = vals.split(',')
524 524 key = urlreq.unquote(key)
525 525 vals = [urlreq.unquote(v) for v in vals]
526 526 caps[key] = vals
527 527 return caps
528 528
529 529 def encodecaps(caps):
530 530 """encode a bundle2 caps dictionary into a bytes blob"""
531 531 chunks = []
532 532 for ca in sorted(caps):
533 533 vals = caps[ca]
534 534 ca = urlreq.quote(ca)
535 535 vals = [urlreq.quote(v) for v in vals]
536 536 if vals:
537 537 ca = "%s=%s" % (ca, ','.join(vals))
538 538 chunks.append(ca)
539 539 return '\n'.join(chunks)
540 540
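# Illustration only (hypothetical capabilities): the caps blob holds one
# capability per line, with '=' separating a name from its comma-separated
# values, everything urlquoted; decodecaps() inverts encodecaps().
def _examplecapsroundtrip():
    caps = {'HG20': [], 'digests': ['md5', 'sha1']}
    blob = encodecaps(caps)  # 'HG20\ndigests=md5,sha1'
    assert decodecaps(blob) == caps
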
541 541 bundletypes = {
542 542 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
543 543 # since the unification ssh accepts a header but there
544 544 # is no capability signaling it.
545 545 "HG20": (), # special-cased below
546 546 "HG10UN": ("HG10UN", 'UN'),
547 547 "HG10BZ": ("HG10", 'BZ'),
548 548 "HG10GZ": ("HG10GZ", 'GZ'),
549 549 }
550 550
551 551 # hgweb uses this list to communicate its preferred type
552 552 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
553 553
554 554 class bundle20(object):
555 555 """represent an outgoing bundle2 container
556 556
557 557 Use the `addparam` method to add a stream level parameter, and `newpart` to
558 558 populate it. Then call `getchunks` to retrieve all the binary chunks of
559 559 data that compose the bundle2 container."""
560 560
561 561 _magicstring = 'HG20'
562 562
563 563 def __init__(self, ui, capabilities=()):
564 564 self.ui = ui
565 565 self._params = []
566 566 self._parts = []
567 567 self.capabilities = dict(capabilities)
568 568 self._compengine = util.compengines.forbundletype('UN')
569 569 self._compopts = None
570 570
571 571 def setcompression(self, alg, compopts=None):
572 572 """setup core part compression to <alg>"""
573 573 if alg in (None, 'UN'):
574 574 return
575 575 assert not any(n.lower() == 'compression' for n, v in self._params)
576 576 self.addparam('Compression', alg)
577 577 self._compengine = util.compengines.forbundletype(alg)
578 578 self._compopts = compopts
579 579
580 580 @property
581 581 def nbparts(self):
582 582 """total number of parts added to the bundler"""
583 583 return len(self._parts)
584 584
585 585 # methods used to define the bundle2 content
586 586 def addparam(self, name, value=None):
587 587 """add a stream level parameter"""
588 588 if not name:
589 589 raise ValueError('empty parameter name')
590 590 if name[0] not in pycompat.bytestr(string.ascii_letters):
591 591 raise ValueError('non letter first character: %r' % name)
592 592 self._params.append((name, value))
593 593
594 594 def addpart(self, part):
595 595 """add a new part to the bundle2 container
596 596
597 597 Parts contain the actual application payload.
598 598 assert part.id is None
599 599 part.id = len(self._parts) # very cheap counter
600 600 self._parts.append(part)
601 601
602 602 def newpart(self, typeid, *args, **kwargs):
603 603 """create a new part and add it to the containers
604 604
605 605 As the part is directly added to the containers. For now, this means
606 606 that any failure to properly initialize the part after calling
607 607 ``newpart`` should result in a failure of the whole bundling process.
608 608
609 609 You can still fall back to manually create and add if you need better
610 610 control."""
611 611 part = bundlepart(typeid, *args, **kwargs)
612 612 self.addpart(part)
613 613 return part
614 614
615 615 # methods used to generate the bundle2 stream
616 616 def getchunks(self):
617 617 if self.ui.debugflag:
618 618 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
619 619 if self._params:
620 620 msg.append(' (%i params)' % len(self._params))
621 621 msg.append(' %i parts total\n' % len(self._parts))
622 622 self.ui.debug(''.join(msg))
623 623 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
624 624 yield self._magicstring
625 625 param = self._paramchunk()
626 626 outdebug(self.ui, 'bundle parameter: %s' % param)
627 627 yield _pack(_fstreamparamsize, len(param))
628 628 if param:
629 629 yield param
630 630 for chunk in self._compengine.compressstream(self._getcorechunk(),
631 631 self._compopts):
632 632 yield chunk
633 633
634 634 def _paramchunk(self):
635 635 """return a encoded version of all stream parameters"""
636 636 blocks = []
637 637 for par, value in self._params:
638 638 par = urlreq.quote(par)
639 639 if value is not None:
640 640 value = urlreq.quote(value)
641 641 par = '%s=%s' % (par, value)
642 642 blocks.append(par)
643 643 return ' '.join(blocks)
644 644
645 645 def _getcorechunk(self):
646 646 """yield chunk for the core part of the bundle
647 647
648 648 (all but headers and parameters)"""
649 649 outdebug(self.ui, 'start of parts')
650 650 for part in self._parts:
651 651 outdebug(self.ui, 'bundle part: "%s"' % part.type)
652 652 for chunk in part.getchunks(ui=self.ui):
653 653 yield chunk
654 654 outdebug(self.ui, 'end of bundle')
655 655 yield _pack(_fpartheadersize, 0)
656 656
657 657
658 658 def salvageoutput(self):
659 659 """return a list with a copy of all output parts in the bundle
660 660
661 661 This is meant to be used during error handling to make sure we preserve
662 662 server output"""
663 663 salvaged = []
664 664 for part in self._parts:
665 665 if part.type.startswith('output'):
666 666 salvaged.append(part.copy())
667 667 return salvaged
668 668
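# Illustration only (hypothetical usage): assembling an outgoing container
# with the class above; getchunks() then yields the magic string, the
# parameter block and every part's chunks. The parameter and part used here
# are made up for the example.
def _examplebundle20(ui):
    bundler = bundle20(ui)
    bundler.addparam('somekey', 'somevalue')  # advisory stream parameter
    part = bundler.newpart('output', data='hello', mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    return ''.join(bundler.getchunks())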
669 669
670 670 class unpackermixin(object):
671 671 """A mixin to extract bytes and struct data from a stream"""
672 672
673 673 def __init__(self, fp):
674 674 self._fp = fp
675 675
676 676 def _unpack(self, format):
677 677 """unpack this struct format from the stream
678 678
679 679 This method is meant for internal usage by the bundle2 protocol only.
680 680 It directly manipulates the low level stream, including bundle2 level
681 681 instructions.
682 682
683 683 Do not use it to implement higher-level logic or methods."""
684 684 data = self._readexact(struct.calcsize(format))
685 685 return _unpack(format, data)
686 686
687 687 def _readexact(self, size):
688 688 """read exactly <size> bytes from the stream
689 689
690 690 This method is meant for internal usage by the bundle2 protocol only.
691 691 It directly manipulates the low level stream, including bundle2 level
692 692 instructions.
693 693
694 694 Do not use it to implement higher-level logic or methods."""
695 695 return changegroup.readexactly(self._fp, size)
696 696
697 697 def getunbundler(ui, fp, magicstring=None):
698 698 """return a valid unbundler object for a given magicstring"""
699 699 if magicstring is None:
700 700 magicstring = changegroup.readexactly(fp, 4)
701 701 magic, version = magicstring[0:2], magicstring[2:4]
702 702 if magic != 'HG':
703 703 ui.debug(
704 704 "error: invalid magic: %r (version %r), should be 'HG'\n"
705 705 % (magic, version))
706 706 raise error.Abort(_('not a Mercurial bundle'))
707 707 unbundlerclass = formatmap.get(version)
708 708 if unbundlerclass is None:
709 709 raise error.Abort(_('unknown bundle version %s') % version)
710 710 unbundler = unbundlerclass(ui, fp)
711 711 indebug(ui, 'start processing of %s stream' % magicstring)
712 712 return unbundler
713 713
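# Illustration only (hypothetical stream): reading a bundle back in; the
# factory above dispatches on the 4-byte magic string to an unbundler class.
# An empty parameter blob and an empty part header make a valid empty bundle.
def _examplegetunbundler(ui):
    import io
    fp = io.BytesIO('HG20' + _pack(_fstreamparamsize, 0)
                    + _pack(_fpartheadersize, 0))
    unbundler = getunbundler(ui, fp)
    return list(unbundler.iterparts())  # empty bundle -> no parts
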
714 714 class unbundle20(unpackermixin):
715 715 """interpret a bundle2 stream
716 716
717 717 This class is fed with a binary stream and yields parts through its
718 718 `iterparts` method."""
719 719
720 720 _magicstring = 'HG20'
721 721
722 722 def __init__(self, ui, fp):
723 723 """If header is specified, we do not read it out of the stream."""
724 724 self.ui = ui
725 725 self._compengine = util.compengines.forbundletype('UN')
726 726 self._compressed = None
727 727 super(unbundle20, self).__init__(fp)
728 728
729 729 @util.propertycache
730 730 def params(self):
731 731 """dictionary of stream level parameters"""
732 732 indebug(self.ui, 'reading bundle2 stream parameters')
733 733 params = {}
734 734 paramssize = self._unpack(_fstreamparamsize)[0]
735 735 if paramssize < 0:
736 736 raise error.BundleValueError('negative bundle param size: %i'
737 737 % paramssize)
738 738 if paramssize:
739 739 params = self._readexact(paramssize)
740 740 params = self._processallparams(params)
741 741 return params
742 742
743 743 def _processallparams(self, paramsblock):
744 744 """"""
745 745 params = util.sortdict()
746 746 for p in paramsblock.split(' '):
747 747 p = p.split('=', 1)
748 748 p = [urlreq.unquote(i) for i in p]
749 749 if len(p) < 2:
750 750 p.append(None)
751 751 self._processparam(*p)
752 752 params[p[0]] = p[1]
753 753 return params
754 754
755 755
756 756 def _processparam(self, name, value):
757 757 """process a parameter, applying its effect if needed
758 758
759 759 Parameter starting with a lower case letter are advisory and will be
760 760 ignored when unknown. Those starting with an upper case letter are
761 761 mandatory and will this function will raise a KeyError when unknown.
762 762
763 763 Note: no option are currently supported. Any input will be either
764 764 ignored or failing.
765 765 """
766 766 if not name:
767 767 raise ValueError('empty parameter name')
768 768 if name[0] not in pycompat.bytestr(string.ascii_letters):
769 769 raise ValueError('non letter first character: %r' % name)
770 770 try:
771 771 handler = b2streamparamsmap[name.lower()]
772 772 except KeyError:
773 773 if name[0].islower():
774 774 indebug(self.ui, "ignoring unknown parameter %r" % name)
775 775 else:
776 776 raise error.BundleUnknownFeatureError(params=(name,))
777 777 else:
778 778 handler(self, name, value)
779 779
780 780 def _forwardchunks(self):
781 781 """utility to transfer a bundle2 as binary
782 782
783 783 This is made necessary by the fact that the 'getbundle' command over
784 784 'ssh' has no way to know when the reply ends, relying on the bundle being
785 785 interpreted to find its end. This is terrible and we are sorry, but we
786 786 needed to move forward to get general delta enabled.
787 787 """
788 788 yield self._magicstring
789 789 assert 'params' not in vars(self)
790 790 paramssize = self._unpack(_fstreamparamsize)[0]
791 791 if paramssize < 0:
792 792 raise error.BundleValueError('negative bundle param size: %i'
793 793 % paramssize)
794 794 yield _pack(_fstreamparamsize, paramssize)
795 795 if paramssize:
796 796 params = self._readexact(paramssize)
797 797 self._processallparams(params)
798 798 yield params
799 799 assert self._compengine.bundletype == 'UN'
800 800 # From there, payload might need to be decompressed
801 801 self._fp = self._compengine.decompressorreader(self._fp)
802 802 emptycount = 0
803 803 while emptycount < 2:
804 804 # so we can brainlessly loop
805 805 assert _fpartheadersize == _fpayloadsize
806 806 size = self._unpack(_fpartheadersize)[0]
807 807 yield _pack(_fpartheadersize, size)
808 808 if size:
809 809 emptycount = 0
810 810 else:
811 811 emptycount += 1
812 812 continue
813 813 if size == flaginterrupt:
814 814 continue
815 815 elif size < 0:
816 816 raise error.BundleValueError('negative chunk size: %i' % size)
817 817 yield self._readexact(size)
818 818
819 819
820 820 def iterparts(self):
821 821 """yield all parts contained in the stream"""
822 822 # make sure params have been loaded
823 823 self.params
824 824 # From there, the payload needs to be decompressed
825 825 self._fp = self._compengine.decompressorreader(self._fp)
826 826 indebug(self.ui, 'start extraction of bundle2 parts')
827 827 headerblock = self._readpartheader()
828 828 while headerblock is not None:
829 829 part = unbundlepart(self.ui, headerblock, self._fp)
830 830 yield part
831 831 part.seek(0, 2)
832 832 headerblock = self._readpartheader()
833 833 indebug(self.ui, 'end of bundle2 stream')
834 834
835 835 def _readpartheader(self):
836 836 """reads a part header size and return the bytes blob
837 837
838 838 returns None if empty"""
839 839 headersize = self._unpack(_fpartheadersize)[0]
840 840 if headersize < 0:
841 841 raise error.BundleValueError('negative part header size: %i'
842 842 % headersize)
843 843 indebug(self.ui, 'part header size: %i' % headersize)
844 844 if headersize:
845 845 return self._readexact(headersize)
846 846 return None
847 847
848 848 def compressed(self):
849 849 self.params # load params
850 850 return self._compressed
851 851
852 852 def close(self):
853 853 """close underlying file"""
854 854 if util.safehasattr(self._fp, 'close'):
855 855 return self._fp.close()
856 856
857 857 formatmap = {'20': unbundle20}
858 858
859 859 b2streamparamsmap = {}
860 860
861 861 def b2streamparamhandler(name):
862 862 """register a handler for a stream level parameter"""
863 863 def decorator(func):
864 864 assert name not in b2streamparamsmap
865 865 b2streamparamsmap[name] = func
866 866 return func
867 867 return decorator
868 868
869 869 @b2streamparamhandler('compression')
870 870 def processcompression(unbundler, param, value):
871 871 """read compression parameter and install payload decompression"""
872 872 if value not in util.compengines.supportedbundletypes:
873 873 raise error.BundleUnknownFeatureError(params=(param,),
874 874 values=(value,))
875 875 unbundler._compengine = util.compengines.forbundletype(value)
876 876 if value is not None:
877 877 unbundler._compressed = True
878 878
879 879 class bundlepart(object):
880 880 """A bundle2 part contains application level payload
881 881
882 882 The part `type` is used to route the part to the application level
883 883 handler.
884 884
885 885 The part payload is contained in ``part.data``. It could be raw bytes or a
886 886 generator of byte chunks.
887 887
888 888 You can add parameters to the part using the ``addparam`` method.
889 889 Parameters can be either mandatory (default) or advisory. Remote side
890 890 should be able to safely ignore the advisory ones.
891 891
892 892 Neither data nor parameters can be modified after generation has begun.
893 893 """
894 894
895 895 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
896 896 data='', mandatory=True):
897 897 validateparttype(parttype)
898 898 self.id = None
899 899 self.type = parttype
900 900 self._data = data
901 901 self._mandatoryparams = list(mandatoryparams)
902 902 self._advisoryparams = list(advisoryparams)
903 903 # checking for duplicated entries
904 904 self._seenparams = set()
905 905 for pname, __ in self._mandatoryparams + self._advisoryparams:
906 906 if pname in self._seenparams:
907 907 raise error.ProgrammingError('duplicated params: %s' % pname)
908 908 self._seenparams.add(pname)
909 909 # status of the part's generation:
910 910 # - None: not started,
911 911 # - False: currently generated,
912 912 # - True: generation done.
913 913 self._generated = None
914 914 self.mandatory = mandatory
915 915
916 916 def __repr__(self):
917 917 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
918 918 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
919 919 % (cls, id(self), self.id, self.type, self.mandatory))
920 920
921 921 def copy(self):
922 922 """return a copy of the part
923 923
924 924 The new part has the very same content but no partid assigned yet.
925 925 Parts with generated data cannot be copied."""
926 926 assert not util.safehasattr(self.data, 'next')
927 927 return self.__class__(self.type, self._mandatoryparams,
928 928 self._advisoryparams, self._data, self.mandatory)
929 929
930 930 # methods used to define the part content
931 931 @property
932 932 def data(self):
933 933 return self._data
934 934
935 935 @data.setter
936 936 def data(self, data):
937 937 if self._generated is not None:
938 938 raise error.ReadOnlyPartError('part is being generated')
939 939 self._data = data
940 940
941 941 @property
942 942 def mandatoryparams(self):
943 943 # make it an immutable tuple to force people through ``addparam``
944 944 return tuple(self._mandatoryparams)
945 945
946 946 @property
947 947 def advisoryparams(self):
948 948 # make it an immutable tuple to force people through ``addparam``
949 949 return tuple(self._advisoryparams)
950 950
951 951 def addparam(self, name, value='', mandatory=True):
952 952 """add a parameter to the part
953 953
954 954 If 'mandatory' is set to True, the remote handler must claim support
955 955 for this parameter or the unbundling will be aborted.
956 956
957 957 The 'name' and 'value' cannot exceed 255 bytes each.
958 958 """
959 959 if self._generated is not None:
960 960 raise error.ReadOnlyPartError('part is being generated')
961 961 if name in self._seenparams:
962 962 raise ValueError('duplicated params: %s' % name)
963 963 self._seenparams.add(name)
964 964 params = self._advisoryparams
965 965 if mandatory:
966 966 params = self._mandatoryparams
967 967 params.append((name, value))
968 968
969 969 # methods used to generate the bundle2 stream
970 970 def getchunks(self, ui):
971 971 if self._generated is not None:
972 972 raise error.ProgrammingError('part can only be consumed once')
973 973 self._generated = False
974 974
975 975 if ui.debugflag:
976 976 msg = ['bundle2-output-part: "%s"' % self.type]
977 977 if not self.mandatory:
978 978 msg.append(' (advisory)')
979 979 nbmp = len(self.mandatoryparams)
980 980 nbap = len(self.advisoryparams)
981 981 if nbmp or nbap:
982 982 msg.append(' (params:')
983 983 if nbmp:
984 984 msg.append(' %i mandatory' % nbmp)
985 985 if nbap:
986 986 msg.append(' %i advisory' % nbap)
987 987 msg.append(')')
988 988 if not self.data:
989 989 msg.append(' empty payload')
990 990 elif util.safehasattr(self.data, 'next'):
991 991 msg.append(' streamed payload')
992 992 else:
993 993 msg.append(' %i bytes payload' % len(self.data))
994 994 msg.append('\n')
995 995 ui.debug(''.join(msg))
996 996
997 997 #### header
998 998 if self.mandatory:
999 999 parttype = self.type.upper()
1000 1000 else:
1001 1001 parttype = self.type.lower()
1002 1002 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1003 1003 ## parttype
1004 1004 header = [_pack(_fparttypesize, len(parttype)),
1005 1005 parttype, _pack(_fpartid, self.id),
1006 1006 ]
1007 1007 ## parameters
1008 1008 # count
1009 1009 manpar = self.mandatoryparams
1010 1010 advpar = self.advisoryparams
1011 1011 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1012 1012 # size
1013 1013 parsizes = []
1014 1014 for key, value in manpar:
1015 1015 parsizes.append(len(key))
1016 1016 parsizes.append(len(value))
1017 1017 for key, value in advpar:
1018 1018 parsizes.append(len(key))
1019 1019 parsizes.append(len(value))
1020 1020 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1021 1021 header.append(paramsizes)
1022 1022 # key, value
1023 1023 for key, value in manpar:
1024 1024 header.append(key)
1025 1025 header.append(value)
1026 1026 for key, value in advpar:
1027 1027 header.append(key)
1028 1028 header.append(value)
1029 1029 ## finalize header
1030 1030 headerchunk = ''.join(header)
1031 1031 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1032 1032 yield _pack(_fpartheadersize, len(headerchunk))
1033 1033 yield headerchunk
1034 1034 ## payload
1035 1035 try:
1036 1036 for chunk in self._payloadchunks():
1037 1037 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1038 1038 yield _pack(_fpayloadsize, len(chunk))
1039 1039 yield chunk
1040 1040 except GeneratorExit:
1041 1041 # GeneratorExit means that nobody is listening for our
1042 1042 # results anyway, so just bail quickly rather than trying
1043 1043 # to produce an error part.
1044 1044 ui.debug('bundle2-generatorexit\n')
1045 1045 raise
1046 1046 except BaseException as exc:
1047 1047 bexc = util.forcebytestr(exc)
1048 1048 # backup exception data for later
1049 1049 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1050 1050 % bexc)
1051 1051 tb = sys.exc_info()[2]
1052 1052 msg = 'unexpected error: %s' % bexc
1053 1053 interpart = bundlepart('error:abort', [('message', msg)],
1054 1054 mandatory=False)
1055 1055 interpart.id = 0
1056 1056 yield _pack(_fpayloadsize, -1)
1057 1057 for chunk in interpart.getchunks(ui=ui):
1058 1058 yield chunk
1059 1059 outdebug(ui, 'closing payload chunk')
1060 1060 # abort current part payload
1061 1061 yield _pack(_fpayloadsize, 0)
1062 1062 pycompat.raisewithtb(exc, tb)
1063 1063 # end of payload
1064 1064 outdebug(ui, 'closing payload chunk')
1065 1065 yield _pack(_fpayloadsize, 0)
1066 1066 self._generated = True
1067 1067
1068 1068 def _payloadchunks(self):
1069 1069 """yield chunks of a the part payload
1070 1070
1071 1071 Exists to handle the different methods to provide data to a part."""
1072 1072 # we only support fixed size data now.
1073 1073 # This will be improved in the future.
1074 1074 if (util.safehasattr(self.data, 'next')
1075 1075 or util.safehasattr(self.data, '__next__')):
1076 1076 buff = util.chunkbuffer(self.data)
1077 1077 chunk = buff.read(preferedchunksize)
1078 1078 while chunk:
1079 1079 yield chunk
1080 1080 chunk = buff.read(preferedchunksize)
1081 1081 elif len(self.data):
1082 1082 yield self.data
1083 1083
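# Illustration only, not part of the original module: a part payload is framed
# on the wire as <int32 size><data> chunks ending with a zero-size chunk, per
# the module docstring; a negative size (flaginterrupt below) signals an
# out-of-band interruption part.
def _exampleframepayload(data):
    """frame <data> as a single payload chunk plus the final empty chunk"""
    return _pack(_fpayloadsize, len(data)) + data + _pack(_fpayloadsize, 0)
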
1084 1084
1085 1085 flaginterrupt = -1
1086 1086
1087 1087 class interrupthandler(unpackermixin):
1088 1088 """read one part and process it with restricted capability
1089 1089
1090 1090 This allows transmitting an exception raised on the producer side during
1091 1091 part iteration while the consumer is reading a part.
1092 1092 
1093 1093 Parts processed in this manner only have access to a ui object."""
1094 1094
1095 1095 def __init__(self, ui, fp):
1096 1096 super(interrupthandler, self).__init__(fp)
1097 1097 self.ui = ui
1098 1098
1099 1099 def _readpartheader(self):
1100 1100 """reads a part header size and return the bytes blob
1101 1101
1102 1102 returns None if empty"""
1103 1103 headersize = self._unpack(_fpartheadersize)[0]
1104 1104 if headersize < 0:
1105 1105 raise error.BundleValueError('negative part header size: %i'
1106 1106 % headersize)
1107 1107 indebug(self.ui, 'part header size: %i' % headersize)
1108 1108 if headersize:
1109 1109 return self._readexact(headersize)
1110 1110 return None
1111 1111
1112 1112 def __call__(self):
1113 1113
1114 1114 self.ui.debug('bundle2-input-stream-interrupt:'
1115 1115 ' opening out of band context\n')
1116 1116 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1117 1117 headerblock = self._readpartheader()
1118 1118 if headerblock is None:
1119 1119 indebug(self.ui, 'no part found during interruption.')
1120 1120 return
1121 1121 part = unbundlepart(self.ui, headerblock, self._fp)
1122 1122 op = interruptoperation(self.ui)
1123 1123 _processpart(op, part)
1124 1124 self.ui.debug('bundle2-input-stream-interrupt:'
1125 1125 ' closing out of band context\n')
1126 1126
1127 1127 class interruptoperation(object):
1128 1128 """A limited operation to be use by part handler during interruption
1129 1129
1130 1130 It only have access to an ui object.
1131 1131 """
1132 1132
1133 1133 def __init__(self, ui):
1134 1134 self.ui = ui
1135 1135 self.reply = None
1136 1136 self.captureoutput = False
1137 1137
1138 1138 @property
1139 1139 def repo(self):
1140 1140 raise error.ProgrammingError('no repo access from stream interruption')
1141 1141
1142 1142 def gettransaction(self):
1143 1143 raise TransactionUnavailable('no repo access from stream interruption')
1144 1144
1145 1145 class unbundlepart(unpackermixin):
1146 1146 """a bundle part read from a bundle"""
1147 1147
1148 1148 def __init__(self, ui, header, fp):
1149 1149 super(unbundlepart, self).__init__(fp)
1150 1150 self._seekable = (util.safehasattr(fp, 'seek') and
1151 1151 util.safehasattr(fp, 'tell'))
1152 1152 self.ui = ui
1153 1153 # unbundle state attr
1154 1154 self._headerdata = header
1155 1155 self._headeroffset = 0
1156 1156 self._initialized = False
1157 1157 self.consumed = False
1158 1158 # part data
1159 1159 self.id = None
1160 1160 self.type = None
1161 1161 self.mandatoryparams = None
1162 1162 self.advisoryparams = None
1163 1163 self.params = None
1164 1164 self.mandatorykeys = ()
1165 1165 self._payloadstream = None
1166 1166 self._readheader()
1167 1167 self._mandatory = None
1168 1168 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1169 1169 self._pos = 0
1170 1170
1171 1171 def _fromheader(self, size):
1172 1172 """return the next <size> byte from the header"""
1173 1173 offset = self._headeroffset
1174 1174 data = self._headerdata[offset:(offset + size)]
1175 1175 self._headeroffset = offset + size
1176 1176 return data
1177 1177
1178 1178 def _unpackheader(self, format):
1179 1179 """read given format from header
1180 1180
1181 1181 This automatically computes the size of the format to read."""
1182 1182 data = self._fromheader(struct.calcsize(format))
1183 1183 return _unpack(format, data)
1184 1184
1185 1185 def _initparams(self, mandatoryparams, advisoryparams):
1186 1186 """internal function to setup all logic related parameters"""
1187 1187 # make it read only to prevent people touching it by mistake.
1188 1188 self.mandatoryparams = tuple(mandatoryparams)
1189 1189 self.advisoryparams = tuple(advisoryparams)
1190 1190 # user friendly UI
1191 1191 self.params = util.sortdict(self.mandatoryparams)
1192 1192 self.params.update(self.advisoryparams)
1193 1193 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1194 1194
1195 1195 def _payloadchunks(self, chunknum=0):
1196 1196 '''seek to specified chunk and start yielding data'''
1197 1197 if len(self._chunkindex) == 0:
1198 1198 assert chunknum == 0, 'Must start with chunk 0'
1199 1199 self._chunkindex.append((0, self._tellfp()))
1200 1200 else:
1201 1201 assert chunknum < len(self._chunkindex), \
1202 1202 'Unknown chunk %d' % chunknum
1203 1203 self._seekfp(self._chunkindex[chunknum][1])
1204 1204
1205 1205 pos = self._chunkindex[chunknum][0]
1206 1206 payloadsize = self._unpack(_fpayloadsize)[0]
1207 1207 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1208 1208 while payloadsize:
1209 1209 if payloadsize == flaginterrupt:
1210 1210 # interruption detection, the handler will now read a
1211 1211 # single part and process it.
1212 1212 interrupthandler(self.ui, self._fp)()
1213 1213 elif payloadsize < 0:
1214 1214 msg = 'negative payload chunk size: %i' % payloadsize
1215 1215 raise error.BundleValueError(msg)
1216 1216 else:
1217 1217 result = self._readexact(payloadsize)
1218 1218 chunknum += 1
1219 1219 pos += payloadsize
1220 1220 if chunknum == len(self._chunkindex):
1221 1221 self._chunkindex.append((pos, self._tellfp()))
1222 1222 yield result
1223 1223 payloadsize = self._unpack(_fpayloadsize)[0]
1224 1224 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1225 1225
1226 1226 def _findchunk(self, pos):
1227 1227 '''for a given payload position, return a chunk number and offset'''
1228 1228 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1229 1229 if ppos == pos:
1230 1230 return chunk, 0
1231 1231 elif ppos > pos:
1232 1232 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1233 1233 raise ValueError('Unknown chunk')
1234 1234
1235 1235 def _readheader(self):
1236 1236 """read the header and setup the object"""
1237 1237 typesize = self._unpackheader(_fparttypesize)[0]
1238 1238 self.type = self._fromheader(typesize)
1239 1239 indebug(self.ui, 'part type: "%s"' % self.type)
1240 1240 self.id = self._unpackheader(_fpartid)[0]
1241 1241 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1242 1242 # extract mandatory bit from type
1243 1243 self.mandatory = (self.type != self.type.lower())
1244 1244 self.type = self.type.lower()
1245 1245 ## reading parameters
1246 1246 # param count
1247 1247 mancount, advcount = self._unpackheader(_fpartparamcount)
1248 1248 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1249 1249 # param size
1250 1250 fparamsizes = _makefpartparamsizes(mancount + advcount)
1251 1251 paramsizes = self._unpackheader(fparamsizes)
1252 1252 # make it a list of pairs again
1253 1253 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1254 1254 # split mandatory from advisory
1255 1255 mansizes = paramsizes[:mancount]
1256 1256 advsizes = paramsizes[mancount:]
1257 1257 # retrieve param value
1258 1258 manparams = []
1259 1259 for key, value in mansizes:
1260 1260 manparams.append((self._fromheader(key), self._fromheader(value)))
1261 1261 advparams = []
1262 1262 for key, value in advsizes:
1263 1263 advparams.append((self._fromheader(key), self._fromheader(value)))
1264 1264 self._initparams(manparams, advparams)
1265 1265 ## part payload
1266 1266 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1267 1267 # we have read the header; mark the part as initialized
1268 1268 self._initialized = True
1269 1269
1270 1270 def read(self, size=None):
1271 1271 """read payload data"""
1272 1272 if not self._initialized:
1273 1273 self._readheader()
1274 1274 if size is None:
1275 1275 data = self._payloadstream.read()
1276 1276 else:
1277 1277 data = self._payloadstream.read(size)
1278 1278 self._pos += len(data)
1279 1279 if size is None or len(data) < size:
1280 1280 if not self.consumed and self._pos:
1281 1281 self.ui.debug('bundle2-input-part: total payload size %i\n'
1282 1282 % self._pos)
1283 1283 self.consumed = True
1284 1284 return data
1285 1285
1286 1286 def tell(self):
1287 1287 return self._pos
1288 1288
1289 1289 def seek(self, offset, whence=0):
1290 1290 if whence == 0:
1291 1291 newpos = offset
1292 1292 elif whence == 1:
1293 1293 newpos = self._pos + offset
1294 1294 elif whence == 2:
1295 1295 if not self.consumed:
1296 1296 self.read()
1297 1297 newpos = self._chunkindex[-1][0] - offset
1298 1298 else:
1299 1299 raise ValueError('Unknown whence value: %r' % (whence,))
1300 1300
1301 1301 if newpos > self._chunkindex[-1][0] and not self.consumed:
1302 1302 self.read()
1303 1303 if not 0 <= newpos <= self._chunkindex[-1][0]:
1304 1304 raise ValueError('Offset out of range')
1305 1305
1306 1306 if self._pos != newpos:
1307 1307 chunk, internaloffset = self._findchunk(newpos)
1308 1308 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1309 1309 adjust = self.read(internaloffset)
1310 1310 if len(adjust) != internaloffset:
1311 1311 raise error.Abort(_('seek failed'))
1312 1312 self._pos = newpos
1313 1313
1314 1314 def _seekfp(self, offset, whence=0):
1315 1315 """move the underlying file pointer
1316 1316
1317 1317 This method is meant for internal usage by the bundle2 protocol only.
1318 1318 It directly manipulates the low level stream, including bundle2 level
1319 1319 instructions.
1320 1320
1321 1321 Do not use it to implement higher-level logic or methods."""
1322 1322 if self._seekable:
1323 1323 return self._fp.seek(offset, whence)
1324 1324 else:
1325 1325 raise NotImplementedError(_('File pointer is not seekable'))
1326 1326
1327 1327 def _tellfp(self):
1328 1328 """return the file offset, or None if file is not seekable
1329 1329
1330 1330 This method is meant for internal usage by the bundle2 protocol only.
1331 1331 It directly manipulates the low level stream, including bundle2 level
1332 1332 instructions.
1333 1333
1334 1334 Do not use it to implement higher-level logic or methods."""
1335 1335 if self._seekable:
1336 1336 try:
1337 1337 return self._fp.tell()
1338 1338 except IOError as e:
1339 1339 if e.errno == errno.ESPIPE:
1340 1340 self._seekable = False
1341 1341 else:
1342 1342 raise
1343 1343 return None
1344 1344
1345 1345 # These are only the static capabilities.
1346 1346 # Check the 'getrepocaps' function for the rest.
1347 1347 capabilities = {'HG20': (),
1348 1348 'error': ('abort', 'unsupportedcontent', 'pushraced',
1349 1349 'pushkey'),
1350 1350 'listkeys': (),
1351 1351 'pushkey': (),
1352 1352 'digests': tuple(sorted(util.DIGESTS.keys())),
1353 1353 'remote-changegroup': ('http', 'https'),
1354 1354 'hgtagsfnodes': (),
1355 1355 }
1356 1356
1357 1357 def getrepocaps(repo, allowpushback=False):
1358 1358 """return the bundle2 capabilities for a given repo
1359 1359
1360 1360 Exists to allow extensions (like evolution) to mutate the capabilities.
1361 1361 """
1362 1362 caps = capabilities.copy()
1363 1363 caps['changegroup'] = tuple(sorted(
1364 1364 changegroup.supportedincomingversions(repo)))
1365 1365 if obsolete.isenabled(repo, obsolete.exchangeopt):
1366 1366 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1367 1367 caps['obsmarkers'] = supportedformat
1368 1368 if allowpushback:
1369 1369 caps['pushback'] = ()
1370 1370 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1371 1371 if cpmode == 'check-related':
1372 1372 caps['checkheads'] = ('related',)
1373 1373 return caps
1374 1374
1375 1375 def bundle2caps(remote):
1376 1376 """return the bundle capabilities of a peer as dict"""
1377 1377 raw = remote.capable('bundle2')
1378 1378 if not raw and raw != '':
1379 1379 return {}
1380 1380 capsblob = urlreq.unquote(remote.capable('bundle2'))
1381 1381 return decodecaps(capsblob)
1382 1382
1383 1383 def obsmarkersversion(caps):
1384 1384 """extract the list of supported obsmarkers versions from a bundle2caps dict
1385 1385 """
1386 1386 obscaps = caps.get('obsmarkers', ())
1387 1387 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1388 1388
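# Illustration only (hypothetical caps dict): extracting the supported
# obsmarkers versions advertised in a bundle2 capabilities mapping.
assert obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]
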
1389 1389 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1390 1390 vfs=None, compression=None, compopts=None):
1391 1391 if bundletype.startswith('HG10'):
1392 1392 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1393 1393 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1394 1394 compression=compression, compopts=compopts)
1395 1395 elif not bundletype.startswith('HG20'):
1396 1396 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1397 1397
1398 1398 caps = {}
1399 1399 if 'obsolescence' in opts:
1400 1400 caps['obsmarkers'] = ('V1',)
1401 1401 bundle = bundle20(ui, caps)
1402 1402 bundle.setcompression(compression, compopts)
1403 1403 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1404 1404 chunkiter = bundle.getchunks()
1405 1405
1406 1406 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1407 1407
1408 1408 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1409 1409 # We should eventually reconcile this logic with the one behind
1410 1410 # 'exchange.getbundle2partsgenerator'.
1411 1411 #
1412 1412 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1413 1413 # different right now. So we keep them separated for now for the sake of
1414 1414 # simplicity.
1415 1415
1416 1416 # we always want a changegroup in such bundle
1417 1417 cgversion = opts.get('cg.version')
1418 1418 if cgversion is None:
1419 1419 cgversion = changegroup.safeversion(repo)
1420 1420 cg = changegroup.getchangegroup(repo, source, outgoing,
1421 1421 version=cgversion)
1422 1422 part = bundler.newpart('changegroup', data=cg.getchunks())
1423 1423 part.addparam('version', cg.version)
1424 1424 if 'clcount' in cg.extras:
1425 1425 part.addparam('nbchanges', str(cg.extras['clcount']),
1426 1426 mandatory=False)
1427 1427 if opts.get('phases') and repo.revs('%ln and secret()',
1428 1428 outgoing.missingheads):
1429 1429 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1430 1430
1431 1431 addparttagsfnodescache(repo, bundler, outgoing)
1432 1432
1433 1433 if opts.get('obsolescence', False):
1434 1434 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1435 1435 buildobsmarkerspart(bundler, obsmarkers)
1436 1436
1437 1437 if opts.get('phases', False):
1438 1438 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1439 1439 phasedata = []
1440 1440 for phase in phases.allphases:
1441 1441 for head in headsbyphase[phase]:
1442 1442 phasedata.append(_pack(_fphasesentry, phase, head))
1443 1443 bundler.newpart('phase-heads', data=''.join(phasedata))
1444 1444
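# Illustration only (hypothetical values): each phase-heads entry packs an
# int32 phase number followed by a 20-byte node hash, per _fphasesentry
# ('>i20s') defined above.
def _examplephasesentry():
    node = '\x00' * 20  # hypothetical head node
    return _pack(_fphasesentry, phases.public, node)
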
1445 1445 def addparttagsfnodescache(repo, bundler, outgoing):
1446 1446 # we include the tags fnode cache for the bundle changeset
1447 1447 # (as an optional part)
1448 1448 cache = tags.hgtagsfnodescache(repo.unfiltered())
1449 1449 chunks = []
1450 1450
1451 1451 # .hgtags fnodes are only relevant for head changesets. While we could
1452 1452 # transfer values for all known nodes, there will likely be little to
1453 1453 # no benefit.
1454 1454 #
1455 1455 # We don't bother using a generator to produce output data because
1456 1456 # a) we only have 40 bytes per head and even esoteric numbers of heads
1457 1457 # consume little memory (1M heads is 40MB) b) we don't want to send the
1458 1458 # part if we don't have entries and knowing if we have entries requires
1459 1459 # cache lookups.
1460 1460 for node in outgoing.missingheads:
1461 1461 # Don't compute missing, as this may slow down serving.
1462 1462 fnode = cache.getfnode(node, computemissing=False)
1463 1463 if fnode is not None:
1464 1464 chunks.extend([node, fnode])
1465 1465
1466 1466 if chunks:
1467 1467 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1468 1468
1469 1469 def buildobsmarkerspart(bundler, markers):
1470 1470 """add an obsmarker part to the bundler with <markers>
1471 1471
1472 1472 No part is created if markers is empty.
1473 1473 Raises ValueError if the bundler doesn't support any known obsmarker format.
1474 1474 """
1475 1475 if not markers:
1476 1476 return None
1477 1477
1478 1478 remoteversions = obsmarkersversion(bundler.capabilities)
1479 1479 version = obsolete.commonversion(remoteversions)
1480 1480 if version is None:
1481 1481 raise ValueError('bundler does not support common obsmarker format')
1482 1482 stream = obsolete.encodemarkers(markers, True, version=version)
1483 1483 return bundler.newpart('obsmarkers', data=stream)
1484 1484
1485 1485 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1486 1486 compopts=None):
1487 1487 """Write a bundle file and return its filename.
1488 1488
1489 1489 Existing files will not be overwritten.
1490 1490 If no filename is specified, a temporary file is created.
1491 1491 bz2 compression can be turned off.
1492 1492 The bundle file will be deleted in case of errors.
1493 1493 """
1494 1494
1495 1495 if bundletype == "HG20":
1496 1496 bundle = bundle20(ui)
1497 1497 bundle.setcompression(compression, compopts)
1498 1498 part = bundle.newpart('changegroup', data=cg.getchunks())
1499 1499 part.addparam('version', cg.version)
1500 1500 if 'clcount' in cg.extras:
1501 1501 part.addparam('nbchanges', str(cg.extras['clcount']),
1502 1502 mandatory=False)
1503 1503 chunkiter = bundle.getchunks()
1504 1504 else:
1505 1505 # compression argument is only for the bundle2 case
1506 1506 assert compression is None
1507 1507 if cg.version != '01':
1508 1508 raise error.Abort(_('old bundle types only support v1 '
1509 1509 'changegroups'))
1510 1510 header, comp = bundletypes[bundletype]
1511 1511 if comp not in util.compengines.supportedbundletypes:
1512 1512 raise error.Abort(_('unknown stream compression type: %s')
1513 1513 % comp)
1514 1514 compengine = util.compengines.forbundletype(comp)
1515 1515 def chunkiter():
1516 1516 yield header
1517 1517 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1518 1518 yield chunk
1519 1519 chunkiter = chunkiter()
1520 1520
1521 1521 # parse the changegroup data, otherwise we will block
1522 1522 # in case of sshrepo because we don't know the end of the stream
1523 1523 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1524 1524
1525 1525 def combinechangegroupresults(op):
1526 1526 """logic to combine 0 or more addchangegroup results into one"""
1527 1527 results = [r.get('return', 0)
1528 1528 for r in op.records['changegroup']]
1529 1529 changedheads = 0
1530 1530 result = 1
1531 1531 for ret in results:
1532 1532 # If any changegroup result is 0, return 0
1533 1533 if ret == 0:
1534 1534 result = 0
1535 1535 break
1536 1536 if ret < -1:
1537 1537 changedheads += ret + 1
1538 1538 elif ret > 1:
1539 1539 changedheads += ret - 1
1540 1540 if changedheads > 0:
1541 1541 result = 1 + changedheads
1542 1542 elif changedheads < 0:
1543 1543 result = -1 + changedheads
1544 1544 return result
1545 1545
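# Editor's note (illustration): the arithmetic above inverts addchangegroup's
# return convention (0 == error, 1 == success with head count unchanged,
# 1+n == n heads added, -1-n == n heads removed). For example:
#   results [2, 3] -> changedheads = (2-1) + (3-1) = 3 -> result 4
#   results [0, 5] -> a zero short-circuits            -> result 0
#   results [-3]   -> changedheads = (-3+1) = -2       -> result -3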
1546 1546 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1547 1547 'targetphase'))
1548 1548 def handlechangegroup(op, inpart):
1549 1549 """apply a changegroup part on the repo
1550 1550
1551 1551 This is a very early implementation that will be massively reworked before
1552 1552 being inflicted on any end user.
1553 1553 """
1554 1554 tr = op.gettransaction()
1555 1555 unpackerversion = inpart.params.get('version', '01')
1556 1556 # We should raise an appropriate exception here
1557 1557 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1558 1558 # the source and url passed here are overwritten by the ones contained in
1559 1559 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1560 1560 nbchangesets = None
1561 1561 if 'nbchanges' in inpart.params:
1562 1562 nbchangesets = int(inpart.params.get('nbchanges'))
1563 1563 if ('treemanifest' in inpart.params and
1564 1564 'treemanifest' not in op.repo.requirements):
1565 1565 if len(op.repo.changelog) != 0:
1566 1566 raise error.Abort(_(
1567 1567 "bundle contains tree manifests, but local repo is "
1568 1568 "non-empty and does not use tree manifests"))
1569 1569 op.repo.requirements.add('treemanifest')
1570 1570 op.repo._applyopenerreqs()
1571 1571 op.repo._writerequirements()
1572 1572 extrakwargs = {}
1573 1573 targetphase = inpart.params.get('targetphase')
1574 1574 if targetphase is not None:
1575 1575 extrakwargs['targetphase'] = int(targetphase)
1576 1576 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1577 1577 expectedtotal=nbchangesets, **extrakwargs)
1578 1578 if op.reply is not None:
1579 1579 # This is definitely not the final form of this
1580 1580 # return. But one needs to start somewhere.
1581 1581 part = op.reply.newpart('reply:changegroup', mandatory=False)
1582 1582 part.addparam(
1583 1583 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1584 1584 part.addparam('return', '%i' % ret, mandatory=False)
1585 1585 assert not inpart.read()
1586 1586
1587 1587 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1588 1588 ['digest:%s' % k for k in util.DIGESTS.keys()])
1589 1589 @parthandler('remote-changegroup', _remotechangegroupparams)
1590 1590 def handleremotechangegroup(op, inpart):
1591 1591 """apply a bundle10 on the repo, given an url and validation information
1592 1592
1593 1593 All the information about the remote bundle to import is given as
1594 1594 parameters. The parameters include:
1595 1595 - url: the url to the bundle10.
1596 1596 - size: the bundle10 file size. It is used to validate what was
1597 1597 retrieved by the client matches the server knowledge about the bundle.
1598 1598 - digests: a space separated list of the digest types provided as
1599 1599 parameters.
1600 1600 - digest:<digest-type>: the hexadecimal representation of the digest with
1601 1601 that name. Like the size, it is used to validate what was retrieved by
1602 1602 the client matches what the server knows about the bundle.
1603 1603
1604 1604 When multiple digest types are given, all of them are checked.
1605 1605 """
1606 1606 try:
1607 1607 raw_url = inpart.params['url']
1608 1608 except KeyError:
1609 1609 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1610 1610 parsed_url = util.url(raw_url)
1611 1611 if parsed_url.scheme not in capabilities['remote-changegroup']:
1612 1612 raise error.Abort(_('remote-changegroup does not support %s urls') %
1613 1613 parsed_url.scheme)
1614 1614
1615 1615 try:
1616 1616 size = int(inpart.params['size'])
1617 1617 except ValueError:
1618 1618 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1619 1619 % 'size')
1620 1620 except KeyError:
1621 1621 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1622 1622
1623 1623 digests = {}
1624 1624 for typ in inpart.params.get('digests', '').split():
1625 1625 param = 'digest:%s' % typ
1626 1626 try:
1627 1627 value = inpart.params[param]
1628 1628 except KeyError:
1629 1629 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1630 1630 param)
1631 1631 digests[typ] = value
1632 1632
1633 1633 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1634 1634
1635 1635 tr = op.gettransaction()
1636 1636 from . import exchange
1637 1637 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1638 1638 if not isinstance(cg, changegroup.cg1unpacker):
1639 1639 raise error.Abort(_('%s: not a bundle version 1.0') %
1640 1640 util.hidepassword(raw_url))
1641 1641 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1642 1642 if op.reply is not None:
1643 1643 # This is definitely not the final form of this
1644 1644 # return. But one needs to start somewhere.
1645 1645 part = op.reply.newpart('reply:changegroup')
1646 1646 part.addparam(
1647 1647 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1648 1648 part.addparam('return', '%i' % ret, mandatory=False)
1649 1649 try:
1650 1650 real_part.validate()
1651 1651 except error.Abort as e:
1652 1652 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1653 1653 (util.hidepassword(raw_url), str(e)))
1654 1654 assert not inpart.read()
1655 1655
1656 1656 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1657 1657 def handlereplychangegroup(op, inpart):
1658 1658 ret = int(inpart.params['return'])
1659 1659 replyto = int(inpart.params['in-reply-to'])
1660 1660 op.records.add('changegroup', {'return': ret}, replyto)
1661 1661
1662 1662 @parthandler('check:heads')
1663 1663 def handlecheckheads(op, inpart):
1664 1664 """check that head of the repo did not change
1665 1665
1666 1666 This is used to detect a push race when using unbundle.
1667 1667 This replaces the "heads" argument of unbundle."""
1668 1668 h = inpart.read(20)
1669 1669 heads = []
1670 1670 while len(h) == 20:
1671 1671 heads.append(h)
1672 1672 h = inpart.read(20)
1673 1673 assert not h
1674 1674 # Trigger a transaction so that we are guaranteed to have the lock now.
1675 1675 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1676 1676 op.gettransaction()
1677 1677 if sorted(heads) != sorted(op.repo.heads()):
1678 1678 raise error.PushRaced('repository changed while pushing - '
1679 1679 'please try again')
1680 1680
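# Editor's note (sketch, not part of the original module): the check:heads
# payload read above is simply every relevant 20-byte binary node
# concatenated with no count or separator, so a producer can emit:
def _checkheadspayloadexample(remoteheads):
    # remoteheads: iterable of 20-byte binary node ids
    return ''.join(sorted(remoteheads))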
1681 1681 @parthandler('check:updated-heads')
1682 1682 def handlecheckupdatedheads(op, inpart):
1683 1683 """check for race on the heads touched by a push
1684 1684
1685 1685 This is similar to 'check:heads' but focuses on the heads actually updated
1686 1686 during the push. If other activity happens on unrelated heads, it is
1687 1687 ignored.
1688 1688
1689 1689 This allows servers with high traffic to avoid push contention as long as
1690 1690 only unrelated parts of the graph are involved."""
1691 1691 h = inpart.read(20)
1692 1692 heads = []
1693 1693 while len(h) == 20:
1694 1694 heads.append(h)
1695 1695 h = inpart.read(20)
1696 1696 assert not h
1697 1697 # trigger a transaction so that we are guaranteed to have the lock now.
1698 1698 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1699 1699 op.gettransaction()
1700 1700
1701 1701 currentheads = set()
1702 1702 for ls in op.repo.branchmap().itervalues():
1703 1703 currentheads.update(ls)
1704 1704
1705 1705 for h in heads:
1706 1706 if h not in currentheads:
1707 1707 raise error.PushRaced('repository changed while pushing - '
1708 1708 'please try again')
1709 1709
1710 1710 @parthandler('output')
1711 1711 def handleoutput(op, inpart):
1712 1712 """forward output captured on the server to the client"""
1713 1713 for line in inpart.read().splitlines():
1714 1714 op.ui.status(_('remote: %s\n') % line)
1715 1715
1716 1716 @parthandler('replycaps')
1717 1717 def handlereplycaps(op, inpart):
1718 1718 """Notify that a reply bundle should be created
1719 1719
1720 1720 The payload contains the capabilities information for the reply"""
1721 1721 caps = decodecaps(inpart.read())
1722 1722 if op.reply is None:
1723 1723 op.reply = bundle20(op.ui, caps)
1724 1724
1725 1725 class AbortFromPart(error.Abort):
1726 1726 """Sub-class of Abort that denotes an error from a bundle2 part."""
1727 1727
1728 1728 @parthandler('error:abort', ('message', 'hint'))
1729 1729 def handleerrorabort(op, inpart):
1730 1730 """Used to transmit abort error over the wire"""
1731 1731 raise AbortFromPart(inpart.params['message'],
1732 1732 hint=inpart.params.get('hint'))
1733 1733
1734 1734 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1735 1735 'in-reply-to'))
1736 1736 def handleerrorpushkey(op, inpart):
1737 1737 """Used to transmit failure of a mandatory pushkey over the wire"""
1738 1738 kwargs = {}
1739 1739 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1740 1740 value = inpart.params.get(name)
1741 1741 if value is not None:
1742 1742 kwargs[name] = value
1743 1743 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1744 1744
1745 1745 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1746 1746 def handleerrorunsupportedcontent(op, inpart):
1747 1747 """Used to transmit unknown content error over the wire"""
1748 1748 kwargs = {}
1749 1749 parttype = inpart.params.get('parttype')
1750 1750 if parttype is not None:
1751 1751 kwargs['parttype'] = parttype
1752 1752 params = inpart.params.get('params')
1753 1753 if params is not None:
1754 1754 kwargs['params'] = params.split('\0')
1755 1755
1756 1756 raise error.BundleUnknownFeatureError(**kwargs)
1757 1757
1758 1758 @parthandler('error:pushraced', ('message',))
1759 1759 def handleerrorpushraced(op, inpart):
1760 1760 """Used to transmit push race error over the wire"""
1761 1761 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1762 1762
1763 1763 @parthandler('listkeys', ('namespace',))
1764 1764 def handlelistkeys(op, inpart):
1765 1765 """retrieve pushkey namespace content stored in a bundle2"""
1766 1766 namespace = inpart.params['namespace']
1767 1767 r = pushkey.decodekeys(inpart.read())
1768 1768 op.records.add('listkeys', (namespace, r))
1769 1769
1770 1770 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1771 1771 def handlepushkey(op, inpart):
1772 1772 """process a pushkey request"""
1773 1773 dec = pushkey.decode
1774 1774 namespace = dec(inpart.params['namespace'])
1775 1775 key = dec(inpart.params['key'])
1776 1776 old = dec(inpart.params['old'])
1777 1777 new = dec(inpart.params['new'])
1778 1778 # Grab the transaction to ensure that we have the lock before performing the
1779 1779 # pushkey.
1780 1780 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1781 1781 op.gettransaction()
1782 1782 ret = op.repo.pushkey(namespace, key, old, new)
1783 1783 record = {'namespace': namespace,
1784 1784 'key': key,
1785 1785 'old': old,
1786 1786 'new': new}
1787 1787 op.records.add('pushkey', record)
1788 1788 if op.reply is not None:
1789 1789 rpart = op.reply.newpart('reply:pushkey')
1790 1790 rpart.addparam(
1791 1791 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1792 1792 rpart.addparam('return', '%i' % ret, mandatory=False)
1793 1793 if inpart.mandatory and not ret:
1794 1794 kwargs = {}
1795 1795 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1796 1796 if key in inpart.params:
1797 1797 kwargs[key] = inpart.params[key]
1798 1798 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1799 1799
1800 1800 def _readphaseheads(inpart):
1801 1801 headsbyphase = [[] for i in phases.allphases]
1802 1802 entrysize = struct.calcsize(_fphasesentry)
1803 1803 while True:
1804 1804 entry = inpart.read(entrysize)
1805 1805 if len(entry) < entrysize:
1806 1806 if entry:
1807 1807 raise error.Abort(_('bad phase-heads bundle part'))
1808 1808 break
1809 1809 phase, node = struct.unpack(_fphasesentry, entry)
1810 1810 headsbyphase[phase].append(node)
1811 1811 return headsbyphase
1812 1812
1813 1813 @parthandler('phase-heads')
1814 1814 def handlephases(op, inpart):
1815 1815 """apply phases from bundle part to repo"""
1816 1816 headsbyphase = _readphaseheads(inpart)
1817 1817 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1818 op.records.add('phase-heads', {})
1818 1819
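# Editor's note: per this changeset's description, recording the part above
# lets the exchange layer see that phase information already arrived in-band.
# A consumer might use the record roughly like this (sketch, not the verbatim
# exchange.py hunk):
#
#     if op.records['phase-heads']:
#         pullop.stepsdone.add('phases')  # skip pushkey-based phase exchange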
1819 1820 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1820 1821 def handlepushkeyreply(op, inpart):
1821 1822 """retrieve the result of a pushkey request"""
1822 1823 ret = int(inpart.params['return'])
1823 1824 partid = int(inpart.params['in-reply-to'])
1824 1825 op.records.add('pushkey', {'return': ret}, partid)
1825 1826
1826 1827 @parthandler('obsmarkers')
1827 1828 def handleobsmarker(op, inpart):
1828 1829 """add a stream of obsmarkers to the repo"""
1829 1830 tr = op.gettransaction()
1830 1831 markerdata = inpart.read()
1831 1832 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1832 1833 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1833 1834 % len(markerdata))
1834 1835 # The mergemarkers call will crash if marker creation is not enabled.
1835 1836 # we want to avoid this if the part is advisory.
1836 1837 if not inpart.mandatory and op.repo.obsstore.readonly:
1837 1838 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1838 1839 return
1839 1840 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1840 1841 op.repo.invalidatevolatilesets()
1841 1842 if new:
1842 1843 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1843 1844 op.records.add('obsmarkers', {'new': new})
1844 1845 if op.reply is not None:
1845 1846 rpart = op.reply.newpart('reply:obsmarkers')
1846 1847 rpart.addparam(
1847 1848 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1848 1849 rpart.addparam('new', '%i' % new, mandatory=False)
1849 1850
1850 1851
1851 1852 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1852 1853 def handleobsmarkerreply(op, inpart):
1853 1854 """retrieve the result of a pushkey request"""
1854 1855 ret = int(inpart.params['new'])
1855 1856 partid = int(inpart.params['in-reply-to'])
1856 1857 op.records.add('obsmarkers', {'new': ret}, partid)
1857 1858
1858 1859 @parthandler('hgtagsfnodes')
1859 1860 def handlehgtagsfnodes(op, inpart):
1860 1861 """Applies .hgtags fnodes cache entries to the local repo.
1861 1862
1862 1863 Payload is pairs of 20 byte changeset nodes and filenodes.
1863 1864 """
1864 1865 # Grab the transaction so we ensure that we have the lock at this point.
1865 1866 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1866 1867 op.gettransaction()
1867 1868 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1868 1869
1869 1870 count = 0
1870 1871 while True:
1871 1872 node = inpart.read(20)
1872 1873 fnode = inpart.read(20)
1873 1874 if len(node) < 20 or len(fnode) < 20:
1874 1875 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1875 1876 break
1876 1877 cache.setfnode(node, fnode)
1877 1878 count += 1
1878 1879
1879 1880 cache.write()
1880 1881 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1881 1882
1882 1883 @parthandler('pushvars')
1883 1884 def bundle2getvars(op, part):
1884 1885 '''unbundle a bundle2 containing shellvars on the server'''
1885 1886 # An option to disable unbundling on server-side for security reasons
1886 1887 if op.ui.configbool('push', 'pushvars.server'):
1887 1888 hookargs = {}
1888 1889 for key, value in part.advisoryparams:
1889 1890 key = key.upper()
1890 1891 # We want pushed variables to have USERVAR_ prepended so we know
1891 1892 # they came from the --pushvar flag.
1892 1893 key = "USERVAR_" + key
1893 1894 hookargs[key] = value
1894 1895 op.addhookargs(hookargs)
@@ -1,2013 +1,2017 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18 from . import (
19 19 bookmarks as bookmod,
20 20 bundle2,
21 21 changegroup,
22 22 discovery,
23 23 error,
24 24 lock as lockmod,
25 25 obsolete,
26 26 phases,
27 27 pushkey,
28 28 pycompat,
29 29 scmutil,
30 30 sslutil,
31 31 streamclone,
32 32 url as urlmod,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 # Maps bundle version human names to changegroup versions.
40 40 _bundlespeccgversions = {'v1': '01',
41 41 'v2': '02',
42 42 'packed1': 's1',
43 43 'bundle2': '02', #legacy
44 44 }
45 45
46 46 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
47 47 _bundlespecv1compengines = {'gzip', 'bzip2', 'none'}
48 48
49 49 def parsebundlespec(repo, spec, strict=True, externalnames=False):
50 50 """Parse a bundle string specification into parts.
51 51
52 52 Bundle specifications denote a well-defined bundle/exchange format.
53 53 The content of a given specification should not change over time in
54 54 order to ensure that bundles produced by a newer version of Mercurial are
55 55 readable from an older version.
56 56
57 57 The string currently has the form:
58 58
59 59 <compression>-<type>[;<parameter0>[;<parameter1>]]
60 60
61 61 Where <compression> is one of the supported compression formats
62 62 and <type> is (currently) a version string. A ";" can follow the type and
63 63 all text afterwards is interpreted as URI encoded, ";" delimited key=value
64 64 pairs.
65 65
66 66 If ``strict`` is True (the default) <compression> is required. Otherwise,
67 67 it is optional.
68 68
69 69 If ``externalnames`` is False (the default), the human-centric names will
70 70 be converted to their internal representation.
71 71
72 72 Returns a 3-tuple of (compression, version, parameters). Compression will
73 73 be ``None`` if not in strict mode and a compression isn't defined.
74 74
75 75 An ``InvalidBundleSpecification`` is raised when the specification is
76 76 not syntactically well formed.
77 77
78 78 An ``UnsupportedBundleSpecification`` is raised when the compression or
79 79 bundle type/version is not recognized.
80 80
81 81 Note: this function will likely eventually return a more complex data
82 82 structure, including bundle2 part information.
83 83 """
84 84 def parseparams(s):
85 85 if ';' not in s:
86 86 return s, {}
87 87
88 88 params = {}
89 89 version, paramstr = s.split(';', 1)
90 90
91 91 for p in paramstr.split(';'):
92 92 if '=' not in p:
93 93 raise error.InvalidBundleSpecification(
94 94 _('invalid bundle specification: '
95 95 'missing "=" in parameter: %s') % p)
96 96
97 97 key, value = p.split('=', 1)
98 98 key = urlreq.unquote(key)
99 99 value = urlreq.unquote(value)
100 100 params[key] = value
101 101
102 102 return version, params
103 103
104 104
105 105 if strict and '-' not in spec:
106 106 raise error.InvalidBundleSpecification(
107 107 _('invalid bundle specification; '
108 108 'must be prefixed with compression: %s') % spec)
109 109
110 110 if '-' in spec:
111 111 compression, version = spec.split('-', 1)
112 112
113 113 if compression not in util.compengines.supportedbundlenames:
114 114 raise error.UnsupportedBundleSpecification(
115 115 _('%s compression is not supported') % compression)
116 116
117 117 version, params = parseparams(version)
118 118
119 119 if version not in _bundlespeccgversions:
120 120 raise error.UnsupportedBundleSpecification(
121 121 _('%s is not a recognized bundle version') % version)
122 122 else:
123 123 # Value could be just the compression or just the version, in which
124 124 # case some defaults are assumed (but only when not in strict mode).
125 125 assert not strict
126 126
127 127 spec, params = parseparams(spec)
128 128
129 129 if spec in util.compengines.supportedbundlenames:
130 130 compression = spec
131 131 version = 'v1'
132 132 # Generaldelta repos require v2.
133 133 if 'generaldelta' in repo.requirements:
134 134 version = 'v2'
135 135 # Modern compression engines require v2.
136 136 if compression not in _bundlespecv1compengines:
137 137 version = 'v2'
138 138 elif spec in _bundlespeccgversions:
139 139 if spec == 'packed1':
140 140 compression = 'none'
141 141 else:
142 142 compression = 'bzip2'
143 143 version = spec
144 144 else:
145 145 raise error.UnsupportedBundleSpecification(
146 146 _('%s is not a recognized bundle specification') % spec)
147 147
148 148 # Bundle version 1 only supports a known set of compression engines.
149 149 if version == 'v1' and compression not in _bundlespecv1compengines:
150 150 raise error.UnsupportedBundleSpecification(
151 151 _('compression engine %s is not supported on v1 bundles') %
152 152 compression)
153 153
154 154 # The specification for packed1 can optionally declare the data formats
155 155 # required to apply it. If we see this metadata, compare against what the
156 156 # repo supports and error if the bundle isn't compatible.
157 157 if version == 'packed1' and 'requirements' in params:
158 158 requirements = set(params['requirements'].split(','))
159 159 missingreqs = requirements - repo.supportedformats
160 160 if missingreqs:
161 161 raise error.UnsupportedBundleSpecification(
162 162 _('missing support for repository features: %s') %
163 163 ', '.join(sorted(missingreqs)))
164 164
165 165 if not externalnames:
166 166 engine = util.compengines.forbundlename(compression)
167 167 compression = engine.bundletype()[1]
168 168 version = _bundlespeccgversions[version]
169 169 return compression, version, params
170 170
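# Editor's note (illustration, assuming the usual engine names: 'GZ' for
# gzip, 'UN' for none): with the default externalnames=False, the human
# names above are mapped to their internal representations, e.g.
#   parsebundlespec(repo, 'gzip-v2')  ->  ('GZ', '02', {})
#   parsebundlespec(repo, 'none-packed1;requirements=generaldelta')
#       ->  ('UN', 's1', {'requirements': 'generaldelta'})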
171 171 def readbundle(ui, fh, fname, vfs=None):
172 172 header = changegroup.readexactly(fh, 4)
173 173
174 174 alg = None
175 175 if not fname:
176 176 fname = "stream"
177 177 if not header.startswith('HG') and header.startswith('\0'):
178 178 fh = changegroup.headerlessfixup(fh, header)
179 179 header = "HG10"
180 180 alg = 'UN'
181 181 elif vfs:
182 182 fname = vfs.join(fname)
183 183
184 184 magic, version = header[0:2], header[2:4]
185 185
186 186 if magic != 'HG':
187 187 raise error.Abort(_('%s: not a Mercurial bundle') % fname)
188 188 if version == '10':
189 189 if alg is None:
190 190 alg = changegroup.readexactly(fh, 2)
191 191 return changegroup.cg1unpacker(fh, alg)
192 192 elif version.startswith('2'):
193 193 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
194 194 elif version == 'S1':
195 195 return streamclone.streamcloneapplier(fh)
196 196 else:
197 197 raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
198 198
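# Editor's note (illustration): the header dispatch above recognizes, for
# example:
#   'HG10' + 2-byte compression ('UN'/'GZ'/'BZ')  -> cg1unpacker
#   'HG2' + anything (e.g. 'HG20')                -> bundle2 unbundler
#   'HGS1' (packed stream clone)                  -> streamcloneapplier
#   a leading '\0'                                -> headerless changegroup,
#                                                    fixed up as HG10/'UN'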
199 199 def getbundlespec(ui, fh):
200 200 """Infer the bundlespec from a bundle file handle.
201 201
202 202 The input file handle is read from, and the original seek position is
203 203 not restored.
204 204 """
205 205 def speccompression(alg):
206 206 try:
207 207 return util.compengines.forbundletype(alg).bundletype()[0]
208 208 except KeyError:
209 209 return None
210 210
211 211 b = readbundle(ui, fh, None)
212 212 if isinstance(b, changegroup.cg1unpacker):
213 213 alg = b._type
214 214 if alg == '_truncatedBZ':
215 215 alg = 'BZ'
216 216 comp = speccompression(alg)
217 217 if not comp:
218 218 raise error.Abort(_('unknown compression algorithm: %s') % alg)
219 219 return '%s-v1' % comp
220 220 elif isinstance(b, bundle2.unbundle20):
221 221 if 'Compression' in b.params:
222 222 comp = speccompression(b.params['Compression'])
223 223 if not comp:
224 224 raise error.Abort(_('unknown compression algorithm: %s') % comp)
225 225 else:
226 226 comp = 'none'
227 227
228 228 version = None
229 229 for part in b.iterparts():
230 230 if part.type == 'changegroup':
231 231 version = part.params['version']
232 232 if version in ('01', '02'):
233 233 version = 'v2'
234 234 else:
235 235 raise error.Abort(_('changegroup version %s does not have '
236 236 'a known bundlespec') % version,
237 237 hint=_('try upgrading your Mercurial '
238 238 'client'))
239 239
240 240 if not version:
241 241 raise error.Abort(_('could not identify changegroup version in '
242 242 'bundle'))
243 243
244 244 return '%s-%s' % (comp, version)
245 245 elif isinstance(b, streamclone.streamcloneapplier):
246 246 requirements = streamclone.readbundle1header(fh)[2]
247 247 params = 'requirements=%s' % ','.join(sorted(requirements))
248 248 return 'none-packed1;%s' % urlreq.quote(params)
249 249 else:
250 250 raise error.Abort(_('unknown bundle type: %s') % b)
251 251
252 252 def _computeoutgoing(repo, heads, common):
253 253 """Computes which revs are outgoing given a set of common
254 254 and a set of heads.
255 255
256 256 This is a separate function so extensions can have access to
257 257 the logic.
258 258
259 259 Returns a discovery.outgoing object.
260 260 """
261 261 cl = repo.changelog
262 262 if common:
263 263 hasnode = cl.hasnode
264 264 common = [n for n in common if hasnode(n)]
265 265 else:
266 266 common = [nullid]
267 267 if not heads:
268 268 heads = cl.heads()
269 269 return discovery.outgoing(repo, common, heads)
270 270
271 271 def _forcebundle1(op):
272 272 """return true if a pull/push must use bundle1
273 273
274 274 This function is used to allow testing of the older bundle version"""
275 275 ui = op.repo.ui
276 276 forcebundle1 = False
277 277 # The goal of this config is to allow developers to choose the bundle
278 278 # version used during exchange. This is especially handy during tests.
279 279 # Value is a list of bundle versions to pick from; the highest version
280 280 # should be used.
281 281 #
282 282 # developer config: devel.legacy.exchange
283 283 exchange = ui.configlist('devel', 'legacy.exchange')
284 284 forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
285 285 return forcebundle1 or not op.remote.capable('bundle2')
286 286
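# Editor's note: the developer knob above is driven from an hgrc, e.g.
#
#   [devel]
#   legacy.exchange = bundle1
#
# forces bundle1 even against a bundle2-capable remote, while listing
# 'bundle2' as well ('bundle1 bundle2') keeps bundle2 preferred.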
287 287 class pushoperation(object):
288 288 """A object that represent a single push operation
289 289
290 290 Its purpose is to carry push related state and very common operations.
291 291
292 292 A new pushoperation should be created at the beginning of each push and
293 293 discarded afterward.
294 294 """
295 295
296 296 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
297 297 bookmarks=(), pushvars=None):
298 298 # repo we push from
299 299 self.repo = repo
300 300 self.ui = repo.ui
301 301 # repo we push to
302 302 self.remote = remote
303 303 # force option provided
304 304 self.force = force
305 305 # revs to be pushed (None is "all")
306 306 self.revs = revs
307 307 # bookmark explicitly pushed
308 308 self.bookmarks = bookmarks
309 309 # allow push of new branch
310 310 self.newbranch = newbranch
311 311 # step already performed
312 312 # (used to check what steps have been already performed through bundle2)
313 313 self.stepsdone = set()
314 314 # Integer version of the changegroup push result
315 315 # - None means nothing to push
316 316 # - 0 means HTTP error
317 317 # - 1 means we pushed and remote head count is unchanged *or*
318 318 # we have outgoing changesets but refused to push
319 319 # - other values as described by addchangegroup()
320 320 self.cgresult = None
321 321 # Boolean value for the bookmark push
322 322 self.bkresult = None
323 323 # discover.outgoing object (contains common and outgoing data)
324 324 self.outgoing = None
325 325 # all remote topological heads before the push
326 326 self.remoteheads = None
327 327 # Details of the remote branch pre and post push
328 328 #
329 329 # mapping: {'branch': ([remoteheads],
330 330 # [newheads],
331 331 # [unsyncedheads],
332 332 # [discardedheads])}
333 333 # - branch: the branch name
334 334 # - remoteheads: the list of remote heads known locally
335 335 # None if the branch is new
336 336 # - newheads: the new remote heads (known locally) with outgoing pushed
337 337 # - unsyncedheads: the list of remote heads unknown locally.
338 338 # - discardedheads: the list of remote heads made obsolete by the push
339 339 self.pushbranchmap = None
340 340 # testable as a boolean indicating if any nodes are missing locally.
341 341 self.incoming = None
342 342 # phases changes that must be pushed along side the changesets
343 343 self.outdatedphases = None
344 344 # phases changes that must be pushed if changeset push fails
345 345 self.fallbackoutdatedphases = None
346 346 # outgoing obsmarkers
347 347 self.outobsmarkers = set()
348 348 # outgoing bookmarks
349 349 self.outbookmarks = []
350 350 # transaction manager
351 351 self.trmanager = None
352 352 # map { pushkey partid -> callback handling failure}
353 353 # used to handle exception from mandatory pushkey part failure
354 354 self.pkfailcb = {}
355 355 # an iterable of pushvars or None
356 356 self.pushvars = pushvars
357 357
358 358 @util.propertycache
359 359 def futureheads(self):
360 360 """future remote heads if the changeset push succeeds"""
361 361 return self.outgoing.missingheads
362 362
363 363 @util.propertycache
364 364 def fallbackheads(self):
365 365 """future remote heads if the changeset push fails"""
366 366 if self.revs is None:
367 367 # no target to push, all common heads are relevant
368 368 return self.outgoing.commonheads
369 369 unfi = self.repo.unfiltered()
370 370 # I want cheads = heads(::missingheads and ::commonheads)
371 371 # (missingheads is revs with secret changeset filtered out)
372 372 #
373 373 # This can be expressed as:
374 374 # cheads = ( (missingheads and ::commonheads)
375 375 #          + (commonheads and ::missingheads))
377 377 #
378 378 # while trying to push we already computed the following:
379 379 # common = (::commonheads)
380 380 # missing = ((commonheads::missingheads) - commonheads)
381 381 #
382 382 # We can pick:
383 383 # * missingheads part of common (::commonheads)
384 384 common = self.outgoing.common
385 385 nm = self.repo.changelog.nodemap
386 386 cheads = [node for node in self.revs if nm[node] in common]
387 387 # and
388 388 # * commonheads parents on missing
389 389 revset = unfi.set('%ln and parents(roots(%ln))',
390 390 self.outgoing.commonheads,
391 391 self.outgoing.missing)
392 392 cheads.extend(c.node() for c in revset)
393 393 return cheads
394 394
395 395 @property
396 396 def commonheads(self):
397 397 """set of all common heads after changeset bundle push"""
398 398 if self.cgresult:
399 399 return self.futureheads
400 400 else:
401 401 return self.fallbackheads
402 402
403 403 # mapping of messages used when pushing bookmarks
404 404 bookmsgmap = {'update': (_("updating bookmark %s\n"),
405 405 _('updating bookmark %s failed!\n')),
406 406 'export': (_("exporting bookmark %s\n"),
407 407 _('exporting bookmark %s failed!\n')),
408 408 'delete': (_("deleting remote bookmark %s\n"),
409 409 _('deleting remote bookmark %s failed!\n')),
410 410 }
411 411
412 412
413 413 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
414 414 opargs=None):
415 415 '''Push outgoing changesets (limited by revs) from a local
416 416 repository to remote. Return an integer:
417 417 - None means nothing to push
418 418 - 0 means HTTP error
419 419 - 1 means we pushed and remote head count is unchanged *or*
420 420 we have outgoing changesets but refused to push
421 421 - other values as described by addchangegroup()
422 422 '''
423 423 if opargs is None:
424 424 opargs = {}
425 425 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
426 426 **opargs)
427 427 if pushop.remote.local():
428 428 missing = (set(pushop.repo.requirements)
429 429 - pushop.remote.local().supported)
430 430 if missing:
431 431 msg = _("required features are not"
432 432 " supported in the destination:"
433 433 " %s") % (', '.join(sorted(missing)))
434 434 raise error.Abort(msg)
435 435
436 436 if not pushop.remote.canpush():
437 437 raise error.Abort(_("destination does not support push"))
438 438
439 439 if not pushop.remote.capable('unbundle'):
440 440 raise error.Abort(_('cannot push: destination does not support the '
441 441 'unbundle wire protocol command'))
442 442
443 443 # get lock as we might write phase data
444 444 wlock = lock = None
445 445 try:
446 446 # bundle2 push may receive a reply bundle touching bookmarks or other
447 447 # things requiring the wlock. Take it now to ensure proper ordering.
448 448 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
449 449 if (not _forcebundle1(pushop)) and maypushback:
450 450 wlock = pushop.repo.wlock()
451 451 lock = pushop.repo.lock()
452 452 pushop.trmanager = transactionmanager(pushop.repo,
453 453 'push-response',
454 454 pushop.remote.url())
455 455 except IOError as err:
456 456 if err.errno != errno.EACCES:
457 457 raise
458 458 # source repo cannot be locked.
459 459 # We do not abort the push, but just disable the local phase
460 460 # synchronisation.
461 461 msg = 'cannot lock source repository: %s\n' % err
462 462 pushop.ui.debug(msg)
463 463
464 464 with wlock or util.nullcontextmanager(), \
465 465 lock or util.nullcontextmanager(), \
466 466 pushop.trmanager or util.nullcontextmanager():
467 467 pushop.repo.checkpush(pushop)
468 468 _pushdiscovery(pushop)
469 469 if not _forcebundle1(pushop):
470 470 _pushbundle2(pushop)
471 471 _pushchangeset(pushop)
472 472 _pushsyncphase(pushop)
473 473 _pushobsolete(pushop)
474 474 _pushbookmark(pushop)
475 475
476 476 return pushop
477 477
478 478 # list of steps to perform discovery before push
479 479 pushdiscoveryorder = []
480 480
481 481 # Mapping between step name and function
482 482 #
483 483 # This exists to help extensions wrap steps if necessary
484 484 pushdiscoverymapping = {}
485 485
486 486 def pushdiscovery(stepname):
487 487 """decorator for function performing discovery before push
488 488
489 489 The function is added to the step -> function mapping and appended to the
490 490 list of steps. Beware that decorated functions will be added in order (this
491 491 may matter).
492 492
493 493 You can only use this decorator for a new step; if you want to wrap a step
494 494 from an extension, change the pushdiscoverymapping dictionary directly."""
495 495 def dec(func):
496 496 assert stepname not in pushdiscoverymapping
497 497 pushdiscoverymapping[stepname] = func
498 498 pushdiscoveryorder.append(stepname)
499 499 return func
500 500 return dec
501 501
502 502 def _pushdiscovery(pushop):
503 503 """Run all discovery steps"""
504 504 for stepname in pushdiscoveryorder:
505 505 step = pushdiscoverymapping[stepname]
506 506 step(pushop)
507 507
508 508 @pushdiscovery('changeset')
509 509 def _pushdiscoverychangeset(pushop):
510 510 """discover the changeset that need to be pushed"""
511 511 fci = discovery.findcommonincoming
512 512 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
513 513 common, inc, remoteheads = commoninc
514 514 fco = discovery.findcommonoutgoing
515 515 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
516 516 commoninc=commoninc, force=pushop.force)
517 517 pushop.outgoing = outgoing
518 518 pushop.remoteheads = remoteheads
519 519 pushop.incoming = inc
520 520
521 521 @pushdiscovery('phase')
522 522 def _pushdiscoveryphase(pushop):
523 523 """discover the phase that needs to be pushed
524 524
525 525 (computed for both the success and failure cases of the changeset push)"""
526 526 outgoing = pushop.outgoing
527 527 unfi = pushop.repo.unfiltered()
528 528 remotephases = pushop.remote.listkeys('phases')
529 529 publishing = remotephases.get('publishing', False)
530 530 if (pushop.ui.configbool('ui', '_usedassubrepo')
531 531 and remotephases # server supports phases
532 532 and not pushop.outgoing.missing # no changesets to be pushed
533 533 and publishing):
534 534 # When:
535 535 # - this is a subrepo push
536 536 # - and the remote supports phases
537 537 # - and no changesets are to be pushed
538 538 # - and the remote is publishing
539 539 # We may be in the issue 3871 case!
540 540 # We drop the phase synchronisation that would otherwise be done as
541 541 # a courtesy; it could publish changesets that are still draft
542 542 # locally.
543 543 remotephases = {'publishing': 'True'}
544 544 ana = phases.analyzeremotephases(pushop.repo,
545 545 pushop.fallbackheads,
546 546 remotephases)
547 547 pheads, droots = ana
548 548 extracond = ''
549 549 if not publishing:
550 550 extracond = ' and public()'
551 551 revset = 'heads((%%ln::%%ln) %s)' % extracond
552 552 # Get the list of all revs draft on remote but public here.
553 553 # XXX Beware that the revset breaks if droots is not strictly a
554 554 # XXX set of roots; we may want to ensure it is, but that is costly.
555 555 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
556 556 if not outgoing.missing:
557 557 future = fallback
558 558 else:
559 559 # adds changeset we are going to push as draft
560 560 #
561 561 # this should not be necessary for a publishing server, but because
562 562 # of an issue fixed in xxxxx we have to do it anyway.
563 563 fdroots = list(unfi.set('roots(%ln + %ln::)',
564 564 outgoing.missing, droots))
565 565 fdroots = [f.node() for f in fdroots]
566 566 future = list(unfi.set(revset, fdroots, pushop.futureheads))
567 567 pushop.outdatedphases = future
568 568 pushop.fallbackoutdatedphases = fallback
569 569
570 570 @pushdiscovery('obsmarker')
571 571 def _pushdiscoveryobsmarkers(pushop):
572 572 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
573 573 and pushop.repo.obsstore
574 574 and 'obsolete' in pushop.remote.listkeys('namespaces')):
575 575 repo = pushop.repo
576 576 # very naive computation that can be quite expensive on big repos.
577 577 # However, evolution is currently slow on them anyway.
578 578 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
579 579 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
580 580
581 581 @pushdiscovery('bookmarks')
582 582 def _pushdiscoverybookmarks(pushop):
583 583 ui = pushop.ui
584 584 repo = pushop.repo.unfiltered()
585 585 remote = pushop.remote
586 586 ui.debug("checking for updated bookmarks\n")
587 587 ancestors = ()
588 588 if pushop.revs:
589 589 revnums = map(repo.changelog.rev, pushop.revs)
590 590 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
591 591 remotebookmark = remote.listkeys('bookmarks')
592 592
593 593 explicit = set([repo._bookmarks.expandname(bookmark)
594 594 for bookmark in pushop.bookmarks])
595 595
596 596 remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
597 597 comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)
598 598
599 599 def safehex(x):
600 600 if x is None:
601 601 return x
602 602 return hex(x)
603 603
604 604 def hexifycompbookmarks(bookmarks):
605 605 for b, scid, dcid in bookmarks:
606 606 yield b, safehex(scid), safehex(dcid)
607 607
608 608 comp = [hexifycompbookmarks(marks) for marks in comp]
609 609 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
610 610
611 611 for b, scid, dcid in advsrc:
612 612 if b in explicit:
613 613 explicit.remove(b)
614 614 if not ancestors or repo[scid].rev() in ancestors:
615 615 pushop.outbookmarks.append((b, dcid, scid))
616 616 # search added bookmark
617 617 for b, scid, dcid in addsrc:
618 618 if b in explicit:
619 619 explicit.remove(b)
620 620 pushop.outbookmarks.append((b, '', scid))
621 621 # search for overwritten bookmark
622 622 for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
623 623 if b in explicit:
624 624 explicit.remove(b)
625 625 pushop.outbookmarks.append((b, dcid, scid))
626 626 # search for bookmark to delete
627 627 for b, scid, dcid in adddst:
628 628 if b in explicit:
629 629 explicit.remove(b)
630 630 # treat as "deleted locally"
631 631 pushop.outbookmarks.append((b, dcid, ''))
632 632 # identical bookmarks shouldn't get reported
633 633 for b, scid, dcid in same:
634 634 if b in explicit:
635 635 explicit.remove(b)
636 636
637 637 if explicit:
638 638 explicit = sorted(explicit)
639 639 # we should probably list all of them
640 640 ui.warn(_('bookmark %s does not exist on the local '
641 641 'or remote repository!\n') % explicit[0])
642 642 pushop.bkresult = 2
643 643
644 644 pushop.outbookmarks.sort()
645 645
646 646 def _pushcheckoutgoing(pushop):
647 647 outgoing = pushop.outgoing
648 648 unfi = pushop.repo.unfiltered()
649 649 if not outgoing.missing:
650 650 # nothing to push
651 651 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
652 652 return False
653 653 # something to push
654 654 if not pushop.force:
655 655 # if repo.obsstore == False --> no obsolete
656 656 # then, save the iteration
657 657 if unfi.obsstore:
658 658 # these messages are defined here to stay within the 80-char limit
659 659 mso = _("push includes obsolete changeset: %s!")
660 660 mspd = _("push includes phase-divergent changeset: %s!")
661 661 mscd = _("push includes content-divergent changeset: %s!")
662 662 mst = {"orphan": _("push includes orphan changeset: %s!"),
663 663 "phase-divergent": mspd,
664 664 "content-divergent": mscd}
665 665 # If we are pushing and there is at least one
666 666 # obsolete or unstable changeset in missing, then at
667 667 # least one of the missing heads will be obsolete or
668 668 # unstable. So checking heads only is ok.
669 669 for node in outgoing.missingheads:
670 670 ctx = unfi[node]
671 671 if ctx.obsolete():
672 672 raise error.Abort(mso % ctx)
673 673 elif ctx.isunstable():
674 674 # TODO print more than one instability in the abort
675 675 # message
676 676 raise error.Abort(mst[ctx.instabilities()[0]] % ctx)
677 677
678 678 discovery.checkheads(pushop)
679 679 return True
680 680
681 681 # List of names of steps to perform for an outgoing bundle2, order matters.
682 682 b2partsgenorder = []
683 683
684 684 # Mapping between step name and function
685 685 #
686 686 # This exists to help extensions wrap steps if necessary
687 687 b2partsgenmapping = {}
688 688
689 689 def b2partsgenerator(stepname, idx=None):
690 690 """decorator for function generating bundle2 part
691 691
692 692 The function is added to the step -> function mapping and appended to the
693 693 list of steps. Beware that decorated functions will be added in order
694 694 (this may matter).
695 695
696 696 You can only use this decorator for new steps; if you want to wrap a step
697 697 from an extension, modify the b2partsgenmapping dictionary directly."""
698 698 def dec(func):
699 699 assert stepname not in b2partsgenmapping
700 700 b2partsgenmapping[stepname] = func
701 701 if idx is None:
702 702 b2partsgenorder.append(stepname)
703 703 else:
704 704 b2partsgenorder.insert(idx, stepname)
705 705 return func
706 706 return dec
707 707
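# Editor's note (sketch of the wrapping pattern the docstring refers to;
# the wrapper below is hypothetical extension code, not part of this
# module):
#
#     origstep = exchange.b2partsgenmapping['changeset']
#     def wrappedchangeset(pushop, bundler):
#         # extension logic before/after the original step
#         return origstep(pushop, bundler)
#     exchange.b2partsgenmapping['changeset'] = wrappedchangeset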
708 708 def _pushb2ctxcheckheads(pushop, bundler):
709 709 """Generate race condition checking parts
710 710
711 711 Exists as an independent function to aid extensions
712 712 """
713 713 # * 'force' does not check for push races,
714 714 # * if we don't push anything, there is nothing to check.
715 715 if not pushop.force and pushop.outgoing.missingheads:
716 716 allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
717 717 emptyremote = pushop.pushbranchmap is None
718 718 if not allowunrelated or emptyremote:
719 719 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
720 720 else:
721 721 affected = set()
722 722 for branch, heads in pushop.pushbranchmap.iteritems():
723 723 remoteheads, newheads, unsyncedheads, discardedheads = heads
724 724 if remoteheads is not None:
725 725 remote = set(remoteheads)
726 726 affected |= set(discardedheads) & remote
727 727 affected |= remote - set(newheads)
728 728 if affected:
729 729 data = iter(sorted(affected))
730 730 bundler.newpart('check:updated-heads', data=data)
731 731
732 732 @b2partsgenerator('changeset')
733 733 def _pushb2ctx(pushop, bundler):
734 734 """handle changegroup push through bundle2
735 735
736 736 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
737 737 """
738 738 if 'changesets' in pushop.stepsdone:
739 739 return
740 740 pushop.stepsdone.add('changesets')
741 741 # Send known heads to the server for race detection.
742 742 if not _pushcheckoutgoing(pushop):
743 743 return
744 744 pushop.repo.prepushoutgoinghooks(pushop)
745 745
746 746 _pushb2ctxcheckheads(pushop, bundler)
747 747
748 748 b2caps = bundle2.bundle2caps(pushop.remote)
749 749 version = '01'
750 750 cgversions = b2caps.get('changegroup')
751 751 if cgversions: # 3.1 and 3.2 ship with an empty value
752 752 cgversions = [v for v in cgversions
753 753 if v in changegroup.supportedoutgoingversions(
754 754 pushop.repo)]
755 755 if not cgversions:
756 756 raise ValueError(_('no common changegroup version'))
757 757 version = max(cgversions)
758 758 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
759 759 pushop.outgoing,
760 760 version=version)
761 761 cgpart = bundler.newpart('changegroup', data=cg)
762 762 if cgversions:
763 763 cgpart.addparam('version', version)
764 764 if 'treemanifest' in pushop.repo.requirements:
765 765 cgpart.addparam('treemanifest', '1')
766 766 def handlereply(op):
767 767 """extract addchangegroup returns from server reply"""
768 768 cgreplies = op.records.getreplies(cgpart.id)
769 769 assert len(cgreplies['changegroup']) == 1
770 770 pushop.cgresult = cgreplies['changegroup'][0]['return']
771 771 return handlereply
772 772
773 773 @b2partsgenerator('phase')
774 774 def _pushb2phases(pushop, bundler):
775 775 """handle phase push through bundle2"""
776 776 if 'phases' in pushop.stepsdone:
777 777 return
778 778 b2caps = bundle2.bundle2caps(pushop.remote)
779 779 if not 'pushkey' in b2caps:
780 780 return
781 781 pushop.stepsdone.add('phases')
782 782 part2node = []
783 783
784 784 def handlefailure(pushop, exc):
785 785 targetid = int(exc.partid)
786 786 for partid, node in part2node:
787 787 if partid == targetid:
788 788 raise error.Abort(_('updating %s to public failed') % node)
789 789
790 790 enc = pushkey.encode
791 791 for newremotehead in pushop.outdatedphases:
792 792 part = bundler.newpart('pushkey')
793 793 part.addparam('namespace', enc('phases'))
794 794 part.addparam('key', enc(newremotehead.hex()))
795 795 part.addparam('old', enc(str(phases.draft)))
796 796 part.addparam('new', enc(str(phases.public)))
797 797 part2node.append((part.id, newremotehead))
798 798 pushop.pkfailcb[part.id] = handlefailure
799 799
800 800 def handlereply(op):
801 801 for partid, node in part2node:
802 802 partrep = op.records.getreplies(partid)
803 803 results = partrep['pushkey']
804 804 assert len(results) <= 1
805 805 msg = None
806 806 if not results:
807 807 msg = _('server ignored update of %s to public!\n') % node
808 808 elif not int(results[0]['return']):
809 809 msg = _('updating %s to public failed!\n') % node
810 810 if msg is not None:
811 811 pushop.ui.warn(msg)
812 812 return handlereply
813 813
814 814 @b2partsgenerator('obsmarkers')
815 815 def _pushb2obsmarkers(pushop, bundler):
816 816 if 'obsmarkers' in pushop.stepsdone:
817 817 return
818 818 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
819 819 if obsolete.commonversion(remoteversions) is None:
820 820 return
821 821 pushop.stepsdone.add('obsmarkers')
822 822 if pushop.outobsmarkers:
823 823 markers = sorted(pushop.outobsmarkers)
824 824 bundle2.buildobsmarkerspart(bundler, markers)
825 825
826 826 @b2partsgenerator('bookmarks')
827 827 def _pushb2bookmarks(pushop, bundler):
828 828 """handle bookmark push through bundle2"""
829 829 if 'bookmarks' in pushop.stepsdone:
830 830 return
831 831 b2caps = bundle2.bundle2caps(pushop.remote)
832 832 if 'pushkey' not in b2caps:
833 833 return
834 834 pushop.stepsdone.add('bookmarks')
835 835 part2book = []
836 836 enc = pushkey.encode
837 837
838 838 def handlefailure(pushop, exc):
839 839 targetid = int(exc.partid)
840 840 for partid, book, action in part2book:
841 841 if partid == targetid:
842 842 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
843 843 # we should not be called for parts we did not generate
844 844 assert False
845 845
846 846 for book, old, new in pushop.outbookmarks:
847 847 part = bundler.newpart('pushkey')
848 848 part.addparam('namespace', enc('bookmarks'))
849 849 part.addparam('key', enc(book))
850 850 part.addparam('old', enc(old))
851 851 part.addparam('new', enc(new))
852 852 action = 'update'
853 853 if not old:
854 854 action = 'export'
855 855 elif not new:
856 856 action = 'delete'
857 857 part2book.append((part.id, book, action))
858 858 pushop.pkfailcb[part.id] = handlefailure
859 859
860 860 def handlereply(op):
861 861 ui = pushop.ui
862 862 for partid, book, action in part2book:
863 863 partrep = op.records.getreplies(partid)
864 864 results = partrep['pushkey']
865 865 assert len(results) <= 1
866 866 if not results:
867 867 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
868 868 else:
869 869 ret = int(results[0]['return'])
870 870 if ret:
871 871 ui.status(bookmsgmap[action][0] % book)
872 872 else:
873 873 ui.warn(bookmsgmap[action][1] % book)
874 874 if pushop.bkresult is not None:
875 875 pushop.bkresult = 1
876 876 return handlereply
877 877
878 878 @b2partsgenerator('pushvars', idx=0)
879 879 def _getbundlesendvars(pushop, bundler):
880 880 '''send shellvars via bundle2'''
881 881 pushvars = pushop.pushvars
882 882 if pushvars:
883 883 shellvars = {}
884 884 for raw in pushvars:
885 885 if '=' not in raw:
886 886 msg = ("unable to parse variable '%s', should follow "
887 887 "'KEY=VALUE' or 'KEY=' format")
888 888 raise error.Abort(msg % raw)
889 889 k, v = raw.split('=', 1)
890 890 shellvars[k] = v
891 891
892 892 part = bundler.newpart('pushvars')
893 893
894 894 for key, value in shellvars.iteritems():
895 895 part.addparam(key, value, mandatory=False)
896 896
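# Editor's note (usage sketch): with the server opt-in handled in bundle2
# ([push] pushvars.server = true), a client can run e.g.
#
#   hg push --pushvar DEBUG=1
#
# and server-side hooks then see HG_USERVAR_DEBUG=1 in their environment
# (the USERVAR_ prefix comes from the bundle2 handler; the HG_ prefix is
# the normal hook-argument export).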
897 897 def _pushbundle2(pushop):
898 898 """push data to the remote using bundle2
899 899
900 900 The only currently supported type of data is changegroup but this will
901 901 evolve in the future."""
902 902 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
903 903 pushback = (pushop.trmanager
904 904 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
905 905
906 906 # create reply capability
907 907 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
908 908 allowpushback=pushback))
909 909 bundler.newpart('replycaps', data=capsblob)
910 910 replyhandlers = []
911 911 for partgenname in b2partsgenorder:
912 912 partgen = b2partsgenmapping[partgenname]
913 913 ret = partgen(pushop, bundler)
914 914 if callable(ret):
915 915 replyhandlers.append(ret)
916 916 # do not push if nothing to push
917 917 if bundler.nbparts <= 1:
918 918 return
919 919 stream = util.chunkbuffer(bundler.getchunks())
920 920 try:
921 921 try:
922 922 reply = pushop.remote.unbundle(
923 923 stream, ['force'], pushop.remote.url())
924 924 except error.BundleValueError as exc:
925 925 raise error.Abort(_('missing support for %s') % exc)
926 926 try:
927 927 trgetter = None
928 928 if pushback:
929 929 trgetter = pushop.trmanager.transaction
930 930 op = bundle2.processbundle(pushop.repo, reply, trgetter)
931 931 except error.BundleValueError as exc:
932 932 raise error.Abort(_('missing support for %s') % exc)
933 933 except bundle2.AbortFromPart as exc:
934 934 pushop.ui.status(_('remote: %s\n') % exc)
935 935 if exc.hint is not None:
936 936 pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
937 937 raise error.Abort(_('push failed on remote'))
938 938 except error.PushkeyFailed as exc:
939 939 partid = int(exc.partid)
940 940 if partid not in pushop.pkfailcb:
941 941 raise
942 942 pushop.pkfailcb[partid](pushop, exc)
943 943 for rephand in replyhandlers:
944 944 rephand(op)
945 945
946 946 def _pushchangeset(pushop):
947 947 """Make the actual push of changeset bundle to remote repo"""
948 948 if 'changesets' in pushop.stepsdone:
949 949 return
950 950 pushop.stepsdone.add('changesets')
951 951 if not _pushcheckoutgoing(pushop):
952 952 return
953 953
954 954 # Should have verified this in push().
955 955 assert pushop.remote.capable('unbundle')
956 956
957 957 pushop.repo.prepushoutgoinghooks(pushop)
958 958 outgoing = pushop.outgoing
959 959 # TODO: get bundlecaps from remote
960 960 bundlecaps = None
961 961 # create a changegroup from local
962 962 if pushop.revs is None and not (outgoing.excluded
963 963 or pushop.repo.changelog.filteredrevs):
964 964 # push everything,
965 965 # use the fast path, no race possible on push
966 966 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
967 967 cg = changegroup.getsubset(pushop.repo,
968 968 outgoing,
969 969 bundler,
970 970 'push',
971 971 fastpath=True)
972 972 else:
973 973 cg = changegroup.getchangegroup(pushop.repo, 'push', outgoing,
974 974 bundlecaps=bundlecaps)
975 975
976 976 # apply changegroup to remote
977 977 # local repo finds heads on server, finds out what
978 978 # revs it must push. once revs transferred, if server
979 979 # finds it has different heads (someone else won
980 980 # commit/push race), server aborts.
981 981 if pushop.force:
982 982 remoteheads = ['force']
983 983 else:
984 984 remoteheads = pushop.remoteheads
985 985 # ssh: return remote's addchangegroup()
986 986 # http: return remote's addchangegroup() or 0 for error
987 987 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
988 988 pushop.repo.url())
989 989
990 990 def _pushsyncphase(pushop):
991 991 """synchronise phase information locally and remotely"""
992 992 cheads = pushop.commonheads
993 993 # even when we don't push, exchanging phase data is useful
994 994 remotephases = pushop.remote.listkeys('phases')
995 995 if (pushop.ui.configbool('ui', '_usedassubrepo')
996 996 and remotephases # server supports phases
997 997 and pushop.cgresult is None # nothing was pushed
998 998 and remotephases.get('publishing', False)):
999 999 # When:
1000 1000 # - this is a subrepo push
1001 1001 # - and the remote supports phases
1002 1002 # - and no changeset was pushed
1003 1003 # - and the remote is publishing
1004 1004 # We may be in the issue 3871 case!
1005 1005 # We drop the phase synchronisation that would otherwise be done as
1006 1006 # a courtesy; it could publish changesets that are still draft
1007 1007 # locally.
1008 1008 remotephases = {'publishing': 'True'}
1009 1009 if not remotephases: # old server, or public-only reply from non-publishing
1010 1010 _localphasemove(pushop, cheads)
1011 1011 # don't push any phase data as there is nothing to push
1012 1012 else:
1013 1013 ana = phases.analyzeremotephases(pushop.repo, cheads,
1014 1014 remotephases)
1015 1015 pheads, droots = ana
1016 1016 ### Apply remote phase on local
1017 1017 if remotephases.get('publishing', False):
1018 1018 _localphasemove(pushop, cheads)
1019 1019 else: # publish = False
1020 1020 _localphasemove(pushop, pheads)
1021 1021 _localphasemove(pushop, cheads, phases.draft)
1022 1022 ### Apply local phase on remote
1023 1023
1024 1024 if pushop.cgresult:
1025 1025 if 'phases' in pushop.stepsdone:
1026 1026 # phases already pushed through bundle2
1027 1027 return
1028 1028 outdated = pushop.outdatedphases
1029 1029 else:
1030 1030 outdated = pushop.fallbackoutdatedphases
1031 1031
1032 1032 pushop.stepsdone.add('phases')
1033 1033
1034 1034 # filter heads already turned public by the push
1035 1035 outdated = [c for c in outdated if c.node() not in pheads]
1036 1036 # fallback to independent pushkey command
1037 1037 for newremotehead in outdated:
1038 1038 r = pushop.remote.pushkey('phases',
1039 1039 newremotehead.hex(),
1040 1040 str(phases.draft),
1041 1041 str(phases.public))
1042 1042 if not r:
1043 1043 pushop.ui.warn(_('updating %s to public failed!\n')
1044 1044 % newremotehead)
1045 1045
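# An editorial sketch (not part of the original module): the pushkey
# fallback used above moves one head at a time from draft ('1') to
# public ('0'); `remote` and `outdatedheads` are hypothetical stand-ins.
def _examplephasepushkey(remote, outdatedheads):
    results = []
    for head in outdatedheads:
        results.append(remote.pushkey('phases', head.hex(),
                                      str(phases.draft), str(phases.public)))
    return all(results)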
1046 1046 def _localphasemove(pushop, nodes, phase=phases.public):
1047 1047 """move <nodes> to <phase> in the local source repo"""
1048 1048 if pushop.trmanager:
1049 1049 phases.advanceboundary(pushop.repo,
1050 1050 pushop.trmanager.transaction(),
1051 1051 phase,
1052 1052 nodes)
1053 1053 else:
1054 1054 # repo is not locked, do not change any phases!
1055 1055 # Informs the user that phases should have been moved when
1056 1056 # applicable.
1057 1057 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
1058 1058 phasestr = phases.phasenames[phase]
1059 1059 if actualmoves:
1060 1060 pushop.ui.status(_('cannot lock source repo, skipping '
1061 1061 'local %s phase update\n') % phasestr)
1062 1062
1063 1063 def _pushobsolete(pushop):
1064 1064 """utility function to push obsolete markers to a remote"""
1065 1065 if 'obsmarkers' in pushop.stepsdone:
1066 1066 return
1067 1067 repo = pushop.repo
1068 1068 remote = pushop.remote
1069 1069 pushop.stepsdone.add('obsmarkers')
1070 1070 if pushop.outobsmarkers:
1071 1071 pushop.ui.debug('try to push obsolete markers to remote\n')
1072 1072 rslts = []
1073 1073 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
1074 1074 for key in sorted(remotedata, reverse=True):
1075 1075 # reverse sort to ensure we end with dump0
1076 1076 data = remotedata[key]
1077 1077 rslts.append(remote.pushkey('obsolete', key, '', data))
1078 1078 if [r for r in rslts if not r]:
1079 1079 msg = _('failed to push some obsolete markers!\n')
1080 1080 repo.ui.warn(msg)
1081 1081
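# An editorial sketch: obsolete._pushkeyescape() splits markers into
# pushkey-sized chunks keyed 'dump0', 'dump1', ...; iterating in reverse
# order (as above) writes 'dump0' last, so readers that only look at
# 'dump0' never observe a partially pushed set. Illustrative only.
def _exampleobsmarkerkeys(markers):
    remotedata = obsolete._pushkeyescape(sorted(markers))
    return sorted(remotedata, reverse=True)  # e.g. ['dump1', 'dump0']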
1082 1082 def _pushbookmark(pushop):
1083 1083 """Update bookmark position on remote"""
1084 1084 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
1085 1085 return
1086 1086 pushop.stepsdone.add('bookmarks')
1087 1087 ui = pushop.ui
1088 1088 remote = pushop.remote
1089 1089
1090 1090 for b, old, new in pushop.outbookmarks:
1091 1091 action = 'update'
1092 1092 if not old:
1093 1093 action = 'export'
1094 1094 elif not new:
1095 1095 action = 'delete'
1096 1096 if remote.pushkey('bookmarks', b, old, new):
1097 1097 ui.status(bookmsgmap[action][0] % b)
1098 1098 else:
1099 1099 ui.warn(bookmsgmap[action][1] % b)
1100 1100 # discovery can have set the value from an invalid entry
1101 1101 if pushop.bkresult is not None:
1102 1102 pushop.bkresult = 1
1103 1103
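# An editorial sketch of the pushkey bookmark protocol driven above: an
# empty old value means "create" and an empty new value means "delete".
# `remote`, `name` and `oldhex` are hypothetical stand-ins.
def _examplebookmarkdelete(remote, name, oldhex):
    # returns True on success, matching the status/warn split above
    return remote.pushkey('bookmarks', name, oldhex, '')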
1104 1104 class pulloperation(object):
1105 1105 """A object that represent a single pull operation
1106 1106
1107 1107 It purpose is to carry pull related state and very common operation.
1108 1108
1109 1109 A new should be created at the beginning of each pull and discarded
1110 1110 afterward.
1111 1111 """
1112 1112
1113 1113 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
1114 1114 remotebookmarks=None, streamclonerequested=None):
1115 1115 # repo we pull into
1116 1116 self.repo = repo
1117 1117 # repo we pull from
1118 1118 self.remote = remote
1119 1119 # revisions we try to pull (None is "all")
1120 1120 self.heads = heads
1121 1121 # bookmarks pulled explicitly
1122 1122 self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
1123 1123 for bookmark in bookmarks]
1124 1124 # do we force pull?
1125 1125 self.force = force
1126 1126 # whether a streaming clone was requested
1127 1127 self.streamclonerequested = streamclonerequested
1128 1128 # transaction manager
1129 1129 self.trmanager = None
1130 1130 # set of common changesets between local and remote before pull
1131 1131 self.common = None
1132 1132 # set of pulled heads
1133 1133 self.rheads = None
1134 1134 # list of missing changesets to fetch remotely
1135 1135 self.fetch = None
1136 1136 # remote bookmarks data
1137 1137 self.remotebookmarks = remotebookmarks
1138 1138 # result of changegroup pulling (used as return code by pull)
1139 1139 self.cgresult = None
1140 1140 # list of steps already done
1141 1141 self.stepsdone = set()
1142 1142 # Whether we attempted a clone from pre-generated bundles.
1143 1143 self.clonebundleattempted = False
1144 1144
1145 1145 @util.propertycache
1146 1146 def pulledsubset(self):
1147 1147 """heads of the set of changeset target by the pull"""
1148 1148 # compute target subset
1149 1149 if self.heads is None:
1150 1150 # We pulled everything possible
1151 1151 # sync on everything common
1152 1152 c = set(self.common)
1153 1153 ret = list(self.common)
1154 1154 for n in self.rheads:
1155 1155 if n not in c:
1156 1156 ret.append(n)
1157 1157 return ret
1158 1158 else:
1159 1159 # We pulled a specific subset
1160 1160 # sync on this subset
1161 1161 return self.heads
1162 1162
1163 1163 @util.propertycache
1164 1164 def canusebundle2(self):
1165 1165 return not _forcebundle1(self)
1166 1166
1167 1167 @util.propertycache
1168 1168 def remotebundle2caps(self):
1169 1169 return bundle2.bundle2caps(self.remote)
1170 1170
1171 1171 def gettransaction(self):
1172 1172 # deprecated; talk to trmanager directly
1173 1173 return self.trmanager.transaction()
1174 1174
1175 1175 class transactionmanager(util.transactional):
1176 1176 """An object to manage the life cycle of a transaction
1177 1177
1178 1178 It creates the transaction on demand and calls the appropriate hooks when
1179 1179 closing the transaction."""
1180 1180 def __init__(self, repo, source, url):
1181 1181 self.repo = repo
1182 1182 self.source = source
1183 1183 self.url = url
1184 1184 self._tr = None
1185 1185
1186 1186 def transaction(self):
1187 1187 """Return an open transaction object, constructing if necessary"""
1188 1188 if not self._tr:
1189 1189 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
1190 1190 self._tr = self.repo.transaction(trname)
1191 1191 self._tr.hookargs['source'] = self.source
1192 1192 self._tr.hookargs['url'] = self.url
1193 1193 return self._tr
1194 1194
1195 1195 def close(self):
1196 1196 """close transaction if created"""
1197 1197 if self._tr is not None:
1198 1198 self._tr.close()
1199 1199
1200 1200 def release(self):
1201 1201 """release transaction if created"""
1202 1202 if self._tr is not None:
1203 1203 self._tr.release()
1204 1204
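# An editorial sketch of the transactionmanager life cycle as pull() uses
# it: the transaction is created lazily on the first transaction() call,
# close() commits it, and release() aborts it if it is still open.
def _exampletrmanager(repo, remoteurl):
    tm = transactionmanager(repo, 'pull', remoteurl)
    try:
        tr = tm.transaction()   # created on demand
        # ... apply pulled data within `tr` ...
        tm.close()              # commit, if a transaction was created
    finally:
        tm.release()            # abort if still open; otherwise a no-op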
1205 1205 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
1206 1206 streamclonerequested=None):
1207 1207 """Fetch repository data from a remote.
1208 1208
1209 1209 This is the main function used to retrieve data from a remote repository.
1210 1210
1211 1211 ``repo`` is the local repository to clone into.
1212 1212 ``remote`` is a peer instance.
1213 1213 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
1214 1214 default) means to pull everything from the remote.
1215 1215 ``bookmarks`` is an iterable of bookmarks requesting to be pulled. By
1216 1216 default, all remote bookmarks are pulled.
1217 1217 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
1218 1218 initialization.
1219 1219 ``streamclonerequested`` is a boolean indicating whether a "streaming
1220 1220 clone" is requested. A "streaming clone" is essentially a raw file copy
1221 1221 of revlogs from the server. This only works when the local repository is
1222 1222 empty. The default value of ``None`` means to respect the server
1223 1223 configuration for preferring stream clones.
1224 1224
1225 1225 Returns the ``pulloperation`` created for this pull.
1226 1226 """
1227 1227 if opargs is None:
1228 1228 opargs = {}
1229 1229 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
1230 1230 streamclonerequested=streamclonerequested, **opargs)
1231 1231
1232 1232 peerlocal = pullop.remote.local()
1233 1233 if peerlocal:
1234 1234 missing = set(peerlocal.requirements) - pullop.repo.supported
1235 1235 if missing:
1236 1236 msg = _("required features are not"
1237 1237 " supported in the destination:"
1238 1238 " %s") % (', '.join(sorted(missing)))
1239 1239 raise error.Abort(msg)
1240 1240
1241 1241 wlock = lock = None
1242 1242 try:
1243 1243 wlock = pullop.repo.wlock()
1244 1244 lock = pullop.repo.lock()
1245 1245 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
1246 1246 streamclone.maybeperformlegacystreamclone(pullop)
1247 1247 # This should ideally be in _pullbundle2(). However, it needs to run
1248 1248 # before discovery to avoid extra work.
1249 1249 _maybeapplyclonebundle(pullop)
1250 1250 _pulldiscovery(pullop)
1251 1251 if pullop.canusebundle2:
1252 1252 _pullbundle2(pullop)
1253 1253 _pullchangeset(pullop)
1254 1254 _pullphase(pullop)
1255 1255 _pullbookmarks(pullop)
1256 1256 _pullobsolete(pullop)
1257 1257 pullop.trmanager.close()
1258 1258 finally:
1259 1259 lockmod.release(pullop.trmanager, lock, wlock)
1260 1260
1261 1261 return pullop
1262 1262
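# An editorial sketch of a typical caller: something like
# `hg pull -r REV -B mark` roughly maps onto this API as follows
# (`repo`, `other` and `node` are hypothetical stand-ins).
def _examplepull(repo, other, node):
    pullop = pull(repo, other, heads=[node], bookmarks=['mark'])
    return pullop.cgresult  # changegroup result, used as pull's return code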
1263 1263 # list of steps to perform discovery before pull
1264 1264 pulldiscoveryorder = []
1265 1265
1266 1266 # Mapping between step name and function
1267 1267 #
1268 1268 # This exists to help extensions wrap steps if necessary
1269 1269 pulldiscoverymapping = {}
1270 1270
1271 1271 def pulldiscovery(stepname):
1272 1272 """decorator for function performing discovery before pull
1273 1273
1274 1274 The function is added to the step -> function mapping and appended to the
1275 1275 list of steps. Beware that decorated functions will be added in order (this
1276 1276 may matter).
1277 1277
1278 1278 You can only use this decorator for a new step; if you want to wrap a step
1279 1279 from an extension, change the pulldiscoverymapping dictionary directly."""
1280 1280 def dec(func):
1281 1281 assert stepname not in pulldiscoverymapping
1282 1282 pulldiscoverymapping[stepname] = func
1283 1283 pulldiscoveryorder.append(stepname)
1284 1284 return func
1285 1285 return dec
1286 1286
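# An editorial sketch: how an extension might register an extra discovery
# step with the decorator above (the step name and body are made up, for
# illustration only).
@pulldiscovery('example:noop')
def _pulldiscoverynoop(pullop):
    pullop.repo.ui.debug('example discovery step, nothing to do\n')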
1287 1287 def _pulldiscovery(pullop):
1288 1288 """Run all discovery steps"""
1289 1289 for stepname in pulldiscoveryorder:
1290 1290 step = pulldiscoverymapping[stepname]
1291 1291 step(pullop)
1292 1292
1293 1293 @pulldiscovery('b1:bookmarks')
1294 1294 def _pullbookmarkbundle1(pullop):
1295 1295 """fetch bookmark data in bundle1 case
1296 1296
1297 1297 If not using bundle2, we have to fetch bookmarks before changeset
1298 1298 discovery to reduce the chance and impact of race conditions."""
1299 1299 if pullop.remotebookmarks is not None:
1300 1300 return
1301 1301 if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
1302 1302 # all known bundle2 servers now support listkeys, but let's be nice with
1303 1303 # new implementations.
1304 1304 return
1305 1305 pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
1306 1306
1307 1307
1308 1308 @pulldiscovery('changegroup')
1309 1309 def _pulldiscoverychangegroup(pullop):
1310 1310 """discovery phase for the pull
1311 1311
1312 1312 Currently handles changeset discovery only; will change to handle all
1313 1313 discovery at some point."""
1314 1314 tmp = discovery.findcommonincoming(pullop.repo,
1315 1315 pullop.remote,
1316 1316 heads=pullop.heads,
1317 1317 force=pullop.force)
1318 1318 common, fetch, rheads = tmp
1319 1319 nm = pullop.repo.unfiltered().changelog.nodemap
1320 1320 if fetch and rheads:
1321 1321 # If a remote head is filtered locally, let's drop it from the unknown
1322 1322 # remote heads and put it back in common.
1323 1323 #
1324 1324 # This is a hackish solution to catch most of the "common but locally
1325 1325 # hidden" situations. We do not perform discovery on the unfiltered
1326 1326 # repository because it ends up doing a pathological amount of round
1327 1327 # trips for a huge amount of changesets we do not care about.
1328 1328 #
1329 1329 # If a set of such "common but filtered" changesets exists on the server
1330 1330 # but does not include a remote head, we'll not be able to detect it.
1331 1331 scommon = set(common)
1332 1332 filteredrheads = []
1333 1333 for n in rheads:
1334 1334 if n in nm:
1335 1335 if n not in scommon:
1336 1336 common.append(n)
1337 1337 else:
1338 1338 filteredrheads.append(n)
1339 1339 if not filteredrheads:
1340 1340 fetch = []
1341 1341 rheads = filteredrheads
1342 1342 pullop.common = common
1343 1343 pullop.fetch = fetch
1344 1344 pullop.rheads = rheads
1345 1345
1346 1346 def _pullbundle2(pullop):
1347 1347 """pull data using bundle2
1348 1348
1349 1349 For now, the only supported data is the changegroup."""
1350 1350 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1351 1351
1352 1352 # At the moment we don't do stream clones over bundle2. If that is
1353 1353 # implemented then here's where the check for that will go.
1354 1354 streaming = False
1355 1355
1356 1356 # pulling changegroup
1357 1357 pullop.stepsdone.add('changegroup')
1358 1358
1359 1359 kwargs['common'] = pullop.common
1360 1360 kwargs['heads'] = pullop.heads or pullop.rheads
1361 1361 kwargs['cg'] = pullop.fetch
1362 1362 if 'listkeys' in pullop.remotebundle2caps:
1363 1363 kwargs['listkeys'] = ['phases']
1364 1364 if pullop.remotebookmarks is None:
1365 1365 # make sure to always include bookmark data when migrating
1366 1366 # `hg incoming --bundle` to using this function.
1367 1367 kwargs['listkeys'].append('bookmarks')
1368 1368
1369 1369 # If this is a full pull / clone and the server supports the clone bundles
1370 1370 # feature, tell the server whether we attempted a clone bundle. The
1371 1371 # presence of this flag indicates the client supports clone bundles. This
1372 1372 # will enable the server to treat clients that support clone bundles
1373 1373 # differently from those that don't.
1374 1374 if (pullop.remote.capable('clonebundles')
1375 1375 and pullop.heads is None and list(pullop.common) == [nullid]):
1376 1376 kwargs['cbattempted'] = pullop.clonebundleattempted
1377 1377
1378 1378 if streaming:
1379 1379 pullop.repo.ui.status(_('streaming all changes\n'))
1380 1380 elif not pullop.fetch:
1381 1381 pullop.repo.ui.status(_("no changes found\n"))
1382 1382 pullop.cgresult = 0
1383 1383 else:
1384 1384 if pullop.heads is None and list(pullop.common) == [nullid]:
1385 1385 pullop.repo.ui.status(_("requesting all changes\n"))
1386 1386 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1387 1387 remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
1388 1388 if obsolete.commonversion(remoteversions) is not None:
1389 1389 kwargs['obsmarkers'] = True
1390 1390 pullop.stepsdone.add('obsmarkers')
1391 1391 _pullbundle2extraprepare(pullop, kwargs)
1392 1392 bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
1393 1393 try:
1394 1394 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
1395 1395 except bundle2.AbortFromPart as exc:
1396 1396 pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
1397 1397 raise error.Abort(_('pull failed on remote'), hint=exc.hint)
1398 1398 except error.BundleValueError as exc:
1399 1399 raise error.Abort(_('missing support for %s') % exc)
1400 1400
1401 1401 if pullop.fetch:
1402 1402 pullop.cgresult = bundle2.combinechangegroupresults(op)
1403 1403
1404 # If the bundle had a phase-heads part, then phase exchange is already done
1405 if op.records['phase-heads']:
1406 pullop.stepsdone.add('phases')
1407
1404 1408 # processing phases change
1405 1409 for namespace, value in op.records['listkeys']:
1406 1410 if namespace == 'phases':
1407 1411 _pullapplyphases(pullop, value)
1408 1412
1409 1413 # processing bookmark update
1410 1414 for namespace, value in op.records['listkeys']:
1411 1415 if namespace == 'bookmarks':
1412 1416 pullop.remotebookmarks = value
1413 1417
1414 1418 # bookmark data were either already there or pulled in the bundle
1415 1419 if pullop.remotebookmarks is not None:
1416 1420 _pullbookmarks(pullop)
1417 1421
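# An editorial sketch (illustrative, not original code): for a fresh clone
# from a bundle2 server, the getbundle arguments assembled above come out
# roughly like this.
def _exampleclonekwargs(pullop):
    return {'bundlecaps': caps20to10(pullop.repo),
            'common': [nullid],              # empty local repo
            'heads': pullop.rheads,          # everything the remote has
            'cg': True,                      # request a changegroup part
            'listkeys': ['phases', 'bookmarks'],
            'cbattempted': pullop.clonebundleattempted}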
1418 1422 def _pullbundle2extraprepare(pullop, kwargs):
1419 1423 """hook function so that extensions can extend the getbundle call"""
1420 1424 pass
1421 1425
1422 1426 def _pullchangeset(pullop):
1423 1427 """pull changeset from unbundle into the local repo"""
1424 1428 # We delay the open of the transaction as late as possible so we
1425 1429 # don't open transaction for nothing or you break future useful
1426 1430 # rollback call
1427 1431 if 'changegroup' in pullop.stepsdone:
1428 1432 return
1429 1433 pullop.stepsdone.add('changegroup')
1430 1434 if not pullop.fetch:
1431 1435 pullop.repo.ui.status(_("no changes found\n"))
1432 1436 pullop.cgresult = 0
1433 1437 return
1434 1438 tr = pullop.gettransaction()
1435 1439 if pullop.heads is None and list(pullop.common) == [nullid]:
1436 1440 pullop.repo.ui.status(_("requesting all changes\n"))
1437 1441 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1438 1442 # issue1320, avoid a race if remote changed after discovery
1439 1443 pullop.heads = pullop.rheads
1440 1444
1441 1445 if pullop.remote.capable('getbundle'):
1442 1446 # TODO: get bundlecaps from remote
1443 1447 cg = pullop.remote.getbundle('pull', common=pullop.common,
1444 1448 heads=pullop.heads or pullop.rheads)
1445 1449 elif pullop.heads is None:
1446 1450 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1447 1451 elif not pullop.remote.capable('changegroupsubset'):
1448 1452 raise error.Abort(_("partial pull cannot be done because "
1449 1453 "other repository doesn't support "
1450 1454 "changegroupsubset."))
1451 1455 else:
1452 1456 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1453 1457 bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
1454 1458 pullop.remote.url())
1455 1459 pullop.cgresult = bundle2.combinechangegroupresults(bundleop)
1456 1460
1457 1461 def _pullphase(pullop):
1458 1462 # Get remote phases data from remote
1459 1463 if 'phases' in pullop.stepsdone:
1460 1464 return
1461 1465 remotephases = pullop.remote.listkeys('phases')
1462 1466 _pullapplyphases(pullop, remotephases)
1463 1467
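# An editorial sketch of the raw 'phases' listkeys reply consumed below:
# a dict of hex draft roots mapping to '1', plus 'publishing': 'True' on
# publishing servers. Values here are illustrative only.
def _examplephasesreply(publishing):
    if publishing:
        return {'publishing': 'True'}
    return {'d5e255ef74f8ec83b3a2a7f3915ea5eee0d8b23a': '1'}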
1464 1468 def _pullapplyphases(pullop, remotephases):
1465 1469 """apply phase movement from observed remote state"""
1466 1470 if 'phases' in pullop.stepsdone:
1467 1471 return
1468 1472 pullop.stepsdone.add('phases')
1469 1473 publishing = bool(remotephases.get('publishing', False))
1470 1474 if remotephases and not publishing:
1471 1475 # remote is new and non-publishing
1472 1476 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1473 1477 pullop.pulledsubset,
1474 1478 remotephases)
1475 1479 dheads = pullop.pulledsubset
1476 1480 else:
1477 1481 # Remote is old or publishing; all common changesets
1478 1482 # should be seen as public
1479 1483 pheads = pullop.pulledsubset
1480 1484 dheads = []
1481 1485 unfi = pullop.repo.unfiltered()
1482 1486 phase = unfi._phasecache.phase
1483 1487 rev = unfi.changelog.nodemap.get
1484 1488 public = phases.public
1485 1489 draft = phases.draft
1486 1490
1487 1491 # exclude changesets already public locally and update the others
1488 1492 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1489 1493 if pheads:
1490 1494 tr = pullop.gettransaction()
1491 1495 phases.advanceboundary(pullop.repo, tr, public, pheads)
1492 1496
1493 1497 # exclude changesets already draft locally and update the others
1494 1498 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1495 1499 if dheads:
1496 1500 tr = pullop.gettransaction()
1497 1501 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1498 1502
1499 1503 def _pullbookmarks(pullop):
1500 1504 """process the remote bookmark information to update the local one"""
1501 1505 if 'bookmarks' in pullop.stepsdone:
1502 1506 return
1503 1507 pullop.stepsdone.add('bookmarks')
1504 1508 repo = pullop.repo
1505 1509 remotebookmarks = pullop.remotebookmarks
1506 1510 remotebookmarks = bookmod.unhexlifybookmarks(remotebookmarks)
1507 1511 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1508 1512 pullop.remote.url(),
1509 1513 pullop.gettransaction,
1510 1514 explicit=pullop.explicitbookmarks)
1511 1515
1512 1516 def _pullobsolete(pullop):
1513 1517 """utility function to pull obsolete markers from a remote
1514 1518
1515 1519 The `gettransaction` is a function that returns the pull transaction,
1516 1520 creating one if necessary. We return the transaction to inform the calling
1517 1521 code that a new transaction has been created (when applicable).
1518 1522
1519 1523 Exists mostly to allow overriding for experimentation purposes"""
1520 1524 if 'obsmarkers' in pullop.stepsdone:
1521 1525 return
1522 1526 pullop.stepsdone.add('obsmarkers')
1523 1527 tr = None
1524 1528 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1525 1529 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1526 1530 remoteobs = pullop.remote.listkeys('obsolete')
1527 1531 if 'dump0' in remoteobs:
1528 1532 tr = pullop.gettransaction()
1529 1533 markers = []
1530 1534 for key in sorted(remoteobs, reverse=True):
1531 1535 if key.startswith('dump'):
1532 1536 data = util.b85decode(remoteobs[key])
1533 1537 version, newmarks = obsolete._readmarkers(data)
1534 1538 markers += newmarks
1535 1539 if markers:
1536 1540 pullop.repo.obsstore.add(tr, markers)
1537 1541 pullop.repo.invalidatevolatilesets()
1538 1542 return tr
1539 1543
1540 1544 def caps20to10(repo):
1541 1545 """return a set with appropriate options to use bundle20 during getbundle"""
1542 1546 caps = {'HG20'}
1543 1547 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1544 1548 caps.add('bundle2=' + urlreq.quote(capsblob))
1545 1549 return caps
1546 1550
1547 1551 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1548 1552 getbundle2partsorder = []
1549 1553
1550 1554 # Mapping between step name and function
1551 1555 #
1552 1556 # This exists to help extensions wrap steps if necessary
1553 1557 getbundle2partsmapping = {}
1554 1558
1555 1559 def getbundle2partsgenerator(stepname, idx=None):
1556 1560 """decorator for function generating bundle2 part for getbundle
1557 1561
1558 1562 The function is added to the step -> function mapping and appended to the
1559 1563 list of steps. Beware that decorated functions will be added in order
1560 1564 (this may matter).
1561 1565
1562 1566 You can only use this decorator for new steps; if you want to wrap a step
1563 1567 from an extension, change the getbundle2partsmapping dictionary directly."""
1564 1568 def dec(func):
1565 1569 assert stepname not in getbundle2partsmapping
1566 1570 getbundle2partsmapping[stepname] = func
1567 1571 if idx is None:
1568 1572 getbundle2partsorder.append(stepname)
1569 1573 else:
1570 1574 getbundle2partsorder.insert(idx, stepname)
1571 1575 return func
1572 1576 return dec
1573 1577
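# An editorial sketch: an extension adding its own part to getbundle
# responses via the decorator above (the part name and payload are made up).
@getbundle2partsgenerator('example:greeting')
def _getbundleexamplepart(bundler, repo, source, bundlecaps=None,
                          b2caps=None, **kwargs):
    if b2caps is not None and 'example:greeting' in b2caps:
        bundler.newpart('example:greeting', data='hello', mandatory=False)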
1574 1578 def bundle2requested(bundlecaps):
1575 1579 if bundlecaps is not None:
1576 1580 return any(cap.startswith('HG2') for cap in bundlecaps)
1577 1581 return False
1578 1582
1579 1583 def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
1580 1584 **kwargs):
1581 1585 """Return chunks constituting a bundle's raw data.
1582 1586
1583 1587 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1584 1588 passed.
1585 1589
1586 1590 Returns an iterator over raw chunks (of varying sizes).
1587 1591 """
1588 1592 kwargs = pycompat.byteskwargs(kwargs)
1589 1593 usebundle2 = bundle2requested(bundlecaps)
1590 1594 # bundle10 case
1591 1595 if not usebundle2:
1592 1596 if bundlecaps and not kwargs.get('cg', True):
1593 1597 raise ValueError(_('request for bundle10 must include changegroup'))
1594 1598
1595 1599 if kwargs:
1596 1600 raise ValueError(_('unsupported getbundle arguments: %s')
1597 1601 % ', '.join(sorted(kwargs.keys())))
1598 1602 outgoing = _computeoutgoing(repo, heads, common)
1599 1603 bundler = changegroup.getbundler('01', repo, bundlecaps)
1600 1604 return changegroup.getsubsetraw(repo, outgoing, bundler, source)
1601 1605
1602 1606 # bundle20 case
1603 1607 b2caps = {}
1604 1608 for bcaps in bundlecaps:
1605 1609 if bcaps.startswith('bundle2='):
1606 1610 blob = urlreq.unquote(bcaps[len('bundle2='):])
1607 1611 b2caps.update(bundle2.decodecaps(blob))
1608 1612 bundler = bundle2.bundle20(repo.ui, b2caps)
1609 1613
1610 1614 kwargs['heads'] = heads
1611 1615 kwargs['common'] = common
1612 1616
1613 1617 for name in getbundle2partsorder:
1614 1618 func = getbundle2partsmapping[name]
1615 1619 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1616 1620 **pycompat.strkwargs(kwargs))
1617 1621
1618 1622 return bundler.getchunks()
1619 1623
1620 1624 @getbundle2partsgenerator('changegroup')
1621 1625 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1622 1626 b2caps=None, heads=None, common=None, **kwargs):
1623 1627 """add a changegroup part to the requested bundle"""
1624 1628 cg = None
1625 1629 if kwargs.get('cg', True):
1626 1630 # build changegroup bundle here.
1627 1631 version = '01'
1628 1632 cgversions = b2caps.get('changegroup')
1629 1633 if cgversions: # 3.1 and 3.2 ship with an empty value
1630 1634 cgversions = [v for v in cgversions
1631 1635 if v in changegroup.supportedoutgoingversions(repo)]
1632 1636 if not cgversions:
1633 1637 raise ValueError(_('no common changegroup version'))
1634 1638 version = max(cgversions)
1635 1639 outgoing = _computeoutgoing(repo, heads, common)
1636 1640 cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
1637 1641 bundlecaps=bundlecaps,
1638 1642 version=version)
1639 1643
1640 1644 if cg:
1641 1645 part = bundler.newpart('changegroup', data=cg)
1642 1646 if cgversions:
1643 1647 part.addparam('version', version)
1644 1648 part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
1645 1649 if 'treemanifest' in repo.requirements:
1646 1650 part.addparam('treemanifest', '1')
1647 1651
1648 1652 @getbundle2partsgenerator('listkeys')
1649 1653 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1650 1654 b2caps=None, **kwargs):
1651 1655 """add parts containing listkeys namespaces to the requested bundle"""
1652 1656 listkeys = kwargs.get('listkeys', ())
1653 1657 for namespace in listkeys:
1654 1658 part = bundler.newpart('listkeys')
1655 1659 part.addparam('namespace', namespace)
1656 1660 keys = repo.listkeys(namespace).items()
1657 1661 part.data = pushkey.encodekeys(keys)
1658 1662
1659 1663 @getbundle2partsgenerator('obsmarkers')
1660 1664 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1661 1665 b2caps=None, heads=None, **kwargs):
1662 1666 """add an obsolescence markers part to the requested bundle"""
1663 1667 if kwargs.get('obsmarkers', False):
1664 1668 if heads is None:
1665 1669 heads = repo.heads()
1666 1670 subset = [c.node() for c in repo.set('::%ln', heads)]
1667 1671 markers = repo.obsstore.relevantmarkers(subset)
1668 1672 markers = sorted(markers)
1669 1673 bundle2.buildobsmarkerspart(bundler, markers)
1670 1674
1671 1675 @getbundle2partsgenerator('hgtagsfnodes')
1672 1676 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1673 1677 b2caps=None, heads=None, common=None,
1674 1678 **kwargs):
1675 1679 """Transfer the .hgtags filenodes mapping.
1676 1680
1677 1681 Only values for heads in this bundle will be transferred.
1678 1682
1679 1683 The part data consists of pairs of 20 byte changeset node and .hgtags
1680 1684 filenodes raw values.
1681 1685 """
1682 1686 # Don't send unless:
1683 1687 # - changesets are being exchanged,
1684 1688 # - the client supports it.
1685 1689 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1686 1690 return
1687 1691
1688 1692 outgoing = _computeoutgoing(repo, heads, common)
1689 1693 bundle2.addparttagsfnodescache(repo, bundler, outgoing)
1690 1694
1691 1695 def _getbookmarks(repo, **kwargs):
1692 1696 """Returns bookmark to node mapping.
1693 1697
1694 1698 This function is primarily used to generate `bookmarks` bundle2 part.
1695 1699 It is a separate function in order to make it easy to wrap it
1696 1700 in extensions. Passing `kwargs` to the function makes it easy to
1697 1701 add new parameters in extensions.
1698 1702 """
1699 1703
1700 1704 return dict(bookmod.listbinbookmarks(repo))
1701 1705
1702 1706 def check_heads(repo, their_heads, context):
1703 1707 """check if the heads of a repo have been modified
1704 1708
1705 1709 Used by peer for unbundling.
1706 1710 """
1707 1711 heads = repo.heads()
1708 1712 heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
1709 1713 if not (their_heads == ['force'] or their_heads == heads or
1710 1714 their_heads == ['hashed', heads_hash]):
1711 1715 # someone else committed/pushed/unbundled while we
1712 1716 # were transferring data
1713 1717 raise error.PushRaced('repository changed while %s - '
1714 1718 'please try again' % context)
1715 1719
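# An editorial sketch: how a pushing client would compute the 'hashed'
# heads form accepted by check_heads() above; `localheads` is a
# hypothetical list of 20-byte binary nodes observed before the push.
def _examplehashedheads(localheads):
    digest = hashlib.sha1(''.join(sorted(localheads))).digest()
    return ['hashed', digest]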
1716 1720 def unbundle(repo, cg, heads, source, url):
1717 1721 """Apply a bundle to a repo.
1718 1722
1719 1723 This function makes sure the repo is locked during the application and has
1720 1724 a mechanism to check that no push race occurred between the creation of the
1721 1725 bundle and its application.
1722 1726
1723 1727 If the push was raced, a PushRaced exception is raised."""
1724 1728 r = 0
1725 1729 # need a transaction when processing a bundle2 stream
1726 1730 # [wlock, lock, tr] - needs to be an array so nested functions can modify it
1727 1731 lockandtr = [None, None, None]
1728 1732 recordout = None
1729 1733 # quick fix for output mismatch with bundle2 in 3.4
1730 1734 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
1731 1735 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1732 1736 captureoutput = True
1733 1737 try:
1734 1738 # note: outside bundle1, 'heads' is expected to be empty and this
1735 1739 # 'check_heads' call will be a no-op
1736 1740 check_heads(repo, heads, 'uploading changes')
1737 1741 # push can proceed
1738 1742 if not isinstance(cg, bundle2.unbundle20):
1739 1743 # legacy case: bundle1 (changegroup 01)
1740 1744 txnname = "\n".join([source, util.hidepassword(url)])
1741 1745 with repo.lock(), repo.transaction(txnname) as tr:
1742 1746 op = bundle2.applybundle(repo, cg, tr, source, url)
1743 1747 r = bundle2.combinechangegroupresults(op)
1744 1748 else:
1745 1749 r = None
1746 1750 try:
1747 1751 def gettransaction():
1748 1752 if not lockandtr[2]:
1749 1753 lockandtr[0] = repo.wlock()
1750 1754 lockandtr[1] = repo.lock()
1751 1755 lockandtr[2] = repo.transaction(source)
1752 1756 lockandtr[2].hookargs['source'] = source
1753 1757 lockandtr[2].hookargs['url'] = url
1754 1758 lockandtr[2].hookargs['bundle2'] = '1'
1755 1759 return lockandtr[2]
1756 1760
1757 1761 # Do greedy locking by default until we're satisfied with lazy
1758 1762 # locking.
1759 1763 if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
1760 1764 gettransaction()
1761 1765
1762 1766 op = bundle2.bundleoperation(repo, gettransaction,
1763 1767 captureoutput=captureoutput)
1764 1768 try:
1765 1769 op = bundle2.processbundle(repo, cg, op=op)
1766 1770 finally:
1767 1771 r = op.reply
1768 1772 if captureoutput and r is not None:
1769 1773 repo.ui.pushbuffer(error=True, subproc=True)
1770 1774 def recordout(output):
1771 1775 r.newpart('output', data=output, mandatory=False)
1772 1776 if lockandtr[2] is not None:
1773 1777 lockandtr[2].close()
1774 1778 except BaseException as exc:
1775 1779 exc.duringunbundle2 = True
1776 1780 if captureoutput and r is not None:
1777 1781 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1778 1782 def recordout(output):
1779 1783 part = bundle2.bundlepart('output', data=output,
1780 1784 mandatory=False)
1781 1785 parts.append(part)
1782 1786 raise
1783 1787 finally:
1784 1788 lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
1785 1789 if recordout is not None:
1786 1790 recordout(repo.ui.popbuffer())
1787 1791 return r
1788 1792
1789 1793 def _maybeapplyclonebundle(pullop):
1790 1794 """Apply a clone bundle from a remote, if possible."""
1791 1795
1792 1796 repo = pullop.repo
1793 1797 remote = pullop.remote
1794 1798
1795 1799 if not repo.ui.configbool('ui', 'clonebundles'):
1796 1800 return
1797 1801
1798 1802 # Only run if local repo is empty.
1799 1803 if len(repo):
1800 1804 return
1801 1805
1802 1806 if pullop.heads:
1803 1807 return
1804 1808
1805 1809 if not remote.capable('clonebundles'):
1806 1810 return
1807 1811
1808 1812 res = remote._call('clonebundles')
1809 1813
1810 1814 # If we call the wire protocol command, that's good enough to record the
1811 1815 # attempt.
1812 1816 pullop.clonebundleattempted = True
1813 1817
1814 1818 entries = parseclonebundlesmanifest(repo, res)
1815 1819 if not entries:
1816 1820 repo.ui.note(_('no clone bundles available on remote; '
1817 1821 'falling back to regular clone\n'))
1818 1822 return
1819 1823
1820 1824 entries = filterclonebundleentries(repo, entries)
1821 1825 if not entries:
1822 1826 # There is a thundering herd concern here. However, if a server
1823 1827 # operator doesn't advertise bundles appropriate for its clients,
1824 1828 # they deserve what's coming. Furthermore, from a client's
1825 1829 # perspective, no automatic fallback would mean not being able to
1826 1830 # clone!
1827 1831 repo.ui.warn(_('no compatible clone bundles available on server; '
1828 1832 'falling back to regular clone\n'))
1829 1833 repo.ui.warn(_('(you may want to report this to the server '
1830 1834 'operator)\n'))
1831 1835 return
1832 1836
1833 1837 entries = sortclonebundleentries(repo.ui, entries)
1834 1838
1835 1839 url = entries[0]['URL']
1836 1840 repo.ui.status(_('applying clone bundle from %s\n') % url)
1837 1841 if trypullbundlefromurl(repo.ui, repo, url):
1838 1842 repo.ui.status(_('finished applying clone bundle\n'))
1839 1843 # Bundle failed.
1840 1844 #
1841 1845 # We abort by default to avoid the thundering herd of
1842 1846 # clients flooding a server that was expecting expensive
1843 1847 # clone load to be offloaded.
1844 1848 elif repo.ui.configbool('ui', 'clonebundlefallback'):
1845 1849 repo.ui.warn(_('falling back to normal clone\n'))
1846 1850 else:
1847 1851 raise error.Abort(_('error applying bundle'),
1848 1852 hint=_('if this error persists, consider contacting '
1849 1853 'the server operator or disable clone '
1850 1854 'bundles via '
1851 1855 '"--config ui.clonebundles=false"'))
1852 1856
1853 1857 def parseclonebundlesmanifest(repo, s):
1854 1858 """Parses the raw text of a clone bundles manifest.
1855 1859
1856 1860 Returns a list of dicts. The dicts have a ``URL`` key corresponding
1857 1861 to the URL and other keys are the attributes for the entry.
1858 1862 """
1859 1863 m = []
1860 1864 for line in s.splitlines():
1861 1865 fields = line.split()
1862 1866 if not fields:
1863 1867 continue
1864 1868 attrs = {'URL': fields[0]}
1865 1869 for rawattr in fields[1:]:
1866 1870 key, value = rawattr.split('=', 1)
1867 1871 key = urlreq.unquote(key)
1868 1872 value = urlreq.unquote(value)
1869 1873 attrs[key] = value
1870 1874
1871 1875 # Parse BUNDLESPEC into components. This makes client-side
1872 1876 # preferences easier to specify since you can prefer a single
1873 1877 # component of the BUNDLESPEC.
1874 1878 if key == 'BUNDLESPEC':
1875 1879 try:
1876 1880 comp, version, params = parsebundlespec(repo, value,
1877 1881 externalnames=True)
1878 1882 attrs['COMPRESSION'] = comp
1879 1883 attrs['VERSION'] = version
1880 1884 except error.InvalidBundleSpecification:
1881 1885 pass
1882 1886 except error.UnsupportedBundleSpecification:
1883 1887 pass
1884 1888
1885 1889 m.append(attrs)
1886 1890
1887 1891 return m
1888 1892
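# An editorial sketch: feeding a small manifest through the parser above.
# URLs and attribute values are illustrative only.
def _exampleparsemanifest(repo):
    raw = ('https://example.com/full.zstd.hg BUNDLESPEC=zstd-v2\n'
           'https://example.com/full.gzip.hg BUNDLESPEC=gzip-v1\n')
    entries = parseclonebundlesmanifest(repo, raw)
    # entries[0] -> {'URL': 'https://example.com/full.zstd.hg',
    #                'BUNDLESPEC': 'zstd-v2',
    #                'COMPRESSION': 'zstd', 'VERSION': 'v2'}
    return entries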
1889 1893 def filterclonebundleentries(repo, entries):
1890 1894 """Remove incompatible clone bundle manifest entries.
1891 1895
1892 1896 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
1893 1897 and returns a new list consisting of only the entries that this client
1894 1898 should be able to apply.
1895 1899
1896 1900 There is no guarantee we'll be able to apply all returned entries because
1897 1901 the metadata we use to filter on may be missing or wrong.
1898 1902 """
1899 1903 newentries = []
1900 1904 for entry in entries:
1901 1905 spec = entry.get('BUNDLESPEC')
1902 1906 if spec:
1903 1907 try:
1904 1908 parsebundlespec(repo, spec, strict=True)
1905 1909 except error.InvalidBundleSpecification as e:
1906 1910 repo.ui.debug(str(e) + '\n')
1907 1911 continue
1908 1912 except error.UnsupportedBundleSpecification as e:
1909 1913 repo.ui.debug('filtering %s because unsupported bundle '
1910 1914 'spec: %s\n' % (entry['URL'], str(e)))
1911 1915 continue
1912 1916
1913 1917 if 'REQUIRESNI' in entry and not sslutil.hassni:
1914 1918 repo.ui.debug('filtering %s because SNI not supported\n' %
1915 1919 entry['URL'])
1916 1920 continue
1917 1921
1918 1922 newentries.append(entry)
1919 1923
1920 1924 return newentries
1921 1925
1922 1926 class clonebundleentry(object):
1923 1927 """Represents an item in a clone bundles manifest.
1924 1928
1925 1929 This rich class is needed to support sorting since sorted() in Python 3
1926 1930 doesn't support ``cmp`` and our comparison is complex enough that ``key=``
1927 1931 won't work.
1928 1932 """
1929 1933
1930 1934 def __init__(self, value, prefers):
1931 1935 self.value = value
1932 1936 self.prefers = prefers
1933 1937
1934 1938 def _cmp(self, other):
1935 1939 for prefkey, prefvalue in self.prefers:
1936 1940 avalue = self.value.get(prefkey)
1937 1941 bvalue = other.value.get(prefkey)
1938 1942
1939 1943 # Special case: b is missing the attribute and a matches exactly.
1940 1944 if avalue is not None and bvalue is None and avalue == prefvalue:
1941 1945 return -1
1942 1946
1943 1947 # Special case: a is missing the attribute and b matches exactly.
1944 1948 if bvalue is not None and avalue is None and bvalue == prefvalue:
1945 1949 return 1
1946 1950
1947 1951 # We can't compare unless attribute present on both.
1948 1952 if avalue is None or bvalue is None:
1949 1953 continue
1950 1954
1951 1955 # Same values should fall back to next attribute.
1952 1956 if avalue == bvalue:
1953 1957 continue
1954 1958
1955 1959 # Exact matches come first.
1956 1960 if avalue == prefvalue:
1957 1961 return -1
1958 1962 if bvalue == prefvalue:
1959 1963 return 1
1960 1964
1961 1965 # Fall back to next attribute.
1962 1966 continue
1963 1967
1964 1968 # If we got here we couldn't sort by attributes and prefers. Fall
1965 1969 # back to index order.
1966 1970 return 0
1967 1971
1968 1972 def __lt__(self, other):
1969 1973 return self._cmp(other) < 0
1970 1974
1971 1975 def __gt__(self, other):
1972 1976 return self._cmp(other) > 0
1973 1977
1974 1978 def __eq__(self, other):
1975 1979 return self._cmp(other) == 0
1976 1980
1977 1981 def __le__(self, other):
1978 1982 return self._cmp(other) <= 0
1979 1983
1980 1984 def __ge__(self, other):
1981 1985 return self._cmp(other) >= 0
1982 1986
1983 1987 def __ne__(self, other):
1984 1988 return self._cmp(other) != 0
1985 1989
1986 1990 def sortclonebundleentries(ui, entries):
1987 1991 prefers = ui.configlist('ui', 'clonebundleprefers')
1988 1992 if not prefers:
1989 1993 return list(entries)
1990 1994
1991 1995 prefers = [p.split('=', 1) for p in prefers]
1992 1996
1993 1997 items = sorted(clonebundleentry(v, prefers) for v in entries)
1994 1998 return [i.value for i in items]
1995 1999
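# An editorial sketch: with a preference list such as
#   [ui]
#   clonebundleprefers = COMPRESSION=zstd, VERSION=v2
# sortclonebundleentries() floats matching entries to the front. The
# equivalent direct call would look like this (entries as parsed above).
def _examplesortbyprefs(entries):
    prefers = [['COMPRESSION', 'zstd'], ['VERSION', 'v2']]
    items = sorted(clonebundleentry(v, prefers) for v in entries)
    return [i.value for i in items]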
1996 2000 def trypullbundlefromurl(ui, repo, url):
1997 2001 """Attempt to apply a bundle from a URL."""
1998 2002 with repo.lock(), repo.transaction('bundleurl') as tr:
1999 2003 try:
2000 2004 fh = urlmod.open(ui, url)
2001 2005 cg = readbundle(ui, fh, 'stream')
2002 2006
2003 2007 if isinstance(cg, streamclone.streamcloneapplier):
2004 2008 cg.apply(repo)
2005 2009 else:
2006 2010 bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
2007 2011 return True
2008 2012 except urlerr.httperror as e:
2009 2013 ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
2010 2014 except urlerr.urlerror as e:
2011 2015 ui.warn(_('error fetching bundle: %s\n') % e.reason)
2012 2016
2013 2017 return False