##// END OF EJS Templates
bundles: turn nbchanges int into a bytestr using pycompat.bytestr...
Augie Fackler -
r34218:8e035802 default
parent child Browse files
Show More
@@ -1,1917 +1,1917 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
# urllib compatibility shims re-exported from util (py2/py3 bridging).
urlerr = util.urlerr
urlreq = util.urlreq

# Shorthand aliases for the struct entry points used throughout this module.
_pack = struct.pack
_unpack = struct.unpack

# struct formats for the bundle2 binary framing (all big-endian, per the
# module docstring above):
_fstreamparamsize = '>i'   # int32: total size of stream-level parameters
_fpartheadersize = '>i'    # int32: part header size (0 == end-of-stream)
_fparttypesize = '>B'      # uint8: length of the part type name
_fpartid = '>I'            # uint32: part id, unique within the bundle
_fpayloadsize = '>i'       # int32: payload chunk size (negative => special)
_fpartparamcount = '>BB'   # two uint8: mandatory / advisory parameter counts

# int32 phase number followed by a 20-byte node hash.
_fphasesentry = '>i20s'

# default size used when slicing payload data into chunks
preferedchunksize = 4096

# matches any character that is NOT allowed in a part type name
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
def outdebug(ui, message):
    """Emit a debug message about the output stream (bundling).

    Only active when the 'devel.bundle2.debug' config knob is set."""
    if not ui.configbool('devel', 'bundle2.debug'):
        return
    ui.debug('bundle2-output: %s\n' % message)
192 192
def indebug(ui, message):
    """Emit a debug message about the input stream (unbundling).

    Only active when the 'devel.bundle2.debug' config knob is set."""
    if not ui.configbool('devel', 'bundle2.debug'):
        return
    ui.debug('bundle2-input: %s\n' % message)
197 197
def validateparttype(parttype):
    """Raise ValueError if `parttype` contains a forbidden character.

    Allowed characters are [a-zA-Z0-9_:-]; anything else is rejected."""
    match = _parttypeforbidden.search(parttype)
    if match is not None:
        raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
# lower-cased part type -> handler function
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that register a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    # matching is case insensitive, so register the lower-cased name
    key = parttype.lower()

    def _decorator(func):
        assert key not in parthandlermapping
        func.params = frozenset(params)
        parthandlermapping[key] = func
        return func

    return _decorator
231 231
class unbundlerecords(object):
    """keep record of what happens during and unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happens in chronological order.
    """

    def __init__(self):
        # category name -> list of entries, in insertion order
        self._categories = {}
        # chronological (category, entry) pairs across all categories
        self._sequences = []
        # part id -> nested unbundlerecords holding replies to that part
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        bucket = self._categories.setdefault(category, [])
        bucket.append(entry)
        self._sequences.append((category, entry))
        if inreplyto is None:
            return
        # mirror the record into the reply collection of the referenced part
        self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        entries = self._categories.get(cat, ())
        return tuple(entries)

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
278 278
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        # hook arguments staged on the operation until a transaction exists;
        # set to None once they have been flushed onto the transaction.
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()
        if not self.hookargs:
            return transaction
        # the ones added to the transaction supercede those added
        # to the operation.
        merged = self.hookargs
        merged.update(transaction.hookargs)
        transaction.hookargs = merged
        # mark the hookargs as flushed. further attempts to add to
        # hookargs will result in an abort.
        self.hookargs = None
        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is not None:
            self.hookargs.update(hookargs)
            return
        raise error.ProgrammingError('attempted to add hookargs to '
                                     'operation after transaction started')
324 324
class TransactionUnavailable(RuntimeError):
    # Raised by transaction getters that cannot (or must not) create a
    # transaction, e.g. `_notransaction` below.
    pass
327 327
def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    # Used as the fallback transactiongetter in `processbundle`; any part
    # handler that requests a transaction will therefore abort.
    raise TransactionUnavailable()
334 334
def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    """Apply a bundle (bundle2 or legacy changegroup) within transaction `tr`.

    Returns the bundleoperation describing what happened. `source` and `url`
    are recorded as transaction hook arguments when not already set.
    """
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        # bundle2 path: stash hook metadata on the transaction, then let the
        # generic part dispatcher drive the processing.
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # legacy (bundle1) path: the unbundler is a raw changegroup.
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op
349 349
class partiterator(object):
    """Context manager yielding the parts of a bundle2 stream.

    On exit it makes sure the remaining stream content is consumed, and it
    decorates any in-flight exception with bundle2-specific metadata so
    callers can distinguish bundle2 failures from legacy-format failures.
    """

    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        # generator handed out by __enter__; None until then
        self.iterator = None
        # index of the last part yielded (for the final debug summary)
        self.count = 0

    def __enter__(self):
        # Wrap iterparts() so that self.count tracks how far we got.
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                yield p
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)
406 406
def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function process a bundle, apply effect to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    Unknown Mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this is a init function soon.
    # - exception catching
    # force loading of the stream-level parameters (cached property with the
    # side effect of reading them off the stream)
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    # partiterator guarantees the stream is consumed and annotates any
    # exception with bundle2 metadata.
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

    return op
444 444
445 445 def _processchangegroup(op, cg, tr, source, url, **kwargs):
446 446 ret = cg.apply(op.repo, tr, source, url, **kwargs)
447 447 op.records.add('changegroup', {
448 448 'return': ret,
449 449 })
450 450 return ret
451 451
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised).

    Lookup of the handler, validation of mandatory parameters, invocation of
    the handler (with optional output capture into a reply part), and final
    consumption of the part payload all happen here.
    """
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                     params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        # BUGFIX: this previously interpolated `nbmp`, so the
                        # advisory count repeated the mandatory count.
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)
523 523
524 524
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        key, sep, rest = line.partition('=')
        if sep:
            rawvals = rest.split(',')
        else:
            rawvals = ()
        decoded = urlreq.unquote(key)
        caps[decoded] = [urlreq.unquote(v) for v in rawvals]
    return caps
547 547
def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for name in sorted(caps):
        values = [urlreq.quote(v) for v in caps[name]]
        encoded = urlreq.quote(name)
        if values:
            encoded = "%s=%s" % (encoded, ','.join(values))
        chunks.append(encoded)
    return '\n'.join(chunks)
559 559
# Maps an on-disk/wire bundle type name to a (header, compression) pair.
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
572 572
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameter. and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    # magic bytes emitted first on the stream
    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        # stream-level (name, value) parameter pairs, in insertion order
        self._params = []
        # bundlepart objects, in emission order; index == part.id
        self._parts = []
        self.capabilities = dict(capabilities)
        # payload compression engine; 'UN' (no compression) until
        # setcompression() is called
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        # compression may only be configured once
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to defines the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contains the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the containers

        As the part is directly added to the containers. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually create and add if you need better
        control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        """Generate the full binary stream: magic, params, compressed parts."""
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # only the part payload is compressed; magic and params stay plain
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return a encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunk for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        # a zero-sized part header is the end-of-stream marker
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
687 687
688 688
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        # underlying file-like object the bundle is read from
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)
715 715
def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    # read the 4-byte magic off the stream unless the caller already did
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    # dispatch on the version half of the magic string (e.g. '20')
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
732 732
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        # decompression engine; replaced when a Compression param is seen
        self._compengine = util.compengines.forbundletype('UN')
        # None until params are read; True once a compression param is set
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """Parse a raw parameter blob, apply each parameter's effect and
        return them as an ordered name -> value mapping (value may be None)."""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                # value-less parameter
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameter starting with a lower case letter are advisory and will be
        ignored when unknown.  Those starting with an upper case letter are
        mandatory and will this function will raise a KeyError when unknown.

        Note: no option are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        have no way to know then the reply end, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        # params must not have been consumed yet (propertycache not populated)
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            # `flaginterrupt` is not visible in this chunk — presumably a
            # module-level sentinel defined further down; confirm.
            if size == flaginterrupt:
                continue
            elif size < 0:
                # NOTE(review): the '%i' placeholder is never interpolated
                # here, so the error message shows the literal '%i'.
                raise error.BundleValueError('negative chunk size: %i')
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force it's consumption so the next
            # part can be read. But then seek back to the beginning so the
            # code consuming this generator has a part that starts at 0.
            part.seek(0, 2)
            part.seek(0)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()
# Map of bundle2 container version (the digits following the 'HG' magic)
# to the unbundler class able to consume it.
formatmap = {'20': unbundle20}

# Map of stream-level parameter name to its handler function; populated by
# the ``b2streamparamhandler`` decorator below.
b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter

    The decorated function is stored in ``b2streamparamsmap`` under ``name``
    and is invoked when that stream-level parameter is seen while unbundling.
    """
    def decorator(func):
        # Guard against two handlers registering the same parameter name.
        # (Previously this asserted against ``formatmap``, which holds bundle
        # format versions, not parameter names, so the guard never fired.)
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator
891 891
@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression

    ``value`` names the compression engine used for the payload; unknown
    values abort the unbundle with a feature error. A ``None`` value denotes
    an uncompressed stream and leaves ``_compressed`` unset.
    """
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True
901 901
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Both data and parameters cannot be modified after the generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part have the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to defines the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generates the bundle2 stream
    def getchunks(self, ui):
        """Generate the binary chunks (header then payload) for this part.

        May only be called once; raising parts emit an inline 'error:abort'
        part so the consumer learns about producer-side failures."""
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    # fix: report the advisory count, not the mandatory one
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        # the case of the part type encodes the mandatory bit on the wire
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of a the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data
1106 1106
1107 1107
# Special payload-chunk size emitted in the stream to signal that an
# out-of-band (interrupt) part follows instead of regular payload data.
flaginterrupt = -1
1109 1109
class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows to transmit an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):
        # Read exactly one part from the stream and process it immediately,
        # using an interruptoperation so the handler only gets ui access.
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')
1149 1149
class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to an ui object; any attempt to reach the repository
    or open a transaction fails loudly.
    """

    def __init__(self, ui):
        self.ui = ui
        # no reply bundler: interrupt parts cannot answer back
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')
1167 1167
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        # seeking is optional: a socket-backed fp has neither seek nor tell,
        # in which case _seekfp/_tellfp degrade gracefully.
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> byte from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically compute the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data

        Chunk boundaries are recorded in ``_chunkindex`` as they are first
        read, so a later call can resume from any known chunk.'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couple again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            # a short read means the payload stream is exhausted
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            # NOTE(review): this subtracts the offset from the payload size,
            # which matches standard seek(…, 2) semantics only for
            # offset == 0 (the sole usage in this file) — confirm before
            # relying on non-zero offsets here.
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        They directly manipulate the low level stream including bundle2 level
        instruction.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    # reading from a pipe: remember it is not seekable so we
                    # stop asking.
                    self._seekable = False
                else:
                    raise
        return None
1367 1367
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
# Keys are part/feature names, values are the supported options for each.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }
1379 1379
def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = dict(capabilities)
    cgversions = sorted(changegroup.supportedincomingversions(repo))
    caps['changegroup'] = tuple(cgversions)
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        # advertise every obsmarker format we can decode
        caps['obsmarkers'] = tuple('V%i' % v for v in obsolete.formats)
    if allowpushback:
        caps['pushback'] = ()
    if repo.ui.config('server', 'concurrent-push-mode') == 'check-related':
        caps['checkheads'] = ('related',)
    return caps
1397 1397
def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    # capable() returns False when the capability is absent; an empty string
    # means "advertised with no blob", which we still attempt to decode.
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)
1405 1405
def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    versions = []
    for entry in caps.get('obsmarkers', ()):
        # capability entries look like 'V0', 'V1', ...; ignore anything else
        if entry.startswith('V'):
            versions.append(int(entry[1:]))
    return versions
1411 1411
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    """Create a bundle for ``outgoing`` and write it to ``filename``.

    ``bundletype`` selects between the legacy HG10 format (delegated to
    ``writebundle``) and a bundle2 (HG20) container whose parts are built
    from ``opts`` by ``_addpartsfromopts``.
    """
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1430 1430
def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    """Populate ``bundler`` with parts (changegroup, tags cache, obsmarkers,
    phases) selected by ``opts`` for the ``outgoing`` changesets."""
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        # '%d' (not str()) so the value is a bytestr on Python 3 as well
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))
1466 1466
def addparttagsfnodescache(repo, bundler, outgoing):
    """Attach an optional 'hgtagsfnodes' part with cached .hgtags fnodes.

    Only cached entries for the outgoing heads are included; no part is
    created when nothing is known."""
    cache = tags.hgtagsfnodescache(repo.unfiltered())

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    pairs = []
    for head in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(head, computemissing=False)
        if fnode is not None:
            pairs.append(head)
            pairs.append(fnode)

    if pairs:
        bundler.newpart('hgtagsfnodes', data=''.join(pairs))
1490 1490
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    # pick the newest format understood by both ends
    version = obsolete.commonversion(obsmarkersversion(bundler.capabilities))
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    return bundler.newpart(
        'obsmarkers',
        data=obsolete.encodemarkers(markers, True, version=version))
1506 1506
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            # '%d' (not str()) so the value is a bytestr on Python 3 as well
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1546 1546
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    changedheads = 0
    combined = 1
    for record in op.records['changegroup']:
        ret = record.get('return', 0)
        # If any changegroup result is 0, return 0
        if ret == 0:
            combined = 0
            break
        # addchangegroup encodes removed heads as values < -1 and added
        # heads as values > 1; accumulate the head-count deltas.
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        combined = 1 + changedheads
    elif changedheads < 0:
        combined = -1 + changedheads
    return combined
1567 1567
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will massive rework before being
    inflicted to any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        # advisory changeset count, used for progress reporting
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        # only an empty local repo may be upgraded to tree manifests
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()
1608 1608
# Parameters accepted by the 'remote-changegroup' part: the bundle url and
# size, plus one 'digest:<type>' entry per supported digest algorithm.
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import are given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
        parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    # wrap the download so size and digests are verified as we read
    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
1677 1677
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    """record the outcome the remote reported for an earlier changegroup part"""
    outcome = int(inpart.params['return'])
    originalpart = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': outcome}, originalpart)
1683 1683
@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that head of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    # payload is a flat run of 20-byte binary nodes
    expected = []
    node = inpart.read(20)
    while len(node) == 20:
        expected.append(node)
        node = inpart.read(20)
    assert not node
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(expected) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
1702 1702
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focus on the heads actually updated
    during the push. If other activities happen on unrelated heads, it is
    ignored.

    This allow server with high traffic to avoid push contention as long as
    unrelated parts of the graph are involved."""
    # payload is a flat run of 20-byte binary nodes
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    # a short trailing read would mean a truncated part
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    # gather the heads of every branch currently in the repo
    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    # only the heads this push claims to update must still exist
    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')
1731 1731
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    payload = inpart.read()
    for line in payload.splitlines():
        op.ui.status(_('remote: %s\n') % line)
1737 1737
@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    # only create the reply bundle once; an earlier part may already have
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)
1746 1746
class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part.

    Raised by the 'error:abort' part handler, so callers can tell a
    remote-originated abort apart from a locally raised one.
    """
1749 1749
@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    message = inpart.params['message']
    hint = inpart.params.get('hint')
    raise AbortFromPart(message, hint=hint)
1755 1755
@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    # forward only the parameters that were actually transmitted
    kwargs = dict((name, inpart.params[name])
                  for name in ('namespace', 'key', 'new', 'old', 'ret')
                  if inpart.params.get(name) is not None)
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1766 1766
@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    # both parameters are optional; only forward the ones that were sent
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        # multiple param names travel NUL-separated in a single value
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)
1779 1779
@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    # surface the race detected on the remote as a local response error
    raise error.ResponseError(_('push failed:'), inpart.params['message'])
1784 1784
@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    ns = inpart.params['namespace']
    decoded = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (ns, decoded))
1791 1791
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request

    Decodes the namespace/key/old/new parameters, performs the pushkey
    under the repo lock, records the result, and (when a reply bundle
    exists) reports the outcome back to the client. A failed mandatory
    pushkey raises PushkeyFailed.
    """
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing the
    # pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        # use pycompat.bytestr (not str) so the part id stays a bytestr on
        # Python 3, consistent with the 'in-reply-to' param added above
        raise error.PushkeyFailed(partid=pycompat.bytestr(inpart.id), **kwargs)
1821 1821
def _readphaseheads(inpart):
    """decode the (phase, node) entries of a phase-heads part payload

    Returns a list of node lists indexed by phase number."""
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    entry = inpart.read(entrysize)
    while entry:
        # a short read mid-stream means the part is truncated/corrupt
        if len(entry) < entrysize:
            raise error.Abort(_('bad phase-heads bundle part'))
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
        entry = inpart.read(entrysize)
    return headsbyphase
1834 1834
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    # applied on the unfiltered repo, inside the current transaction
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})
1841 1841
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    outcome = int(inpart.params['return'])
    origid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': outcome}, origid)
1848 1848
@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    # cached sets derived from obsolescence data are now stale
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        # report the number of accepted markers back to the client
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)
1872 1872
1873 1873
@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """record how many obsmarkers the remote reported as new"""
    newcount = int(inpart.params['new'])
    origid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': newcount}, origid)
1880 1880
@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    applied = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        # stop on exhausted or truncated payload
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        applied += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % applied)
1904 1904
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if not op.ui.configbool('push', 'pushvars.server'):
        return
    hookargs = {}
    for key, value in part.advisoryparams:
        # We want pushed variables to have USERVAR_ prepended so we know
        # they came from the --pushvar flag.
        hookargs['USERVAR_' + key.upper()] = value
    op.addhookargs(hookargs)
@@ -1,2012 +1,2013 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18 from . import (
19 19 bookmarks as bookmod,
20 20 bundle2,
21 21 changegroup,
22 22 discovery,
23 23 error,
24 24 lock as lockmod,
25 25 obsolete,
26 26 phases,
27 27 pushkey,
28 28 pycompat,
29 29 scmutil,
30 30 sslutil,
31 31 streamclone,
32 32 url as urlmod,
33 33 util,
34 34 )
35 35
# local aliases for util's urllib error/request compatibility wrappers
urlerr = util.urlerr
urlreq = util.urlreq

# Maps bundle version human names to changegroup versions.
_bundlespeccgversions = {'v1': '01',
                         'v2': '02',
                         'packed1': 's1',
                         'bundle2': '02', #legacy
                        }

# Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
_bundlespecv1compengines = {'gzip', 'bzip2', 'none'}
48 48
def parsebundlespec(repo, spec, strict=True, externalnames=False):
    """Parse a bundle string specification into parts.

    Bundle specifications denote a well-defined bundle/exchange format.
    The content of a given specification should not change over time in
    order to ensure that bundles produced by a newer version of Mercurial are
    readable from an older version.

    The string currently has the form:

       <compression>-<type>[;<parameter0>[;<parameter1>]]

    Where <compression> is one of the supported compression formats
    and <type> is (currently) a version string. A ";" can follow the type and
    all text afterwards is interpreted as URI encoded, ";" delimited key=value
    pairs.

    If ``strict`` is True (the default) <compression> is required. Otherwise,
    it is optional.

    If ``externalnames`` is False (the default), the human-centric names will
    be converted to their internal representation.

    Returns a 3-tuple of (compression, version, parameters). Compression will
    be ``None`` if not in strict mode and a compression isn't defined.

    An ``InvalidBundleSpecification`` is raised when the specification is
    not syntactically well formed.

    An ``UnsupportedBundleSpecification`` is raised when the compression or
    bundle type/version is not recognized.

    Note: this function will likely eventually return a more complex data
    structure, including bundle2 part information.
    """
    def parseparams(s):
        # split ";"-delimited, URI-encoded key=value parameters off the
        # version part of the spec; returns (version, {key: value})
        if ';' not in s:
            return s, {}

        params = {}
        version, paramstr = s.split(';', 1)

        for p in paramstr.split(';'):
            if '=' not in p:
                raise error.InvalidBundleSpecification(
                    _('invalid bundle specification: '
                      'missing "=" in parameter: %s') % p)

            key, value = p.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            params[key] = value

        return version, params


    if strict and '-' not in spec:
        raise error.InvalidBundleSpecification(
                _('invalid bundle specification; '
                  'must be prefixed with compression: %s') % spec)

    if '-' in spec:
        # fully specified form: <compression>-<version>[;params]
        compression, version = spec.split('-', 1)

        if compression not in util.compengines.supportedbundlenames:
            raise error.UnsupportedBundleSpecification(
                    _('%s compression is not supported') % compression)

        version, params = parseparams(version)

        if version not in _bundlespeccgversions:
            raise error.UnsupportedBundleSpecification(
                    _('%s is not a recognized bundle version') % version)
    else:
        # Value could be just the compression or just the version, in which
        # case some defaults are assumed (but only when not in strict mode).
        assert not strict

        spec, params = parseparams(spec)

        if spec in util.compengines.supportedbundlenames:
            compression = spec
            version = 'v1'
            # Generaldelta repos require v2.
            if 'generaldelta' in repo.requirements:
                version = 'v2'
            # Modern compression engines require v2.
            if compression not in _bundlespecv1compengines:
                version = 'v2'
        elif spec in _bundlespeccgversions:
            if spec == 'packed1':
                compression = 'none'
            else:
                compression = 'bzip2'
            version = spec
        else:
            raise error.UnsupportedBundleSpecification(
                    _('%s is not a recognized bundle specification') % spec)

    # Bundle version 1 only supports a known set of compression engines.
    if version == 'v1' and compression not in _bundlespecv1compengines:
        raise error.UnsupportedBundleSpecification(
                _('compression engine %s is not supported on v1 bundles') %
                compression)

    # The specification for packed1 can optionally declare the data formats
    # required to apply it. If we see this metadata, compare against what the
    # repo supports and error if the bundle isn't compatible.
    if version == 'packed1' and 'requirements' in params:
        requirements = set(params['requirements'].split(','))
        missingreqs = requirements - repo.supportedformats
        if missingreqs:
            raise error.UnsupportedBundleSpecification(
                    _('missing support for repository features: %s') %
                    ', '.join(sorted(missingreqs)))

    # translate human names to internal engine/changegroup identifiers
    if not externalnames:
        engine = util.compengines.forbundlename(compression)
        compression = engine.bundletype()[1]
        version = _bundlespeccgversions[version]
    return compression, version, params
170 170
def readbundle(ui, fh, fname, vfs=None):
    """return the appropriate unpacker for the bundle starting in fh"""
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
        # headerless stream: splice the peeked bytes back and treat the
        # data as an uncompressed bundle10
        if not header.startswith('HG') and header.startswith('\0'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic = header[0:2]
    version = header[2:4]

    if magic != 'HG':
        raise error.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            alg = changegroup.readexactly(fh, 2)
        return changegroup.cg1unpacker(fh, alg)
    if version.startswith('2'):
        return bundle2.getunbundler(ui, fh, magicstring=magic + version)
    if version == 'S1':
        return streamclone.streamcloneapplier(fh)
    raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
198 198
def getbundlespec(ui, fh):
    """Infer the bundlespec from a bundle file handle.

    The input file handle is seeked and the original seek position is not
    restored.
    """
    def speccompression(alg):
        # map an internal bundle compression type to its bundlespec name
        try:
            return util.compengines.forbundletype(alg).bundletype()[0]
        except KeyError:
            return None

    b = readbundle(ui, fh, None)
    if isinstance(b, changegroup.cg1unpacker):
        alg = b._type
        if alg == '_truncatedBZ':
            # NOTE(review): presumably a BZ stream whose header was already
            # consumed by readbundle — normalized back to plain BZ
            alg = 'BZ'
        comp = speccompression(alg)
        if not comp:
            raise error.Abort(_('unknown compression algorithm: %s') % alg)
        return '%s-v1' % comp
    elif isinstance(b, bundle2.unbundle20):
        if 'Compression' in b.params:
            comp = speccompression(b.params['Compression'])
            if not comp:
                raise error.Abort(_('unknown compression algorithm: %s') % comp)
        else:
            comp = 'none'

        # scan the parts for a changegroup to learn the bundle version
        version = None
        for part in b.iterparts():
            if part.type == 'changegroup':
                version = part.params['version']
                if version in ('01', '02'):
                    version = 'v2'
                else:
                    raise error.Abort(_('changegroup version %s does not have '
                                        'a known bundlespec') % version,
                                      hint=_('try upgrading your Mercurial '
                                             'client'))

        if not version:
            raise error.Abort(_('could not identify changegroup version in '
                                'bundle'))

        return '%s-%s' % (comp, version)
    elif isinstance(b, streamclone.streamcloneapplier):
        requirements = streamclone.readbundle1header(fh)[2]
        params = 'requirements=%s' % ','.join(sorted(requirements))
        return 'none-packed1;%s' % urlreq.quote(params)
    else:
        raise error.Abort(_('unknown bundle type: %s') % b)
251 251
def _computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if not common:
        # no common base supplied: everything is outgoing from the null rev
        common = [nullid]
    else:
        hasnode = cl.hasnode
        common = [n for n in common if hasnode(n)]
    if not heads:
        heads = cl.heads()
    return discovery.outgoing(repo, common, heads)
270 270
271 271 def _forcebundle1(op):
272 272 """return true if a pull/push must use bundle1
273 273
274 274 This function is used to allow testing of the older bundle version"""
275 275 ui = op.repo.ui
276 276 forcebundle1 = False
277 277 # The goal is this config is to allow developer to choose the bundle
278 278 # version used during exchanged. This is especially handy during test.
279 279 # Value is a list of bundle version to be picked from, highest version
280 280 # should be used.
281 281 #
282 282 # developer config: devel.legacy.exchange
283 283 exchange = ui.configlist('devel', 'legacy.exchange')
284 284 forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
285 285 return forcebundle1 or not op.remote.capable('bundle2')
286 286
class pushoperation(object):
    """A object that represent a single push operation

    Its purpose is to carry push related state and very common operations.

    A new pushoperation should be created at the beginning of each push and
    discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
                 bookmarks=(), pushvars=None):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # bookmark explicitly pushed
        self.bookmarks = bookmarks
        # allow push of new branch
        self.newbranch = newbranch
        # step already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the changegroup push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.cgresult = None
        # Boolean value for the bookmark push
        self.bkresult = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote topological heads before the push
        self.remoteheads = None
        # Details of the remote branch pre and post push
        #
        # mapping: {'branch': ([remoteheads],
        #                      [newheads],
        #                      [unsyncedheads],
        #                      [discardedheads])}
        # - branch: the branch name
        # - remoteheads: the list of remote heads known locally
        #                None if the branch is new
        # - newheads: the new remote heads (known locally) with outgoing pushed
        # - unsyncedheads: the list of remote heads unknown locally.
        # - discardedheads: the list of remote heads made obsolete by the push
        self.pushbranchmap = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []
        # transaction manager
        self.trmanager = None
        # map { pushkey partid -> callback handling failure}
        # used to handle exception from mandatory pushkey part failure
        self.pkfailcb = {}
        # an iterable of pushvars or None
        self.pushvars = pushvars

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # not target to push, all common are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = self.outgoing.common
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        if self.cgresult:
            return self.futureheads
        else:
            return self.fallbackheads

    # mapping of message used when pushing bookmark
    # each value is a (success message, failure message) pair
    bookmsgmap = {'update': (_("updating bookmark %s\n"),
                             _('updating bookmark %s failed!\n')),
                  'export': (_("exporting bookmark %s\n"),
                             _('exporting bookmark %s failed!\n')),
                  'delete': (_("deleting remote bookmark %s\n"),
                             _('deleting remote bookmark %s failed!\n')),
                  }
411 411
412 412
def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
         opargs=None):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote.

    Returns the pushoperation object; its ``cgresult`` attribute holds the
    changegroup push result integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    if opargs is None:
        opargs = {}
    pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
                           **pycompat.strkwargs(opargs))
    # pushing to a local repo requires it to support all of our features
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    if not pushop.remote.canpush():
        raise error.Abort(_("destination does not support push"))

    if not pushop.remote.capable('unbundle'):
        raise error.Abort(_('cannot push: destination does not support the '
                            'unbundle wire protocol command'))

    # get lock as we might write phase data
    wlock = lock = None
    try:
        # bundle2 push may receive a reply bundle touching bookmarks or other
        # things requiring the wlock. Take it now to ensure proper ordering.
        maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
        if (not _forcebundle1(pushop)) and maypushback:
            wlock = pushop.repo.wlock()
        lock = pushop.repo.lock()
        pushop.trmanager = transactionmanager(pushop.repo,
                                              'push-response',
                                              pushop.remote.url())
    except IOError as err:
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)

    with wlock or util.nullcontextmanager(), \
         lock or util.nullcontextmanager(), \
         pushop.trmanager or util.nullcontextmanager():
        pushop.repo.checkpush(pushop)
        _pushdiscovery(pushop)
        if not _forcebundle1(pushop):
            _pushbundle2(pushop)
        _pushchangeset(pushop)
        _pushsyncphase(pushop)
        _pushobsolete(pushop)
        _pushbookmark(pushop)

    return pushop
477 477
# list of steps to perform discovery before push
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """decorator for function performing discovery before push

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated function will be added in order (this
    may matter).

    You can only use this decorator for a new step, if you want to wrap a step
    from an extension, change the pushdiscovery dictionary directly."""
    def register(func):
        # each step name may only be registered once
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return register
501 501
def _pushdiscovery(pushop):
    """Run all discovery steps"""
    # steps run in registration order; extensions may have altered the mapping
    for stepname in pushdiscoveryorder:
        pushdiscoverymapping[stepname](pushop)
507 507
@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changeset that need to be pushed"""
    fci = discovery.findcommonincoming
    commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    # stash the discovery results on the operation for later steps
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
520 520
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phase that needs to be pushed

    (computed for both success and failure case for changesets push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and not pushop.outgoing.missing # no changesets to be pushed
        and publishing):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset are to be pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    # ana is a (pheads, droots) pair for the fallback heads
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote by public here.
    # XXX Beware that revset break if droots is not strictly
    # XXX root we may want to ensure it is but it is costly
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changeset we are going to push as draft
        #
        # should not be necessary for publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                                outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
569 569
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    """Compute the obsolescence markers relevant to the pushed heads."""
    repo = pushop.repo
    if not obsolete.isenabled(repo, obsolete.exchangeopt):
        return
    if not repo.obsstore:
        return
    if 'obsolete' not in pushop.remote.listkeys('namespaces'):
        return
    # very naive computation, that can be quite expensive on big repo.
    # However: evolution is currently slow on them anyway.
    nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
    pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
580 580
@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    """Compare local and remote bookmarks and decide what must be pushed.

    Fills ``pushop.outbookmarks`` with ``(name, old, new)`` tuples where
    ``old``/``new`` are hex node ids ('' meaning creation/deletion).
    Explicitly requested bookmarks that exist on neither side trigger a
    warning and set ``pushop.bkresult`` to 2.
    """
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        # restrict bookmark updates to the pushed revisions and ancestors
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    explicit = set([repo._bookmarks.expandname(bookmark)
                    for bookmark in pushop.bookmarks])

    remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
    comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)

    def safehex(x):
        # a side may lack the bookmark entirely; hex() cannot take None
        if x is None:
            return x
        return hex(x)

    def hexifycompbookmarks(bookmarks):
        for b, scid, dcid in bookmarks:
            yield b, safehex(scid), safehex(dcid)

    comp = [hexifycompbookmarks(marks) for marks in comp]
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp

    # bookmarks that advanced locally
    for b, scid, dcid in advsrc:
        if b in explicit:
            explicit.remove(b)
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
    # search added bookmark
    for b, scid, dcid in addsrc:
        if b in explicit:
            explicit.remove(b)
            pushop.outbookmarks.append((b, '', scid))
    # search for overwritten bookmark
    for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
        if b in explicit:
            explicit.remove(b)
            pushop.outbookmarks.append((b, dcid, scid))
    # search for bookmark to delete
    for b, scid, dcid in adddst:
        if b in explicit:
            explicit.remove(b)
            # treat as "deleted locally"
            pushop.outbookmarks.append((b, dcid, ''))
    # identical bookmarks shouldn't get reported
    for b, scid, dcid in same:
        if b in explicit:
            explicit.remove(b)

    if explicit:
        explicit = sorted(explicit)
        # we should probably list all of them
        ui.warn(_('bookmark %s does not exist on the local '
                  'or remote repository!\n') % explicit[0])
        pushop.bkresult = 2

    pushop.outbookmarks.sort()
645 645
def _pushcheckoutgoing(pushop):
    """Check that the outgoing changesets are safe to push.

    Aborts if any head to be pushed is obsolete or unstable (unless the
    push is forced). Returns True when there is something to push, False
    when there is nothing to do.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are pre-bound here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mspd = _("push includes phase-divergent changeset: %s!")
            mscd = _("push includes content-divergent changeset: %s!")
            mst = {"orphan": _("push includes orphan changeset: %s!"),
                   "phase-divergent": mspd,
                   "content-divergent": mscd}
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise error.Abort(mso % ctx)
                elif ctx.isunstable():
                    # TODO print more than one instability in the abort
                    # message
                    raise error.Abort(mst[ctx.instabilities()[0]] % ctx)

    discovery.checkheads(pushop)
    return True
680 680
# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}
688 688
def b2partsgenerator(stepname, idx=None):
    """Decorator registering a bundle2-part generating function.

    The decorated function is recorded in the step -> function mapping and
    the step name is inserted into the ordered step list, at position
    ``idx`` when given and appended otherwise. Registration order may
    matter.

    Only use this decorator for new steps; to wrap a step from an
    extension, modify the b2partsgenmapping dictionary directly.
    """
    def register(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is not None:
            b2partsgenorder.insert(idx, stepname)
        else:
            b2partsgenorder.append(stepname)
        return func
    return register
707 707
def _pushb2ctxcheckheads(pushop, bundler):
    """Generate race condition checking parts

    Exists as an independent function to aid extensions
    """
    # * 'force' do not check for push race,
    # * if we don't push anything, there are nothing to check.
    if not pushop.force and pushop.outgoing.missingheads:
        allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
        emptyremote = pushop.pushbranchmap is None
        if not allowunrelated or emptyremote:
            # legacy check: server verifies its heads are exactly the ones
            # we observed at discovery time
            bundler.newpart('check:heads', data=iter(pushop.remoteheads))
        else:
            # newer check: only the remote heads this push actually affects
            # need to be unchanged on the server
            affected = set()
            for branch, heads in pushop.pushbranchmap.iteritems():
                remoteheads, newheads, unsyncedheads, discardedheads = heads
                if remoteheads is not None:
                    remote = set(remoteheads)
                    affected |= set(discardedheads) & remote
                    affected |= remote - set(newheads)
            if affected:
                data = iter(sorted(affected))
                bundler.newpart('check:updated-heads', data=data)
731 731
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop)

    _pushb2ctxcheckheads(pushop, bundler)

    b2caps = bundle2.bundle2caps(pushop.remote)
    # negotiate the highest changegroup version supported by both sides
    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(
                          pushop.repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
    cgstream = changegroup.makestream(pushop.repo, pushop.outgoing, version,
                                      'push')
    cgpart = bundler.newpart('changegroup', data=cgstream)
    if cgversions:
        cgpart.addparam('version', version)
    if 'treemanifest' in pushop.repo.requirements:
        cgpart.addparam('treemanifest', '1')
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply
771 771
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2

    Adds one 'pushkey' part per remote head that must be turned public and
    returns a reply handler that reports per-node results. Failures raised
    by the server abort through ``pushop.pkfailcb``.
    """
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    # use the idiomatic membership test, matching _pushb2bookmarks
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    # (part id, node) pairs so replies and failures can be matched back
    part2node = []

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, node in part2node:
            if partid == targetid:
                raise error.Abort(_('updating %s to public failed') % node)

    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc('%d' % phases.draft))
        part.addparam('new', enc('%d' % phases.public))
        part2node.append((part.id, newremotehead))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        """warn about any phase update the server rejected or ignored"""
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
812 812
@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    """Add an obsolescence-markers part to the outgoing bundle2."""
    if 'obsmarkers' in pushop.stepsdone:
        return
    versions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(versions) is None:
        # no obsmarker format shared with the remote; leave the step to
        # the legacy pushkey-based exchange
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        bundle2.buildobsmarkerspart(bundler, sorted(pushop.outobsmarkers))
824 824
@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2

    Adds one 'pushkey' part per outgoing bookmark and returns a reply
    handler reporting per-bookmark success or failure.
    """
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    # (part id, bookmark name, action) triples for reply/failure handling
    part2book = []
    enc = pushkey.encode

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, book, action in part2book:
            if partid == targetid:
                raise error.Abort(bookmsgmap[action][1].rstrip() % book)
        # we should not be called for a part we did not generate
        assert False

    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        # empty 'old' means creation, empty 'new' means deletion
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
                    if pushop.bkresult is not None:
                        pushop.bkresult = 1
    return handlereply
876 876
@b2partsgenerator('pushvars', idx=0)
def _getbundlesendvars(pushop, bundler):
    '''send shellvars via bundle2'''
    pushvars = pushop.pushvars
    if not pushvars:
        return

    # parse KEY=VALUE strings into a mapping, rejecting malformed entries
    shellvars = {}
    for raw in pushvars:
        if '=' not in raw:
            msg = ("unable to parse variable '%s', should follow "
                   "'KEY=VALUE' or 'KEY=' format")
            raise error.Abort(msg % raw)
        name, value = raw.split('=', 1)
        shellvars[name] = value

    part = bundler.newpart('pushvars')
    for key, value in shellvars.iteritems():
        part.addparam(key, value, mandatory=False)
895 895
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    # let each registered part generator contribute parts; a generator may
    # return a callable to be run against the server's reply bundle
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push (only the replycaps part was added)
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        try:
            reply = pushop.remote.unbundle(
                stream, ['force'], pushop.remote.url())
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        try:
            trgetter = None
            if pushback:
                trgetter = pushop.trmanager.transaction
            op = bundle2.processbundle(pushop.repo, reply, trgetter)
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        except bundle2.AbortFromPart as exc:
            pushop.ui.status(_('remote: %s\n') % exc)
            if exc.hint is not None:
                pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
            raise error.Abort(_('push failed on remote'))
    except error.PushkeyFailed as exc:
        # delegate to the per-part failure callback when one is registered
        partid = int(exc.partid)
        if partid not in pushop.pkfailcb:
            raise
        pushop.pkfailcb[partid](pushop, exc)
    for rephand in replyhandlers:
        rephand(op)
944 944
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    Legacy (non-bundle2) path; stores the server's addchangegroup result in
    ``pushop.cgresult``.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return

    # Should have verified this in push().
    assert pushop.remote.capable('unbundle')

    pushop.repo.prepushoutgoinghooks(pushop)
    outgoing = pushop.outgoing
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        cg = changegroup.makechangegroup(pushop.repo, outgoing, '01', 'push',
                                         fastpath=True, bundlecaps=bundlecaps)
    else:
        cg = changegroup.makechangegroup(pushop.repo, outgoing, '01',
                                         'push', bundlecaps=bundlecaps)

    # apply changegroup to remote
    # local repo finds heads on server, finds out what
    # revs it must push. once revs transferred, if server
    # finds it has different heads (someone else won
    # commit/push race), server aborts.
    if pushop.force:
        remoteheads = ['force']
    else:
        remoteheads = pushop.remoteheads
    # ssh: return remote's addchangegroup()
    # http: return remote's addchangegroup() or 0 for error
    pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                             pushop.repo.url())
984 984
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            # advance the public boundary to the remote's public heads and
            # keep everything else at least draft
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed though bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
1040 1040
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo, pushop.trmanager.transaction(),
                               phase, nodes)
        return
    # The repo is not locked, so we must not change any phase. Just inform
    # the user that phases should have been moved, when applicable.
    phasestr = phases.phasenames[phase]
    if any(phase < pushop.repo[n].phase() for n in nodes):
        pushop.ui.status(_('cannot lock source repo, skipping '
                           'local %s phase update\n') % phasestr)
1057 1057
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote

    Legacy (non-bundle2) path: markers are escaped into pushkey payload
    chunks and sent one pushkey call per chunk.
    """
    if 'obsmarkers' in pushop.stepsdone:
        return
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        pushop.ui.debug('try to push obsolete markers to remote\n')
        rslts = []
        remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)
1076 1076
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for book, old, new in pushop.outbookmarks:
        # empty 'old' means creation, empty 'new' means deletion
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        else:
            action = 'update'
        if remote.pushkey('bookmarks', book, old, new):
            ui.status(bookmsgmap[action][0] % book)
        else:
            ui.warn(bookmsgmap[action][1] % book)
            # discovery may have set bkresult from an invalid entry
            if pushop.bkresult is not None:
                pushop.bkresult = 1
1098 1098
class pulloperation(object):
    """An object that represents a single pull operation.

    Its purpose is to carry pull-related state and very common operations.

    A new instance should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
                 remotebookmarks=None, streamclonerequested=None):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # bookmark pulled explicitly
        self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
                                  for bookmark in bookmarks]
        # do we force pull?
        self.force = force
        # whether a streaming clone was requested
        self.streamclonerequested = streamclonerequested
        # transaction manager
        self.trmanager = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = remotebookmarks
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step already done
        self.stepsdone = set()
        # Whether we attempted a clone from pre-generated bundles.
        self.clonebundleattempted = False

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    @util.propertycache
    def canusebundle2(self):
        # True unless bundle1 is forced by config or capability
        return not _forcebundle1(self)

    @util.propertycache
    def remotebundle2caps(self):
        # bundle2 capabilities advertised by the remote peer
        return bundle2.bundle2caps(self.remote)

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()
1169 1169
class transactionmanager(util.transactional):
    """Manage the life cycle of a pull/push transaction.

    The transaction is created lazily, on first use, and the appropriate
    hooks run when it is closed."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            tr = self.repo.transaction(trname)
            tr.hookargs['source'] = self.source
            tr.hookargs['url'] = self.url
            self._tr = tr
        return self._tr

    def close(self):
        """close transaction if created"""
        tr = self._tr
        if tr is not None:
            tr.close()

    def release(self):
        """release transaction if created"""
        tr = self._tr
        if tr is not None:
            tr.release()
1199 1199
def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
         streamclonerequested=None):
    """Fetch repository data from a remote.

    This is the main function used to retrieve data from a remote repository.

    ``repo`` is the local repository to clone into.
    ``remote`` is a peer instance.
    ``heads`` is an iterable of revisions we want to pull. ``None`` (the
    default) means to pull everything from the remote.
    ``bookmarks`` is an iterable of bookmarks requesting to be pulled. By
    default, all remote bookmarks are pulled.
    ``opargs`` are additional keyword arguments to pass to ``pulloperation``
    initialization.
    ``streamclonerequested`` is a boolean indicating whether a "streaming
    clone" is requested. A "streaming clone" is essentially a raw file copy
    of revlogs from the server. This only works when the local repository is
    empty. The default value of ``None`` means to respect the server
    configuration for preferring stream clones.

    Returns the ``pulloperation`` created for this pull.

    Raises ``error.Abort`` when the remote requires repository features the
    local repository does not support.
    """
    if opargs is None:
        opargs = {}
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
                           streamclonerequested=streamclonerequested, **opargs)

    peerlocal = pullop.remote.local()
    if peerlocal:
        missing = set(peerlocal.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    # acquire wlock then lock (standard Mercurial lock ordering)
    wlock = lock = None
    try:
        wlock = pullop.repo.wlock()
        lock = pullop.repo.lock()
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        streamclone.maybeperformlegacystreamclone(pullop)
        # This should ideally be in _pullbundle2(). However, it needs to run
        # before discovery to avoid extra work.
        _maybeapplyclonebundle(pullop)
        _pulldiscovery(pullop)
        if pullop.canusebundle2:
            _pullbundle2(pullop)
        # each step below is a no-op when bundle2 already handled it
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        lockmod.release(pullop.trmanager, lock, wlock)

    return pullop
1257 1257
# list of steps to perform discovery before pull, order matters
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}
1265 1265
def pulldiscovery(stepname):
    """Decorator registering a discovery step to run before pull.

    The decorated function is recorded in the step -> function mapping and
    the step name is appended to the ordered step list. Registration order
    may matter.

    Only use this decorator for new steps; to wrap a step from an
    extension, change the pulldiscoverymapping dictionary directly.
    """
    def register(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return register
1281 1281
def _pulldiscovery(pullop):
    """Run every registered pull-discovery step, in registration order."""
    for name in pulldiscoveryorder:
        pulldiscoverymapping[name](pullop)
1287 1287
@pulldiscovery('b1:bookmarks')
def _pullbookmarkbundle1(pullop):
    """fetch bookmark data in bundle1 case

    If not using bundle2, we have to fetch bookmarks before changeset
    discovery to reduce the chance and impact of race conditions."""
    if pullop.remotebookmarks is not None:
        # already provided by the caller
        return
    usebundle2 = (pullop.canusebundle2
                  and 'listkeys' in pullop.remotebundle2caps)
    if usebundle2:
        # all known bundle2 servers now support listkeys, but lets be nice
        # with new implementation.
        return
    pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
1301 1301
1302 1302
@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will handle all discovery
    at some point.

    Sets ``pullop.common``, ``pullop.fetch`` and ``pullop.rheads``.
    """
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, drop it from the unknown
        # remote heads and put it back in common.
        #
        # This is a hackish solution to catch most of the "common but
        # locally hidden" situations. We do not perform discovery on the
        # unfiltered repository because it ends up doing a pathological
        # amount of round trips for a huge amount of changesets we do not
        # care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but does not include a remote head, we'll not be able to
        # detect it,
        scommon = set(common)
        filteredrheads = []
        for n in rheads:
            if n in nm:
                # known locally: common even though possibly filtered
                if n not in scommon:
                    common.append(n)
            else:
                filteredrheads.append(n)
        if not filteredrheads:
            fetch = []
        rheads = filteredrheads
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
1340 1340
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}

    # At the moment we don't do stream clones over bundle2. If that is
    # implemented then here's where the check for that will go.
    streaming = False

    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch
    if 'listkeys' in pullop.remotebundle2caps:
        kwargs['listkeys'] = ['phases']
        if pullop.remotebookmarks is None:
            # make sure to always include bookmark data when migrating
            # `hg incoming --bundle` to using this function.
            kwargs['listkeys'].append('bookmarks')

    # If this is a full pull / clone and the server supports the clone bundles
    # feature, tell the server whether we attempted a clone bundle. The
    # presence of this flag indicates the client supports clone bundles. This
    # will enable the server to treat clients that support clone bundles
    # differently from those that don't.
    if (pullop.remote.capable('clonebundles')
        and pullop.heads is None and list(pullop.common) == [nullid]):
        kwargs['cbattempted'] = pullop.clonebundleattempted

    if streaming:
        pullop.repo.ui.status(_('streaming all changes\n'))
    elif not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    # request obsolescence markers only if both sides speak a common version
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    # let extensions add/adjust getbundle arguments before the wire call
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except bundle2.AbortFromPart as exc:
        # the remote signalled an abort inside the bundle stream
        pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
        raise error.Abort(_('pull failed on remote'), hint=exc.hint)
    except error.BundleValueError as exc:
        raise error.Abort(_('missing support for %s') % exc)

    if pullop.fetch:
        pullop.cgresult = bundle2.combinechangegroupresults(op)

    # If the bundle had a phase-heads part, then phase exchange is already done
    if op.records['phase-heads']:
        pullop.stepsdone.add('phases')

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # processing bookmark update
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value

    # bookmark data were either already there or pulled in the bundle
    if pullop.remotebookmarks is not None:
        _pullbookmarks(pullop)
1416 1416
1417 1417 def _pullbundle2extraprepare(pullop, kwargs):
1418 1418 """hook function so that extensions can extend the getbundle call"""
1419 1419 pass
1420 1420
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        # discovery found nothing to pull; report and bail out early
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    tr = pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    # pick the most capable changegroup retrieval command the remote offers
    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise error.Abort(_("partial pull cannot be done because "
                            "other repository doesn't support "
                            "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
                                   pullop.remote.url())
    pullop.cgresult = bundle2.combinechangegroupresults(bundleop)
1455 1455
def _pullphase(pullop):
    """Fetch phase data from the remote and apply it locally."""
    if 'phases' not in pullop.stepsdone:
        _pullapplyphases(pullop, pullop.remote.listkeys('phases'))
1462 1462
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and non-publishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing; all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    # use the unfiltered repo: pulled nodes may be hidden in the filtered view
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        # only open a transaction if there is actually a boundary to move
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)
1497 1497
def _pullbookmarks(pullop):
    """process the remote bookmark information to update the local one"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    # listkeys transmits bookmark nodes hex-encoded; convert back to binary
    binbookmarks = bookmod.unhexlifybookmarks(pullop.remotebookmarks)
    bookmod.updatefromremote(pullop.repo.ui, pullop.repo, binbookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)
1510 1510
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is a function that returns the pull transaction,
    creating one if necessary. We return the transaction to inform the calling
    code that a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        # 'dump0' present means the remote actually has marker data; only
        # then do we need a transaction
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
        markers = []
        # iterate 'dumpN' keys in reverse-sorted order
        for key in sorted(remoteobs, reverse=True):
            if key.startswith('dump'):
                # each value is a base85-encoded obsstore blob
                data = util.b85decode(remoteobs[key])
                version, newmarks = obsolete._readmarkers(data)
                markers += newmarks
        if markers:
            pullop.repo.obsstore.add(tr, markers)
        # new markers can change changeset visibility; drop cached sets
        pullop.repo.invalidatevolatilesets()
    return tr
1538 1538
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    # advertise bundle2 support plus our url-quoted capability blob
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    return {'HG20', 'bundle2=' + urlreq.quote(capsblob)}
1545 1545
# List of names of steps to perform for a bundle2 for getbundle, order matters.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}
1553 1553
def getbundle2partsgenerator(stepname, idx=None):
    """decorator for function generating bundle2 part for getbundle

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps, if you want to wrap a step
    from an extension, attack the getbundle2partsmapping dictionary directly."""
    def dec(func):
        # a step may only be registered once
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is not None:
            getbundle2partsorder.insert(idx, stepname)
        else:
            getbundle2partsorder.append(stepname)
        return func
    return dec
1572 1572
def bundle2requested(bundlecaps):
    """Tell whether the caps list asks for a bundle2 (HG2x) stream."""
    if bundlecaps is None:
        return False
    return any(cap.startswith('HG2') for cap in bundlecaps)
1577 1577
def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
                    **kwargs):
    """Return chunks constituting a bundle's raw data.

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed.

    Returns an iterator over raw chunks (of varying sizes).
    """
    # **kwargs arrives with str keys; normalize to bytes internally
    kwargs = pycompat.byteskwargs(kwargs)
    usebundle2 = bundle2requested(bundlecaps)
    # bundle10 case
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        # bundle10 only knows changegroups; any other argument is an error
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        outgoing = _computeoutgoing(repo, heads, common)
        return changegroup.makestream(repo, outgoing, '01', source,
                                      bundlecaps=bundlecaps)

    # bundle20 case
    b2caps = {}
    # decode the url-quoted 'bundle2=' capability blob, if advertised
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urlreq.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    # run each registered part generator in order (order matters)
    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **pycompat.strkwargs(kwargs))

    return bundler.getchunks()
1618 1618
@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cgstream = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = '01'
        cgversions = b2caps.get('changegroup')
        if cgversions:  # 3.1 and 3.2 ship with an empty value
            cgversions = [v for v in cgversions
                          if v in changegroup.supportedoutgoingversions(repo)]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = max(cgversions)
        outgoing = _computeoutgoing(repo, heads, common)
        if outgoing.missing:
            cgstream = changegroup.makestream(repo, outgoing, version, source,
                                              bundlecaps=bundlecaps)

    if cgstream:
        part = bundler.newpart('changegroup', data=cgstream)
        if cgversions:
            part.addparam('version', version)
        # Use %-formatting instead of str() so the value stays a bytestr
        # under Python 3 (bundle2 part parameters must be bytes).
        part.addparam('nbchanges', '%d' % len(outgoing.missing),
                      mandatory=False)
        if 'treemanifest' in repo.requirements:
            part.addparam('treemanifest', '1')
1646 1647
@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    # one 'listkeys' part per requested pushkey namespace
    for namespace in kwargs.get('listkeys', ()):
        keypart = bundler.newpart('listkeys')
        keypart.addparam('namespace', namespace)
        keypart.data = pushkey.encodekeys(repo.listkeys(namespace).items())
1657 1658
@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if not kwargs.get('obsmarkers', False):
        return
    if heads is None:
        heads = repo.heads()
    # markers relevant to the ancestors of the requested heads
    subset = [ctx.node() for ctx in repo.set('::%ln', heads)]
    relevant = sorted(repo.obsstore.relevantmarkers(subset))
    bundle2.buildobsmarkerspart(bundler, relevant)
1669 1670
@getbundle2partsgenerator('hgtagsfnodes')
def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
                         b2caps=None, heads=None, common=None,
                         **kwargs):
    """Transfer the .hgtags filenodes mapping.

    Only values for heads in this bundle will be transferred.

    The part data consists of pairs of 20 byte changeset node and .hgtags
    filenodes raw values.
    """
    # Send only when changesets are being exchanged AND the client
    # advertises support for this part.
    if kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps:
        outgoing = _computeoutgoing(repo, heads, common)
        bundle2.addparttagsfnodescache(repo, bundler, outgoing)
1689 1690
def _getbookmarks(repo, **kwargs):
    """Returns bookmark to node mapping.

    This function is primarily used to generate `bookmarks` bundle2 part.
    It is a separate function in order to make it easy to wrap it
    in extensions. Passing `kwargs` to the function makes it easy to
    add new parameters in extensions.
    """
    return {book: node for book, node in bookmod.listbinbookmarks(repo)}
1700 1701
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
    # acceptable: explicit force, exact head list match, or matching digest
    unchanged = (their_heads == ['force']
                 or their_heads == heads
                 or their_heads == ['hashed', heads_hash])
    if not unchanged:
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)
1714 1715
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and have
    mechanism to check that no push race occurred between the creation of the
    bundle and its application.

    If the push was raced as PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    # [wlock, lock, tr] - needs to be an array so nested functions can modify it
    lockandtr = [None, None, None]
    recordout = None
    # quick fix for output mismatch with bundle2 in 3.4
    captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
    if url.startswith('remote:http:') or url.startswith('remote:https:'):
        captureoutput = True
    try:
        # note: outside bundle1, 'heads' is expected to be empty and this
        # 'check_heads' call wil be a no-op
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if not isinstance(cg, bundle2.unbundle20):
            # legacy case: bundle1 (changegroup 01)
            txnname = "\n".join([source, util.hidepassword(url)])
            with repo.lock(), repo.transaction(txnname) as tr:
                op = bundle2.applybundle(repo, cg, tr, source, url)
                r = bundle2.combinechangegroupresults(op)
        else:
            r = None
            try:
                # lazily acquire wlock -> lock -> transaction, in that order,
                # only when a part actually needs them
                def gettransaction():
                    if not lockandtr[2]:
                        lockandtr[0] = repo.wlock()
                        lockandtr[1] = repo.lock()
                        lockandtr[2] = repo.transaction(source)
                        lockandtr[2].hookargs['source'] = source
                        lockandtr[2].hookargs['url'] = url
                        lockandtr[2].hookargs['bundle2'] = '1'
                    return lockandtr[2]

                # Do greedy locking by default until we're satisfied with lazy
                # locking.
                if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
                    gettransaction()

                op = bundle2.bundleoperation(repo, gettransaction,
                                             captureoutput=captureoutput)
                try:
                    op = bundle2.processbundle(repo, cg, op=op)
                finally:
                    # capture the reply even on failure so output parts can
                    # still be forwarded to the client
                    r = op.reply
                    if captureoutput and r is not None:
                        repo.ui.pushbuffer(error=True, subproc=True)
                        def recordout(output):
                            r.newpart('output', data=output, mandatory=False)
                if lockandtr[2] is not None:
                    lockandtr[2].close()
            except BaseException as exc:
                # flag the exception so upper layers know it happened during
                # bundle2 processing, and salvage any generated output parts
                exc.duringunbundle2 = True
                if captureoutput and r is not None:
                    parts = exc._bundle2salvagedoutput = r.salvageoutput()
                    def recordout(output):
                        part = bundle2.bundlepart('output', data=output,
                                                  mandatory=False)
                        parts.append(part)
                raise
    finally:
        # release in reverse acquisition order: tr, lock, wlock
        lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
        if recordout is not None:
            recordout(repo.ui.popbuffer())
    return r
1787 1788
def _maybeapplyclonebundle(pullop):
    """Apply a clone bundle from a remote, if possible."""

    repo = pullop.repo
    remote = pullop.remote

    # feature can be disabled client-side
    if not repo.ui.configbool('ui', 'clonebundles'):
        return

    # Only run if local repo is empty.
    if len(repo):
        return

    # a partial pull (explicit heads) cannot be served by a clone bundle
    if pullop.heads:
        return

    if not remote.capable('clonebundles'):
        return

    res = remote._call('clonebundles')

    # If we call the wire protocol command, that's good enough to record the
    # attempt.
    pullop.clonebundleattempted = True

    entries = parseclonebundlesmanifest(repo, res)
    if not entries:
        repo.ui.note(_('no clone bundles available on remote; '
                       'falling back to regular clone\n'))
        return

    entries = filterclonebundleentries(repo, entries)
    if not entries:
        # There is a thundering herd concern here. However, if a server
        # operator doesn't advertise bundles appropriate for its clients,
        # they deserve what's coming. Furthermore, from a client's
        # perspective, no automatic fallback would mean not being able to
        # clone!
        repo.ui.warn(_('no compatible clone bundles available on server; '
                       'falling back to regular clone\n'))
        repo.ui.warn(_('(you may want to report this to the server '
                       'operator)\n'))
        return

    entries = sortclonebundleentries(repo.ui, entries)

    url = entries[0]['URL']
    repo.ui.status(_('applying clone bundle from %s\n') % url)
    if trypullbundlefromurl(repo.ui, repo, url):
        repo.ui.status(_('finished applying clone bundle\n'))
    # Bundle failed.
    #
    # We abort by default to avoid the thundering herd of
    # clients flooding a server that was expecting expensive
    # clone load to be offloaded.
    elif repo.ui.configbool('ui', 'clonebundlefallback'):
        repo.ui.warn(_('falling back to normal clone\n'))
    else:
        raise error.Abort(_('error applying bundle'),
                          hint=_('if this error persists, consider contacting '
                                 'the server operator or disable clone '
                                 'bundles via '
                                 '"--config ui.clonebundles=false"'))
1851 1852
def parseclonebundlesmanifest(repo, s):
    """Parses the raw text of a clone bundles manifest.

    Returns a list of dicts. The dicts have a ``URL`` key corresponding
    to the URL and other keys are the attributes for the entry.
    """
    m = []
    # one manifest entry per line: URL followed by KEY=VALUE attributes
    for line in s.splitlines():
        fields = line.split()
        if not fields:
            continue
        attrs = {'URL': fields[0]}
        for rawattr in fields[1:]:
            key, value = rawattr.split('=', 1)
            # both sides are url-quoted in the manifest
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            attrs[key] = value

            # Parse BUNDLESPEC into components. This makes client-side
            # preferences easier to specify since you can prefer a single
            # component of the BUNDLESPEC.
            if key == 'BUNDLESPEC':
                try:
                    comp, version, params = parsebundlespec(repo, value,
                                                            externalnames=True)
                    attrs['COMPRESSION'] = comp
                    attrs['VERSION'] = version
                except error.InvalidBundleSpecification:
                    # keep the entry; filtering happens later with strict=True
                    pass
                except error.UnsupportedBundleSpecification:
                    pass

        m.append(attrs)

    return m
1887 1888
def filterclonebundleentries(repo, entries):
    """Remove incompatible clone bundle manifest entries.

    Accepts a list of entries parsed with ``parseclonebundlesmanifest``
    and returns a new list consisting of only the entries that this client
    should be able to apply.

    There is no guarantee we'll be able to apply all returned entries because
    the metadata we use to filter on may be missing or wrong.
    """
    def _usable(entry):
        # predicate: can this client plausibly apply this entry?
        spec = entry.get('BUNDLESPEC')
        if spec:
            try:
                parsebundlespec(repo, spec, strict=True)
            except error.InvalidBundleSpecification as e:
                repo.ui.debug(str(e) + '\n')
                return False
            except error.UnsupportedBundleSpecification as e:
                repo.ui.debug('filtering %s because unsupported bundle '
                              'spec: %s\n' % (entry['URL'], str(e)))
                return False

        if 'REQUIRESNI' in entry and not sslutil.hassni:
            repo.ui.debug('filtering %s because SNI not supported\n' %
                          entry['URL'])
            return False

        return True

    return [entry for entry in entries if _usable(entry)]
1920 1921
class clonebundleentry(object):
    """Represents an item in a clone bundles manifest.

    This rich class is needed to support sorting since sorted() in Python 3
    doesn't support ``cmp`` and our comparison is complex enough that ``key=``
    won't work.
    """

    def __init__(self, value, prefers):
        self.value = value
        self.prefers = prefers

    def _cmp(self, other):
        # Walk the preference list in order; the first attribute that
        # discriminates the two entries decides the ordering.
        for prefkey, prefvalue in self.prefers:
            avalue = self.value.get(prefkey)
            bvalue = other.value.get(prefkey)

            # a matches exactly while b lacks the attribute -> a sorts first
            if bvalue is None and avalue is not None and avalue == prefvalue:
                return -1

            # b matches exactly while a lacks the attribute -> b sorts first
            if avalue is None and bvalue is not None and bvalue == prefvalue:
                return 1

            # the attribute must be present on both sides to compare further
            if avalue is None or bvalue is None:
                continue

            # tie on this attribute; consult the next preference
            if avalue == bvalue:
                continue

            # an exact preference match wins
            if avalue == prefvalue:
                return -1
            if bvalue == prefvalue:
                return 1

            # neither side matched; fall back to the next attribute
            continue

        # Nothing discriminated the entries; preserve manifest (index) order.
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

    def __gt__(self, other):
        return self._cmp(other) > 0

    def __eq__(self, other):
        return self._cmp(other) == 0

    def __le__(self, other):
        return self._cmp(other) <= 0

    def __ge__(self, other):
        return self._cmp(other) >= 0

    def __ne__(self, other):
        return self._cmp(other) != 0
1984 1985
def sortclonebundleentries(ui, entries):
    """Order manifest entries by the user's ``ui.clonebundleprefers``."""
    rawprefs = ui.configlist('ui', 'clonebundleprefers')
    if not rawprefs:
        # no preferences configured: keep manifest order
        return list(entries)

    parsedprefs = [p.split('=', 1) for p in rawprefs]
    wrapped = sorted(clonebundleentry(e, parsedprefs) for e in entries)
    return [w.value for w in wrapped]
1994 1995
def trypullbundlefromurl(ui, repo, url):
    """Attempt to apply a bundle from a URL.

    Returns True on success; returns False (after warning) when the bundle
    could not be fetched over HTTP."""
    with repo.lock(), repo.transaction('bundleurl') as tr:
        try:
            fh = urlmod.open(ui, url)
            cg = readbundle(ui, fh, 'stream')

            # stream clones bypass the normal bundle-application path
            if isinstance(cg, streamclone.streamcloneapplier):
                cg.apply(repo)
            else:
                bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
            return True
        except urlerr.httperror as e:
            ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
        except urlerr.urlerror as e:
            ui.warn(_('error fetching bundle: %s\n') % e.reason)

        return False
General Comments 0
You need to be logged in to leave comments. Login now