bundle2: fix the formatting of the stream part requirements...
Boris Feld - r35831:76832637 stable
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
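
For example (an illustrative sketch, not normative), a bundle whose payload
is bzip2-compressed carries a single mandatory stream level parameter and
starts with::

    HG20\x00\x00\x00\x0eCompression=BZ

that is, the magic string, an int32 parameter block size of 14, and the
urlquoted parameter blob itself.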

Payload part
------------------------

Binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route the part to an application level handler,
  which can interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object to
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

      Part's parameters may have arbitrary content, the binary structure is::

          <mandatory-count><advisory-count><param-sizes><param-data>

      :mandatory-count: 1 byte, number of mandatory parameters

      :advisory-count: 1 byte, number of advisory parameters

      :param-sizes:

          N couples of bytes, where N is the total number of parameters. Each
          couple contains (<size-of-key>, <size-of-value>) for one parameter.

      :param-data:

          A blob of bytes from which each parameter key and value can be
          retrieved using the list of size couples stored in the previous
          field.

          Mandatory parameters come first, then the advisory ones.

          Each parameter's key MUST be unique within the part.

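      For example (an illustrative sketch), a part carrying the single
      mandatory parameter ``version=02`` would serialize its parameter
      block as::

          \x01\x00\x07\x02version02

      one mandatory parameter, zero advisory ones, a (7, 2) size couple,
      then the concatenated key and value bytes.
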
:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
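
    For example (an illustrative sketch), an 11 byte payload sent as a
    single chunk appears on the wire as::

        \x00\x00\x00\x0bhello world\x00\x00\x00\x00

    an int32 chunk size of 11, the chunk data, then the zero size chunk
    that terminates the payload.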

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import, division

import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from . import (
    bookmarks,
    changegroup,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    streamclone,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'
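
# For example (a sketch, values illustrative): framing a 14 byte stream
# parameter block announces itself as _pack(_fstreamparamsize, 14), i.e.
# the big-endian int32 '\x00\x00\x00\x0e', matching the formats above.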

preferedchunksize = 32768

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
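
    A minimal doctest-style sketch::

        >>> _makefpartparamsizes(2)
        '>BBBB'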
    """
    return '>'+('BB'*nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
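
    A minimal doctest-style sketch::

        >>> records = unbundlerecords()
        >>> records.add('changegroup', {'return': 1})
        >>> records['changegroup']
        ({'return': 1},)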
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries value that can modify part behavior
        self.modes = {}

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
        # and should not gracefully cleanup.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                  params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

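    A minimal doctest-style sketch::

        >>> decodecaps('error=abort,unsupportedcontent')
        {'error': ['abort', 'unsupportedcontent']}
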
    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""
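
    # Typical usage (a minimal sketch; assumes a Mercurial ui object and a
    # hypothetical write() callable for the outgoing stream):
    #
    #   bundler = bundle20(ui)
    #   bundler.newpart('output', data='hello')
    #   for chunk in bundler.getchunks():
    #       write(chunk)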

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None
        # If compression is being handled by a consumer of the raw
        # data (e.g. the wire protocol), unsetting this flag tells
        # consumers that the bundle is best left uncompressed.
        self.prefercompressed = True

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application level payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process a block of serialized stream level parameters"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters may be modified after the generation has
    begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows the transmission of exceptions raised on the producer side
    during part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool('devel', 'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, 'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(_('stream ended unexpectedly '
                                    '(got %d bytes, expected %d)') %
                                  (len(s), chunksize))

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                'negative payload chunk size: %s' % chunksize)

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(_('stream ended unexpectedly '
                                '(got %d bytes, expected %d)') %
                              (len(s), headersize))

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug('bundle2-input: payload chunk size: %i\n' % chunksize)

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

class seekableunbundlepart(unbundlepart):
    """A bundle2 part in a bundle that is seekable.

    Regular ``unbundlepart`` instances can only be read once. This class
    extends ``unbundlepart`` to enable bi-directional seeking within the
    part.

    Bundle2 part data consists of framed chunks. Offsets when seeking
    refer to the decoded data, not the offsets in the underlying bundle2
    stream.

    To facilitate quickly seeking within the decoded data, instances of this
    class maintain a mapping between offsets in the underlying stream and
    the decoded payload. This mapping will consume memory in proportion
    to the number of chunks within the payload (which almost certainly
    increases in proportion with the size of the part).
    """
    def __init__(self, ui, header, fp):
        # (payload, file) offsets for chunk starts.
        self._chunkindex = []

        super(seekableunbundlepart, self).__init__(ui, header, fp)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]

        for chunk in decodepayloadchunks(self.ui, self._fp):
            chunknum += 1
            pos += len(chunk)
            if chunknum == len(self._chunkindex):
                self._chunkindex.append((pos, self._tellfp()))

            yield chunk

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                # Can't use self.consume() here because it advances self._pos.
                chunk = self.read(32768)
                while chunk:
                    chunk = self.read(32768)
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            # Can't use self.consume() here because it advances self._pos.
            chunk = self.read(32768)
            while chunk:
                chunk = self.read(32768)

        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('seek failed'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'bookmarks': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'phases': ('heads',),
                'stream': ('v2',),
               }
1496 1496
1497 1497 def getrepocaps(repo, allowpushback=False, role=None):
1498 1498 """return the bundle2 capabilities for a given repo
1499 1499
1500 1500 Exists to allow extensions (like evolution) to mutate the capabilities.
1501 1501
1502 1502 The returned value is used for servers advertising their capabilities as
1503 1503 well as clients advertising their capabilities to servers as part of
1504 1504 bundle2 requests. The ``role`` argument specifies which is which.
1505 1505 """
1506 1506 if role not in ('client', 'server'):
1507 1507 raise error.ProgrammingError('role argument must be client or server')
1508 1508
1509 1509 caps = capabilities.copy()
1510 1510 caps['changegroup'] = tuple(sorted(
1511 1511 changegroup.supportedincomingversions(repo)))
1512 1512 if obsolete.isenabled(repo, obsolete.exchangeopt):
1513 1513 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1514 1514 caps['obsmarkers'] = supportedformat
1515 1515 if allowpushback:
1516 1516 caps['pushback'] = ()
1517 1517 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1518 1518 if cpmode == 'check-related':
1519 1519 caps['checkheads'] = ('related',)
1520 1520 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1521 1521 caps.pop('phases')
1522 1522
1523 1523 # Don't advertise stream clone support in server mode if not configured.
1524 1524 if role == 'server':
1525 1525 streamsupported = repo.ui.configbool('server', 'uncompressed',
1526 1526 untrusted=True)
1527 1527 featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')
1528 1528
1529 1529 if not streamsupported or not featuresupported:
1530 1530 caps.pop('stream')
1531 1531 # Else always advertise support on client, because payload support
1532 1532 # should always be advertised.
1533 1533
1534 1534 return caps
1535 1535
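# Editor's note: a minimal, self-contained sketch of how a capabilities
# dict like the one returned above can be serialized into the 'bundle2'
# capability blob that bundle2caps()/decodecaps() below consume: one
# urlquoted name per line, values urlquoted and comma-joined after '='.
# This mirrors the module's encoding but is an illustration only
# (Python 3 stdlib used for brevity).
from urllib.parse import quote

def encodecaps_sketch(caps):
    chunks = []
    for ca in sorted(caps):
        vals = ','.join(quote(v) for v in caps[ca])
        ca = quote(ca)
        chunks.append('%s=%s' % (ca, vals) if vals else ca)
    return '\n'.join(chunks)

assert encodecaps_sketch({'HG20': (), 'phases': ('heads',)}) == 'HG20\nphases=heads'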
1536 1536 def bundle2caps(remote):
1537 1537 """return the bundle capabilities of a peer as dict"""
1538 1538 raw = remote.capable('bundle2')
1539 1539 if not raw and raw != '':
1540 1540 return {}
1541 1541 capsblob = urlreq.unquote(remote.capable('bundle2'))
1542 1542 return decodecaps(capsblob)
1543 1543
1544 1544 def obsmarkersversion(caps):
1545 1545 """extract the list of supported obsmarkers versions from a bundle2caps dict
1546 1546 """
1547 1547 obscaps = caps.get('obsmarkers', ())
1548 1548 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1549 1549
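# Editor's note: a tiny illustration of the 'V%i' naming convention that
# obsmarkersversion() above decodes (plain Python values, no Mercurial
# objects needed):
caps = {'obsmarkers': ('V0', 'V1')}
obscaps = caps.get('obsmarkers', ())
assert [int(c[1:]) for c in obscaps if c.startswith('V')] == [0, 1]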
1550 1550 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1551 1551 vfs=None, compression=None, compopts=None):
1552 1552 if bundletype.startswith('HG10'):
1553 1553 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1554 1554 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1555 1555 compression=compression, compopts=compopts)
1556 1556 elif not bundletype.startswith('HG20'):
1557 1557 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1558 1558
1559 1559 caps = {}
1560 1560 if 'obsolescence' in opts:
1561 1561 caps['obsmarkers'] = ('V1',)
1562 1562 bundle = bundle20(ui, caps)
1563 1563 bundle.setcompression(compression, compopts)
1564 1564 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1565 1565 chunkiter = bundle.getchunks()
1566 1566
1567 1567 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1568 1568
1569 1569 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1570 1570 # We should eventually reconcile this logic with the one behind
1571 1571 # 'exchange.getbundle2partsgenerator'.
1572 1572 #
1573 1573 # The types of input from 'getbundle' and 'writenewbundle' are a bit
1574 1574 # different right now. So we keep them separated for now for the sake of
1575 1575 # simplicity.
1576 1576
1577 1577 # we always want a changegroup in such a bundle
1578 1578 cgversion = opts.get('cg.version')
1579 1579 if cgversion is None:
1580 1580 cgversion = changegroup.safeversion(repo)
1581 1581 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1582 1582 part = bundler.newpart('changegroup', data=cg.getchunks())
1583 1583 part.addparam('version', cg.version)
1584 1584 if 'clcount' in cg.extras:
1585 1585 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1586 1586 mandatory=False)
1587 1587 if opts.get('phases') and repo.revs('%ln and secret()',
1588 1588 outgoing.missingheads):
1589 1589 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1590 1590
1591 1591 addparttagsfnodescache(repo, bundler, outgoing)
1592 1592
1593 1593 if opts.get('obsolescence', False):
1594 1594 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1595 1595 buildobsmarkerspart(bundler, obsmarkers)
1596 1596
1597 1597 if opts.get('phases', False):
1598 1598 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1599 1599 phasedata = phases.binaryencode(headsbyphase)
1600 1600 bundler.newpart('phase-heads', data=phasedata)
1601 1601
1602 1602 def addparttagsfnodescache(repo, bundler, outgoing):
1603 1603 # we include the tags fnode cache for the bundle changeset
1604 1604 # (as an optional part)
1605 1605 cache = tags.hgtagsfnodescache(repo.unfiltered())
1606 1606 chunks = []
1607 1607
1608 1608 # .hgtags fnodes are only relevant for head changesets. While we could
1609 1609 # transfer values for all known nodes, there will likely be little to
1610 1610 # no benefit.
1611 1611 #
1612 1612 # We don't bother using a generator to produce output data because
1613 1613 # a) we only have 40 bytes per head and even esoteric numbers of heads
1614 1614 # consume little memory (1M heads is 40MB) b) we don't want to send the
1615 1615 # part if we don't have entries and knowing if we have entries requires
1616 1616 # cache lookups.
1617 1617 for node in outgoing.missingheads:
1618 1618 # Don't compute missing, as this may slow down serving.
1619 1619 fnode = cache.getfnode(node, computemissing=False)
1620 1620 if fnode is not None:
1621 1621 chunks.extend([node, fnode])
1622 1622
1623 1623 if chunks:
1624 1624 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1625 1625
1626 1626 def buildobsmarkerspart(bundler, markers):
1627 1627 """add an obsmarker part to the bundler with <markers>
1628 1628
1629 1629 No part is created if markers is empty.
1630 1630 Raises ValueError if the bundler doesn't support any known obsmarker format.
1631 1631 """
1632 1632 if not markers:
1633 1633 return None
1634 1634
1635 1635 remoteversions = obsmarkersversion(bundler.capabilities)
1636 1636 version = obsolete.commonversion(remoteversions)
1637 1637 if version is None:
1638 1638 raise ValueError('bundler does not support common obsmarker format')
1639 1639 stream = obsolete.encodemarkers(markers, True, version=version)
1640 1640 return bundler.newpart('obsmarkers', data=stream)
1641 1641
1642 1642 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1643 1643 compopts=None):
1644 1644 """Write a bundle file and return its filename.
1645 1645
1646 1646 Existing files will not be overwritten.
1647 1647 If no filename is specified, a temporary file is created.
1648 1648 bz2 compression can be turned off.
1649 1649 The bundle file will be deleted in case of errors.
1650 1650 """
1651 1651
1652 1652 if bundletype == "HG20":
1653 1653 bundle = bundle20(ui)
1654 1654 bundle.setcompression(compression, compopts)
1655 1655 part = bundle.newpart('changegroup', data=cg.getchunks())
1656 1656 part.addparam('version', cg.version)
1657 1657 if 'clcount' in cg.extras:
1658 1658 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1659 1659 mandatory=False)
1660 1660 chunkiter = bundle.getchunks()
1661 1661 else:
1662 1662 # compression argument is only for the bundle2 case
1663 1663 assert compression is None
1664 1664 if cg.version != '01':
1665 1665 raise error.Abort(_('old bundle types only support v1 '
1666 1666 'changegroups'))
1667 1667 header, comp = bundletypes[bundletype]
1668 1668 if comp not in util.compengines.supportedbundletypes:
1669 1669 raise error.Abort(_('unknown stream compression type: %s')
1670 1670 % comp)
1671 1671 compengine = util.compengines.forbundletype(comp)
1672 1672 def chunkiter():
1673 1673 yield header
1674 1674 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1675 1675 yield chunk
1676 1676 chunkiter = chunkiter()
1677 1677
1678 1678 # parse the changegroup data, otherwise we will block
1679 1679 # in case of sshrepo because we don't know the end of the stream
1680 1680 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1681 1681
1682 1682 def combinechangegroupresults(op):
1683 1683 """logic to combine 0 or more addchangegroup results into one"""
1684 1684 results = [r.get('return', 0)
1685 1685 for r in op.records['changegroup']]
1686 1686 changedheads = 0
1687 1687 result = 1
1688 1688 for ret in results:
1689 1689 # If any changegroup result is 0, return 0
1690 1690 if ret == 0:
1691 1691 result = 0
1692 1692 break
1693 1693 if ret < -1:
1694 1694 changedheads += ret + 1
1695 1695 elif ret > 1:
1696 1696 changedheads += ret - 1
1697 1697 if changedheads > 0:
1698 1698 result = 1 + changedheads
1699 1699 elif changedheads < 0:
1700 1700 result = -1 + changedheads
1701 1701 return result
1702 1702
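# Editor's note: a stand-alone sketch of the head-delta arithmetic above.
# addchangegroup() encodes "n heads added" as 1 + n and "n heads removed"
# as -1 - n, with 0 meaning error and 1 meaning unchanged; the combination
# below sums the deltas and re-encodes them the same way.
def combine_results_sketch(results):
    changedheads = 0
    for ret in results:
        if ret == 0:
            return 0  # any error wins
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1

assert combine_results_sketch([3, -2]) == 2  # +2 heads, then -1 head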
1703 1703 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1704 1704 'targetphase'))
1705 1705 def handlechangegroup(op, inpart):
1706 1706 """apply a changegroup part on the repo
1707 1707
1708 1708 This is a very early implementation that will be massively reworked
1709 1709 before being inflicted on any end user.
1710 1710 """
1711 1711 tr = op.gettransaction()
1712 1712 unpackerversion = inpart.params.get('version', '01')
1713 1713 # We should raise an appropriate exception here
1714 1714 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1715 1715 # the source and url passed here are overwritten by the ones contained
1716 1716 # in the transaction.hookargs argument. So 'bundle2' is a placeholder
1717 1717 nbchangesets = None
1718 1718 if 'nbchanges' in inpart.params:
1719 1719 nbchangesets = int(inpart.params.get('nbchanges'))
1720 1720 if ('treemanifest' in inpart.params and
1721 1721 'treemanifest' not in op.repo.requirements):
1722 1722 if len(op.repo.changelog) != 0:
1723 1723 raise error.Abort(_(
1724 1724 "bundle contains tree manifests, but local repo is "
1725 1725 "non-empty and does not use tree manifests"))
1726 1726 op.repo.requirements.add('treemanifest')
1727 1727 op.repo._applyopenerreqs()
1728 1728 op.repo._writerequirements()
1729 1729 extrakwargs = {}
1730 1730 targetphase = inpart.params.get('targetphase')
1731 1731 if targetphase is not None:
1732 1732 extrakwargs['targetphase'] = int(targetphase)
1733 1733 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1734 1734 expectedtotal=nbchangesets, **extrakwargs)
1735 1735 if op.reply is not None:
1736 1736 # This is definitely not the final form of this
1737 1737 # return. But one needs to start somewhere.
1738 1738 part = op.reply.newpart('reply:changegroup', mandatory=False)
1739 1739 part.addparam(
1740 1740 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1741 1741 part.addparam('return', '%i' % ret, mandatory=False)
1742 1742 assert not inpart.read()
1743 1743
1744 1744 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1745 1745 ['digest:%s' % k for k in util.DIGESTS.keys()])
1746 1746 @parthandler('remote-changegroup', _remotechangegroupparams)
1747 1747 def handleremotechangegroup(op, inpart):
1748 1748 """apply a bundle10 on the repo, given an url and validation information
1749 1749
1750 1750 All the information about the remote bundle to import are given as
1751 1751 parameters. The parameters include:
1752 1752 - url: the url to the bundle10.
1753 1753 - size: the bundle10 file size. It is used to validate what was
1754 1754 retrieved by the client matches the server knowledge about the bundle.
1755 1755 - digests: a space separated list of the digest types provided as
1756 1756 parameters.
1757 1757 - digest:<digest-type>: the hexadecimal representation of the digest with
1758 1758 that name. Like the size, it is used to validate what was retrieved by
1759 1759 the client matches what the server knows about the bundle.
1760 1760
1761 1761 When multiple digest types are given, all of them are checked.
1762 1762 """
1763 1763 try:
1764 1764 raw_url = inpart.params['url']
1765 1765 except KeyError:
1766 1766 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1767 1767 parsed_url = util.url(raw_url)
1768 1768 if parsed_url.scheme not in capabilities['remote-changegroup']:
1769 1769 raise error.Abort(_('remote-changegroup does not support %s urls') %
1770 1770 parsed_url.scheme)
1771 1771
1772 1772 try:
1773 1773 size = int(inpart.params['size'])
1774 1774 except ValueError:
1775 1775 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1776 1776 % 'size')
1777 1777 except KeyError:
1778 1778 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1779 1779
1780 1780 digests = {}
1781 1781 for typ in inpart.params.get('digests', '').split():
1782 1782 param = 'digest:%s' % typ
1783 1783 try:
1784 1784 value = inpart.params[param]
1785 1785 except KeyError:
1786 1786 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1787 1787 param)
1788 1788 digests[typ] = value
1789 1789
1790 1790 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1791 1791
1792 1792 tr = op.gettransaction()
1793 1793 from . import exchange
1794 1794 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1795 1795 if not isinstance(cg, changegroup.cg1unpacker):
1796 1796 raise error.Abort(_('%s: not a bundle version 1.0') %
1797 1797 util.hidepassword(raw_url))
1798 1798 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1799 1799 if op.reply is not None:
1800 1800 # This is definitely not the final form of this
1801 1801 # return. But one needs to start somewhere.
1802 1802 part = op.reply.newpart('reply:changegroup')
1803 1803 part.addparam(
1804 1804 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1805 1805 part.addparam('return', '%i' % ret, mandatory=False)
1806 1806 try:
1807 1807 real_part.validate()
1808 1808 except error.Abort as e:
1809 1809 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1810 1810 (util.hidepassword(raw_url), str(e)))
1811 1811 assert not inpart.read()
1812 1812
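# Editor's note: a stdlib-only sketch of how the digest parameters
# documented above fit together ('digests' names the types, each
# 'digest:<type>' carries a hex value checked against the payload).
# The url and payload are hypothetical.
import hashlib

payload = b'hello'
params = {
    'url': 'https://example.com/bundle.hg',  # hypothetical
    'size': '%d' % len(payload),
    'digests': 'sha1 md5',
    'digest:sha1': hashlib.sha1(payload).hexdigest(),
    'digest:md5': hashlib.md5(payload).hexdigest(),
}

assert len(payload) == int(params['size'])
for typ in params.get('digests', '').split():
    assert hashlib.new(typ, payload).hexdigest() == params['digest:%s' % typ]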
1813 1813 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1814 1814 def handlereplychangegroup(op, inpart):
1815 1815 ret = int(inpart.params['return'])
1816 1816 replyto = int(inpart.params['in-reply-to'])
1817 1817 op.records.add('changegroup', {'return': ret}, replyto)
1818 1818
1819 1819 @parthandler('check:bookmarks')
1820 1820 def handlecheckbookmarks(op, inpart):
1821 1821 """check location of bookmarks
1822 1822
1823 1823 This part is used to detect push races regarding bookmarks. It
1824 1824 contains binary encoded (bookmark, node) tuples. If the local state does
1825 1825 not match the one in the part, a PushRaced exception is raised
1826 1826 """
1827 1827 bookdata = bookmarks.binarydecode(inpart)
1828 1828
1829 1829 msgstandard = ('repository changed while pushing - please try again '
1830 1830 '(bookmark "%s" moved from %s to %s)')
1831 1831 msgmissing = ('repository changed while pushing - please try again '
1832 1832 '(bookmark "%s" is missing, expected %s)')
1833 1833 msgexist = ('repository changed while pushing - please try again '
1834 1834 '(bookmark "%s" set on %s, expected missing)')
1835 1835 for book, node in bookdata:
1836 1836 currentnode = op.repo._bookmarks.get(book)
1837 1837 if currentnode != node:
1838 1838 if node is None:
1839 1839 finalmsg = msgexist % (book, nodemod.short(currentnode))
1840 1840 elif currentnode is None:
1841 1841 finalmsg = msgmissing % (book, nodemod.short(node))
1842 1842 else:
1843 1843 finalmsg = msgstandard % (book, nodemod.short(node),
1844 1844 nodemod.short(currentnode))
1845 1845 raise error.PushRaced(finalmsg)
1846 1846
1847 1847 @parthandler('check:heads')
1848 1848 def handlecheckheads(op, inpart):
1849 1849 """check that head of the repo did not change
1850 1850
1851 1851 This is used to detect a push race when using unbundle.
1852 1852 This replaces the "heads" argument of unbundle."""
1853 1853 h = inpart.read(20)
1854 1854 heads = []
1855 1855 while len(h) == 20:
1856 1856 heads.append(h)
1857 1857 h = inpart.read(20)
1858 1858 assert not h
1859 1859 # Trigger a transaction so that we are guaranteed to have the lock now.
1860 1860 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1861 1861 op.gettransaction()
1862 1862 if sorted(heads) != sorted(op.repo.heads()):
1863 1863 raise error.PushRaced('repository changed while pushing - '
1864 1864 'please try again')
1865 1865
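# Editor's note: the check:heads payload read above is a bare
# concatenation of 20-byte binary nodes, with the end of the part as the
# only terminator. A stand-alone sketch of the same read loop over an
# in-memory stream:
import io

def readheads_sketch(fp):
    heads = []
    h = fp.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fp.read(20)
    assert not h  # payload length must be a multiple of 20
    return heads

nodes = [b'\x11' * 20, b'\x22' * 20]
assert readheads_sketch(io.BytesIO(b''.join(nodes))) == nodes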
1866 1866 @parthandler('check:updated-heads')
1867 1867 def handlecheckupdatedheads(op, inpart):
1868 1868 """check for race on the heads touched by a push
1869 1869
1870 1870 This is similar to 'check:heads' but focuses on the heads actually updated
1871 1871 during the push. If other activities happen on unrelated heads, they are
1872 1872 ignored.
1873 1873 
1874 1874 This allows servers with high traffic to avoid push contention as long as
1875 1875 only unrelated parts of the graph are involved."""
1876 1876 h = inpart.read(20)
1877 1877 heads = []
1878 1878 while len(h) == 20:
1879 1879 heads.append(h)
1880 1880 h = inpart.read(20)
1881 1881 assert not h
1882 1882 # trigger a transaction so that we are guaranteed to have the lock now.
1883 1883 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1884 1884 op.gettransaction()
1885 1885
1886 1886 currentheads = set()
1887 1887 for ls in op.repo.branchmap().itervalues():
1888 1888 currentheads.update(ls)
1889 1889
1890 1890 for h in heads:
1891 1891 if h not in currentheads:
1892 1892 raise error.PushRaced('repository changed while pushing - '
1893 1893 'please try again')
1894 1894
1895 1895 @parthandler('check:phases')
1896 1896 def handlecheckphases(op, inpart):
1897 1897 """check that phase boundaries of the repository did not change
1898 1898
1899 1899 This is used to detect a push race.
1900 1900 """
1901 1901 phasetonodes = phases.binarydecode(inpart)
1902 1902 unfi = op.repo.unfiltered()
1903 1903 cl = unfi.changelog
1904 1904 phasecache = unfi._phasecache
1905 1905 msg = ('repository changed while pushing - please try again '
1906 1906 '(%s is %s expected %s)')
1907 1907 for expectedphase, nodes in enumerate(phasetonodes):
1908 1908 for n in nodes:
1909 1909 actualphase = phasecache.phase(unfi, cl.rev(n))
1910 1910 if actualphase != expectedphase:
1911 1911 finalmsg = msg % (nodemod.short(n),
1912 1912 phases.phasenames[actualphase],
1913 1913 phases.phasenames[expectedphase])
1914 1914 raise error.PushRaced(finalmsg)
1915 1915
1916 1916 @parthandler('output')
1917 1917 def handleoutput(op, inpart):
1918 1918 """forward output captured on the server to the client"""
1919 1919 for line in inpart.read().splitlines():
1920 1920 op.ui.status(_('remote: %s\n') % line)
1921 1921
1922 1922 @parthandler('replycaps')
1923 1923 def handlereplycaps(op, inpart):
1924 1924 """Notify that a reply bundle should be created
1925 1925
1926 1926 The payload contains the capabilities information for the reply"""
1927 1927 caps = decodecaps(inpart.read())
1928 1928 if op.reply is None:
1929 1929 op.reply = bundle20(op.ui, caps)
1930 1930
1931 1931 class AbortFromPart(error.Abort):
1932 1932 """Sub-class of Abort that denotes an error from a bundle2 part."""
1933 1933
1934 1934 @parthandler('error:abort', ('message', 'hint'))
1935 1935 def handleerrorabort(op, inpart):
1936 1936 """Used to transmit abort error over the wire"""
1937 1937 raise AbortFromPart(inpart.params['message'],
1938 1938 hint=inpart.params.get('hint'))
1939 1939
1940 1940 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1941 1941 'in-reply-to'))
1942 1942 def handleerrorpushkey(op, inpart):
1943 1943 """Used to transmit failure of a mandatory pushkey over the wire"""
1944 1944 kwargs = {}
1945 1945 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1946 1946 value = inpart.params.get(name)
1947 1947 if value is not None:
1948 1948 kwargs[name] = value
1949 1949 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1950 1950
1951 1951 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1952 1952 def handleerrorunsupportedcontent(op, inpart):
1953 1953 """Used to transmit unknown content error over the wire"""
1954 1954 kwargs = {}
1955 1955 parttype = inpart.params.get('parttype')
1956 1956 if parttype is not None:
1957 1957 kwargs['parttype'] = parttype
1958 1958 params = inpart.params.get('params')
1959 1959 if params is not None:
1960 1960 kwargs['params'] = params.split('\0')
1961 1961
1962 1962 raise error.BundleUnknownFeatureError(**kwargs)
1963 1963
1964 1964 @parthandler('error:pushraced', ('message',))
1965 1965 def handleerrorpushraced(op, inpart):
1966 1966 """Used to transmit push race error over the wire"""
1967 1967 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1968 1968
1969 1969 @parthandler('listkeys', ('namespace',))
1970 1970 def handlelistkeys(op, inpart):
1971 1971 """retrieve pushkey namespace content stored in a bundle2"""
1972 1972 namespace = inpart.params['namespace']
1973 1973 r = pushkey.decodekeys(inpart.read())
1974 1974 op.records.add('listkeys', (namespace, r))
1975 1975
1976 1976 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1977 1977 def handlepushkey(op, inpart):
1978 1978 """process a pushkey request"""
1979 1979 dec = pushkey.decode
1980 1980 namespace = dec(inpart.params['namespace'])
1981 1981 key = dec(inpart.params['key'])
1982 1982 old = dec(inpart.params['old'])
1983 1983 new = dec(inpart.params['new'])
1984 1984 # Grab the transaction to ensure that we have the lock before performing the
1985 1985 # pushkey.
1986 1986 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1987 1987 op.gettransaction()
1988 1988 ret = op.repo.pushkey(namespace, key, old, new)
1989 1989 record = {'namespace': namespace,
1990 1990 'key': key,
1991 1991 'old': old,
1992 1992 'new': new}
1993 1993 op.records.add('pushkey', record)
1994 1994 if op.reply is not None:
1995 1995 rpart = op.reply.newpart('reply:pushkey')
1996 1996 rpart.addparam(
1997 1997 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1998 1998 rpart.addparam('return', '%i' % ret, mandatory=False)
1999 1999 if inpart.mandatory and not ret:
2000 2000 kwargs = {}
2001 2001 for key in ('namespace', 'key', 'new', 'old', 'ret'):
2002 2002 if key in inpart.params:
2003 2003 kwargs[key] = inpart.params[key]
2004 2004 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
2005 2005
2006 2006 @parthandler('bookmarks')
2007 2007 def handlebookmark(op, inpart):
2008 2008 """transmit bookmark information
2009 2009
2010 2010 The part contains binary encoded bookmark information.
2011 2011
2012 2012 The exact behavior of this part can be controlled by the 'bookmarks' mode
2013 2013 on the bundle operation.
2014 2014
2015 2015 When mode is 'apply' (the default) the bookmark information is applied as
2016 2016 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2017 2017 issued earlier to check for push races in such an update. This behavior is
2018 2018 suitable for pushing.
2019 2019
2020 2020 When mode is 'records', the information is recorded into the 'bookmarks'
2021 2021 records of the bundle operation. This behavior is suitable for pulling.
2022 2022 """
2023 2023 changes = bookmarks.binarydecode(inpart)
2024 2024
2025 2025 pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
2026 2026 bookmarksmode = op.modes.get('bookmarks', 'apply')
2027 2027
2028 2028 if bookmarksmode == 'apply':
2029 2029 tr = op.gettransaction()
2030 2030 bookstore = op.repo._bookmarks
2031 2031 if pushkeycompat:
2032 2032 allhooks = []
2033 2033 for book, node in changes:
2034 2034 hookargs = tr.hookargs.copy()
2035 2035 hookargs['pushkeycompat'] = '1'
2036 2036 hookargs['namespace'] = 'bookmarks'
2037 2037 hookargs['key'] = book
2038 2038 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
2039 2039 hookargs['new'] = nodemod.hex(node if node is not None else '')
2040 2040 allhooks.append(hookargs)
2041 2041
2042 2042 for hookargs in allhooks:
2043 2043 op.repo.hook('prepushkey', throw=True, **hookargs)
2044 2044
2045 2045 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2046 2046
2047 2047 if pushkeycompat:
2048 2048 def runhook():
2049 2049 for hookargs in allhooks:
2050 2050 op.repo.hook('pushkey', **hookargs)
2051 2051 op.repo._afterlock(runhook)
2052 2052
2053 2053 elif bookmarksmode == 'records':
2054 2054 for book, node in changes:
2055 2055 record = {'bookmark': book, 'node': node}
2056 2056 op.records.add('bookmarks', record)
2057 2057 else:
2058 2058 raise error.ProgrammingError('unknown bookmark mode: %s' % bookmarksmode)
2059 2059
2060 2060 @parthandler('phase-heads')
2061 2061 def handlephases(op, inpart):
2062 2062 """apply phases from bundle part to repo"""
2063 2063 headsbyphase = phases.binarydecode(inpart)
2064 2064 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2065 2065
2066 2066 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
2067 2067 def handlepushkeyreply(op, inpart):
2068 2068 """retrieve the result of a pushkey request"""
2069 2069 ret = int(inpart.params['return'])
2070 2070 partid = int(inpart.params['in-reply-to'])
2071 2071 op.records.add('pushkey', {'return': ret}, partid)
2072 2072
2073 2073 @parthandler('obsmarkers')
2074 2074 def handleobsmarker(op, inpart):
2075 2075 """add a stream of obsmarkers to the repo"""
2076 2076 tr = op.gettransaction()
2077 2077 markerdata = inpart.read()
2078 2078 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
2079 2079 op.ui.write(('obsmarker-exchange: %i bytes received\n')
2080 2080 % len(markerdata))
2081 2081 # The mergemarkers call will crash if marker creation is not enabled.
2082 2082 # we want to avoid this if the part is advisory.
2083 2083 if not inpart.mandatory and op.repo.obsstore.readonly:
2084 2084 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
2085 2085 return
2086 2086 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2087 2087 op.repo.invalidatevolatilesets()
2088 2088 if new:
2089 2089 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
2090 2090 op.records.add('obsmarkers', {'new': new})
2091 2091 if op.reply is not None:
2092 2092 rpart = op.reply.newpart('reply:obsmarkers')
2093 2093 rpart.addparam(
2094 2094 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2095 2095 rpart.addparam('new', '%i' % new, mandatory=False)
2096 2096
2097 2097
2098 2098 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
2099 2099 def handleobsmarkerreply(op, inpart):
2100 2100 """retrieve the result of a pushkey request"""
2101 2101 ret = int(inpart.params['new'])
2102 2102 partid = int(inpart.params['in-reply-to'])
2103 2103 op.records.add('obsmarkers', {'new': ret}, partid)
2104 2104
2105 2105 @parthandler('hgtagsfnodes')
2106 2106 def handlehgtagsfnodes(op, inpart):
2107 2107 """Applies .hgtags fnodes cache entries to the local repo.
2108 2108
2109 2109 Payload is pairs of 20 byte changeset nodes and filenodes.
2110 2110 """
2111 2111 # Grab the transaction so we ensure that we have the lock at this point.
2112 2112 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2113 2113 op.gettransaction()
2114 2114 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2115 2115
2116 2116 count = 0
2117 2117 while True:
2118 2118 node = inpart.read(20)
2119 2119 fnode = inpart.read(20)
2120 2120 if len(node) < 20 or len(fnode) < 20:
2121 2121 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2122 2122 break
2123 2123 cache.setfnode(node, fnode)
2124 2124 count += 1
2125 2125
2126 2126 cache.write()
2127 2127 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2128 2128
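# Editor's note: the payload consumed above is a flat sequence of
# (changeset node, .hgtags filenode) pairs, 20 bytes each and unframed.
# A stdlib-only encode/decode sketch of that layout:
import io

def encode_fnodes_sketch(entries):
    return b''.join(node + fnode for node, fnode in entries)

def decode_fnodes_sketch(fp):
    while True:
        node, fnode = fp.read(20), fp.read(20)
        if len(node) < 20 or len(fnode) < 20:
            return  # incomplete trailing data is ignored, as above
        yield node, fnode

pairs = [(b'\x01' * 20, b'\x02' * 20)]
assert list(decode_fnodes_sketch(io.BytesIO(encode_fnodes_sketch(pairs)))) == pairs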
2129 2129 @parthandler('pushvars')
2130 2130 def bundle2getvars(op, part):
2131 2131 '''unbundle a bundle2 containing shellvars on the server'''
2132 2132 # An option to disable unbundling on server-side for security reasons
2133 2133 if op.ui.configbool('push', 'pushvars.server'):
2134 2134 hookargs = {}
2135 2135 for key, value in part.advisoryparams:
2136 2136 key = key.upper()
2137 2137 # We want pushed variables to have USERVAR_ prepended so we know
2138 2138 # they came from the --pushvar flag.
2139 2139 key = "USERVAR_" + key
2140 2140 hookargs[key] = value
2141 2141 op.addhookargs(hookargs)
2142 2142
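# Editor's note: a stand-alone sketch of the hook-argument munging done
# above: each advisory (key, value) pair from --pushvar becomes an
# upper-cased, USERVAR_-prefixed hook argument.
def pushvars_to_hookargs_sketch(advisoryparams):
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

assert pushvars_to_hookargs_sketch([('debug', '1')]) == {'USERVAR_DEBUG': '1'}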
2143 2143 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
2144 2144 def handlestreamv2bundle(op, part):
2145 2145
2146 requirements = part.params['requirements'].split()
2146 requirements = urlreq.unquote(part.params['requirements']).split(',')
2147 2147 filecount = int(part.params['filecount'])
2148 2148 bytecount = int(part.params['bytecount'])
2149 2149
2150 2150 repo = op.repo
2151 2151 if len(repo):
2152 2152 msg = _('cannot apply stream clone to non-empty repository')
2153 2153 raise error.Abort(msg)
2154 2154
2155 2155 repo.ui.debug('applying stream bundle\n')
2156 2156 streamclone.applybundlev2(repo, part, filecount, bytecount,
2157 2157 requirements)
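# Editor's note: the parsing fix above is the other half of
# _formatrequirementsspec() in exchange.py: the sender sorts the
# requirements, joins them with ',' and urlquotes the result, so the
# receiver must unquote before splitting on ','. A stdlib-only round-trip
# sketch (Python 3 urllib standing in for util.urlreq):
from urllib.parse import quote, unquote

def format_requirements_sketch(requirements):
    return quote(','.join(sorted(requirements)))

spec = format_requirements_sketch({'revlogv1', 'generaldelta'})
assert unquote(spec).split(',') == ['generaldelta', 'revlogv1']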
@@ -1,2261 +1,2261
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import errno
12 12 import hashlib
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20 from . import (
21 21 bookmarks as bookmod,
22 22 bundle2,
23 23 changegroup,
24 24 discovery,
25 25 error,
26 26 lock as lockmod,
27 27 logexchange,
28 28 obsolete,
29 29 phases,
30 30 pushkey,
31 31 pycompat,
32 32 scmutil,
33 33 sslutil,
34 34 streamclone,
35 35 url as urlmod,
36 36 util,
37 37 )
38 38
39 39 urlerr = util.urlerr
40 40 urlreq = util.urlreq
41 41
42 42 # Maps bundle version human names to changegroup versions.
43 43 _bundlespeccgversions = {'v1': '01',
44 44 'v2': '02',
45 45 'packed1': 's1',
46 46 'bundle2': '02', #legacy
47 47 }
48 48
49 49 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
50 50 _bundlespecv1compengines = {'gzip', 'bzip2', 'none'}
51 51
52 52 def parsebundlespec(repo, spec, strict=True, externalnames=False):
53 53 """Parse a bundle string specification into parts.
54 54
55 55 Bundle specifications denote a well-defined bundle/exchange format.
56 56 The content of a given specification should not change over time in
57 57 order to ensure that bundles produced by a newer version of Mercurial are
58 58 readable from an older version.
59 59
60 60 The string currently has the form:
61 61
62 62 <compression>-<type>[;<parameter0>[;<parameter1>]]
63 63
64 64 Where <compression> is one of the supported compression formats
65 65 and <type> is (currently) a version string. A ";" can follow the type and
66 66 all text afterwards is interpreted as URI encoded, ";" delimited key=value
67 67 pairs.
68 68
69 69 If ``strict`` is True (the default) <compression> is required. Otherwise,
70 70 it is optional.
71 71
72 72 If ``externalnames`` is False (the default), the human-centric names will
73 73 be converted to their internal representation.
74 74
75 75 Returns a 3-tuple of (compression, version, parameters). Compression will
76 76 be ``None`` if not in strict mode and a compression isn't defined.
77 77
78 78 An ``InvalidBundleSpecification`` is raised when the specification is
79 79 not syntactically well formed.
80 80
81 81 An ``UnsupportedBundleSpecification`` is raised when the compression or
82 82 bundle type/version is not recognized.
83 83
84 84 Note: this function will likely eventually return a more complex data
85 85 structure, including bundle2 part information.
86 86 """
87 87 def parseparams(s):
88 88 if ';' not in s:
89 89 return s, {}
90 90
91 91 params = {}
92 92 version, paramstr = s.split(';', 1)
93 93
94 94 for p in paramstr.split(';'):
95 95 if '=' not in p:
96 96 raise error.InvalidBundleSpecification(
97 97 _('invalid bundle specification: '
98 98 'missing "=" in parameter: %s') % p)
99 99
100 100 key, value = p.split('=', 1)
101 101 key = urlreq.unquote(key)
102 102 value = urlreq.unquote(value)
103 103 params[key] = value
104 104
105 105 return version, params
106 106
107 107
108 108 if strict and '-' not in spec:
109 109 raise error.InvalidBundleSpecification(
110 110 _('invalid bundle specification; '
111 111 'must be prefixed with compression: %s') % spec)
112 112
113 113 if '-' in spec:
114 114 compression, version = spec.split('-', 1)
115 115
116 116 if compression not in util.compengines.supportedbundlenames:
117 117 raise error.UnsupportedBundleSpecification(
118 118 _('%s compression is not supported') % compression)
119 119
120 120 version, params = parseparams(version)
121 121
122 122 if version not in _bundlespeccgversions:
123 123 raise error.UnsupportedBundleSpecification(
124 124 _('%s is not a recognized bundle version') % version)
125 125 else:
126 126 # Value could be just the compression or just the version, in which
127 127 # case some defaults are assumed (but only when not in strict mode).
128 128 assert not strict
129 129
130 130 spec, params = parseparams(spec)
131 131
132 132 if spec in util.compengines.supportedbundlenames:
133 133 compression = spec
134 134 version = 'v1'
135 135 # Generaldelta repos require v2.
136 136 if 'generaldelta' in repo.requirements:
137 137 version = 'v2'
138 138 # Modern compression engines require v2.
139 139 if compression not in _bundlespecv1compengines:
140 140 version = 'v2'
141 141 elif spec in _bundlespeccgversions:
142 142 if spec == 'packed1':
143 143 compression = 'none'
144 144 else:
145 145 compression = 'bzip2'
146 146 version = spec
147 147 else:
148 148 raise error.UnsupportedBundleSpecification(
149 149 _('%s is not a recognized bundle specification') % spec)
150 150
151 151 # Bundle version 1 only supports a known set of compression engines.
152 152 if version == 'v1' and compression not in _bundlespecv1compengines:
153 153 raise error.UnsupportedBundleSpecification(
154 154 _('compression engine %s is not supported on v1 bundles') %
155 155 compression)
156 156
157 157 # The specification for packed1 can optionally declare the data formats
158 158 # required to apply it. If we see this metadata, compare against what the
159 159 # repo supports and error if the bundle isn't compatible.
160 160 if version == 'packed1' and 'requirements' in params:
161 161 requirements = set(params['requirements'].split(','))
162 162 missingreqs = requirements - repo.supportedformats
163 163 if missingreqs:
164 164 raise error.UnsupportedBundleSpecification(
165 165 _('missing support for repository features: %s') %
166 166 ', '.join(sorted(missingreqs)))
167 167
168 168 if not externalnames:
169 169 engine = util.compengines.forbundlename(compression)
170 170 compression = engine.bundletype()[1]
171 171 version = _bundlespeccgversions[version]
172 172 return compression, version, params
173 173
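# Editor's note: a simplified illustration of the bundlespec shape parsed
# above: <compression>-<version>[;key=value[;...]], with URI-encoded
# parameters. This skips all of the validation and defaulting logic and
# keeps only the grammar (Python 3 stdlib):
from urllib.parse import unquote

def parse_spec_sketch(spec):
    compression, rest = spec.split('-', 1)
    version, params = rest, {}
    if ';' in rest:
        version, paramstr = rest.split(';', 1)
        for p in paramstr.split(';'):
            key, value = p.split('=', 1)
            params[unquote(key)] = unquote(value)
    return compression, version, params

assert (parse_spec_sketch('none-packed1;requirements=generaldelta')
        == ('none', 'packed1', {'requirements': 'generaldelta'}))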
174 174 def readbundle(ui, fh, fname, vfs=None):
175 175 header = changegroup.readexactly(fh, 4)
176 176
177 177 alg = None
178 178 if not fname:
179 179 fname = "stream"
180 180 if not header.startswith('HG') and header.startswith('\0'):
181 181 fh = changegroup.headerlessfixup(fh, header)
182 182 header = "HG10"
183 183 alg = 'UN'
184 184 elif vfs:
185 185 fname = vfs.join(fname)
186 186
187 187 magic, version = header[0:2], header[2:4]
188 188
189 189 if magic != 'HG':
190 190 raise error.Abort(_('%s: not a Mercurial bundle') % fname)
191 191 if version == '10':
192 192 if alg is None:
193 193 alg = changegroup.readexactly(fh, 2)
194 194 return changegroup.cg1unpacker(fh, alg)
195 195 elif version.startswith('2'):
196 196 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
197 197 elif version == 'S1':
198 198 return streamclone.streamcloneapplier(fh)
199 199 else:
200 200 raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
201 201
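# Editor's note: a sketch of the 4-byte header dispatch performed above:
# 'HG10' selects a version-1 changegroup, 'HG2x' the bundle2 unbundler and
# 'HGS1' the stream-clone applier. classify_bundle_header is a
# hypothetical helper for illustration only:
def classify_bundle_header(header):
    magic, version = header[0:2], header[2:4]
    if magic != b'HG':
        raise ValueError('not a Mercurial bundle')
    if version == b'10':
        return 'changegroup-v1'
    if version.startswith(b'2'):
        return 'bundle2'
    if version == b'S1':
        return 'stream-clone'
    raise ValueError('unknown bundle version %r' % version)

assert classify_bundle_header(b'HG20') == 'bundle2'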
202 202 def _formatrequirementsspec(requirements):
203 203 return urlreq.quote(','.join(sorted(requirements)))
204 204
205 205 def _formatrequirementsparams(requirements):
206 206 requirements = _formatrequirementsspec(requirements)
207 207 params = "%s%s" % (urlreq.quote("requirements="), requirements)
208 208 return params
209 209
210 210 def getbundlespec(ui, fh):
211 211 """Infer the bundlespec from a bundle file handle.
212 212
213 213 The input file handle is seeked and the original seek position is not
214 214 restored.
215 215 """
216 216 def speccompression(alg):
217 217 try:
218 218 return util.compengines.forbundletype(alg).bundletype()[0]
219 219 except KeyError:
220 220 return None
221 221
222 222 b = readbundle(ui, fh, None)
223 223 if isinstance(b, changegroup.cg1unpacker):
224 224 alg = b._type
225 225 if alg == '_truncatedBZ':
226 226 alg = 'BZ'
227 227 comp = speccompression(alg)
228 228 if not comp:
229 229 raise error.Abort(_('unknown compression algorithm: %s') % alg)
230 230 return '%s-v1' % comp
231 231 elif isinstance(b, bundle2.unbundle20):
232 232 if 'Compression' in b.params:
233 233 comp = speccompression(b.params['Compression'])
234 234 if not comp:
235 235 raise error.Abort(_('unknown compression algorithm: %s') % comp)
236 236 else:
237 237 comp = 'none'
238 238
239 239 version = None
240 240 for part in b.iterparts():
241 241 if part.type == 'changegroup':
242 242 version = part.params['version']
243 243 if version in ('01', '02'):
244 244 version = 'v2'
245 245 else:
246 246 raise error.Abort(_('changegroup version %s does not have '
247 247 'a known bundlespec') % version,
248 248 hint=_('try upgrading your Mercurial '
249 249 'client'))
250 250
251 251 if not version:
252 252 raise error.Abort(_('could not identify changegroup version in '
253 253 'bundle'))
254 254
255 255 return '%s-%s' % (comp, version)
256 256 elif isinstance(b, streamclone.streamcloneapplier):
257 257 requirements = streamclone.readbundle1header(fh)[2]
258 258 return 'none-packed1;%s' % _formatrequirementsparams(requirements)
259 259 else:
260 260 raise error.Abort(_('unknown bundle type: %s') % b)
261 261
262 262 def _computeoutgoing(repo, heads, common):
263 263 """Computes which revs are outgoing given a set of common
264 264 and a set of heads.
265 265
266 266 This is a separate function so extensions can have access to
267 267 the logic.
268 268
269 269 Returns a discovery.outgoing object.
270 270 """
271 271 cl = repo.changelog
272 272 if common:
273 273 hasnode = cl.hasnode
274 274 common = [n for n in common if hasnode(n)]
275 275 else:
276 276 common = [nullid]
277 277 if not heads:
278 278 heads = cl.heads()
279 279 return discovery.outgoing(repo, common, heads)
280 280
281 281 def _forcebundle1(op):
282 282 """return true if a pull/push must use bundle1
283 283
284 284 This function is used to allow testing of the older bundle version"""
285 285 ui = op.repo.ui
286 286 forcebundle1 = False
287 287 # The goal of this config is to allow developers to choose the bundle
288 288 # version used during exchange. This is especially handy during tests.
289 289 # Value is a list of bundle versions to be picked from, highest version
290 290 # should be used.
291 291 #
292 292 # developer config: devel.legacy.exchange
293 293 exchange = ui.configlist('devel', 'legacy.exchange')
294 294 forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
295 295 return forcebundle1 or not op.remote.capable('bundle2')
296 296
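# Editor's note: an illustration of the developer knob consulted above.
# The value is read via ui.configlist; bundle1 is forced when 'bundle1'
# is listed and 'bundle2' is not (bundle1 is also used, regardless of the
# config, when the remote lacks the bundle2 capability):
#
#   [devel]
#   legacy.exchange = bundle1
#   # or: legacy.exchange = bundle1, bundle2   (bundle2 wins here)
exchange = ['bundle1']
assert 'bundle2' not in exchange and 'bundle1' in exchange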
297 297 class pushoperation(object):
298 298 """A object that represent a single push operation
299 299
300 300 Its purpose is to carry push related state and very common operations.
301 301
302 302 A new pushoperation should be created at the beginning of each push and
303 303 discarded afterward.
304 304 """
305 305
306 306 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
307 307 bookmarks=(), pushvars=None):
308 308 # repo we push from
309 309 self.repo = repo
310 310 self.ui = repo.ui
311 311 # repo we push to
312 312 self.remote = remote
313 313 # force option provided
314 314 self.force = force
315 315 # revs to be pushed (None is "all")
316 316 self.revs = revs
317 317 # bookmark explicitly pushed
318 318 self.bookmarks = bookmarks
319 319 # allow push of new branch
320 320 self.newbranch = newbranch
321 321 # steps already performed
322 322 # (used to check what steps have been already performed through bundle2)
323 323 self.stepsdone = set()
324 324 # Integer version of the changegroup push result
325 325 # - None means nothing to push
326 326 # - 0 means HTTP error
327 327 # - 1 means we pushed and remote head count is unchanged *or*
328 328 # we have outgoing changesets but refused to push
329 329 # - other values as described by addchangegroup()
330 330 self.cgresult = None
331 331 # Boolean value for the bookmark push
332 332 self.bkresult = None
333 333 # discover.outgoing object (contains common and outgoing data)
334 334 self.outgoing = None
335 335 # all remote topological heads before the push
336 336 self.remoteheads = None
337 337 # Details of the remote branch pre and post push
338 338 #
339 339 # mapping: {'branch': ([remoteheads],
340 340 # [newheads],
341 341 # [unsyncedheads],
342 342 # [discardedheads])}
343 343 # - branch: the branch name
344 344 # - remoteheads: the list of remote heads known locally
345 345 # None if the branch is new
346 346 # - newheads: the new remote heads (known locally) with outgoing pushed
347 347 # - unsyncedheads: the list of remote heads unknown locally.
348 348 # - discardedheads: the list of remote heads made obsolete by the push
349 349 self.pushbranchmap = None
350 350 # testable as a boolean indicating if any nodes are missing locally.
351 351 self.incoming = None
352 352 # summary of the remote phase situation
353 353 self.remotephases = None
354 354 # phase changes that must be pushed alongside the changesets
355 355 self.outdatedphases = None
356 356 # phase changes that must be pushed if the changeset push fails
357 357 self.fallbackoutdatedphases = None
358 358 # outgoing obsmarkers
359 359 self.outobsmarkers = set()
360 360 # outgoing bookmarks
361 361 self.outbookmarks = []
362 362 # transaction manager
363 363 self.trmanager = None
364 364 # map { pushkey partid -> callback handling failure}
365 365 # used to handle exception from mandatory pushkey part failure
366 366 self.pkfailcb = {}
367 367 # an iterable of pushvars or None
368 368 self.pushvars = pushvars
369 369
370 370 @util.propertycache
371 371 def futureheads(self):
372 372 """future remote heads if the changeset push succeeds"""
373 373 return self.outgoing.missingheads
374 374
375 375 @util.propertycache
376 376 def fallbackheads(self):
377 377 """future remote heads if the changeset push fails"""
378 378 if self.revs is None:
379 379 # no target to push, all common heads are relevant
380 380 return self.outgoing.commonheads
381 381 unfi = self.repo.unfiltered()
382 382 # I want cheads = heads(::missingheads and ::commonheads)
383 383 # (missingheads is revs with secret changeset filtered out)
384 384 #
385 385 # This can be expressed as:
386 386 # cheads = ( (missingheads and ::commonheads)
387 387 # + (commonheads and ::missingheads) )
388 388 #
389 389 #
390 390 # while trying to push we already computed the following:
391 391 # common = (::commonheads)
392 392 # missing = ((commonheads::missingheads) - commonheads)
393 393 #
394 394 # We can pick:
395 395 # * missingheads part of common (::commonheads)
396 396 common = self.outgoing.common
397 397 nm = self.repo.changelog.nodemap
398 398 cheads = [node for node in self.revs if nm[node] in common]
399 399 # and
400 400 # * commonheads parents on missing
401 401 revset = unfi.set('%ln and parents(roots(%ln))',
402 402 self.outgoing.commonheads,
403 403 self.outgoing.missing)
404 404 cheads.extend(c.node() for c in revset)
405 405 return cheads
406 406
407 407 @property
408 408 def commonheads(self):
409 409 """set of all common heads after changeset bundle push"""
410 410 if self.cgresult:
411 411 return self.futureheads
412 412 else:
413 413 return self.fallbackheads
414 414
415 415 # mapping of message used when pushing bookmark
416 416 bookmsgmap = {'update': (_("updating bookmark %s\n"),
417 417 _('updating bookmark %s failed!\n')),
418 418 'export': (_("exporting bookmark %s\n"),
419 419 _('exporting bookmark %s failed!\n')),
420 420 'delete': (_("deleting remote bookmark %s\n"),
421 421 _('deleting remote bookmark %s failed!\n')),
422 422 }
423 423
424 424
425 425 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
426 426 opargs=None):
427 427 '''Push outgoing changesets (limited by revs) from a local
428 428 repository to remote. Return an integer:
429 429 - None means nothing to push
430 430 - 0 means HTTP error
431 431 - 1 means we pushed and remote head count is unchanged *or*
432 432 we have outgoing changesets but refused to push
433 433 - other values as described by addchangegroup()
434 434 '''
435 435 if opargs is None:
436 436 opargs = {}
437 437 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
438 438 **pycompat.strkwargs(opargs))
439 439 if pushop.remote.local():
440 440 missing = (set(pushop.repo.requirements)
441 441 - pushop.remote.local().supported)
442 442 if missing:
443 443 msg = _("required features are not"
444 444 " supported in the destination:"
445 445 " %s") % (', '.join(sorted(missing)))
446 446 raise error.Abort(msg)
447 447
448 448 if not pushop.remote.canpush():
449 449 raise error.Abort(_("destination does not support push"))
450 450
451 451 if not pushop.remote.capable('unbundle'):
452 452 raise error.Abort(_('cannot push: destination does not support the '
453 453 'unbundle wire protocol command'))
454 454
455 455 # get lock as we might write phase data
456 456 wlock = lock = None
457 457 try:
458 458 # bundle2 push may receive a reply bundle touching bookmarks or other
459 459 # things requiring the wlock. Take it now to ensure proper ordering.
460 460 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
461 461 if (not _forcebundle1(pushop)) and maypushback:
462 462 wlock = pushop.repo.wlock()
463 463 lock = pushop.repo.lock()
464 464 pushop.trmanager = transactionmanager(pushop.repo,
465 465 'push-response',
466 466 pushop.remote.url())
467 467 except IOError as err:
468 468 if err.errno != errno.EACCES:
469 469 raise
470 470 # source repo cannot be locked.
471 471 # We do not abort the push, but just disable the local phase
472 472 # synchronisation.
473 473 msg = 'cannot lock source repository: %s\n' % err
474 474 pushop.ui.debug(msg)
475 475
476 476 with wlock or util.nullcontextmanager(), \
477 477 lock or util.nullcontextmanager(), \
478 478 pushop.trmanager or util.nullcontextmanager():
479 479 pushop.repo.checkpush(pushop)
480 480 _pushdiscovery(pushop)
481 481 if not _forcebundle1(pushop):
482 482 _pushbundle2(pushop)
483 483 _pushchangeset(pushop)
484 484 _pushsyncphase(pushop)
485 485 _pushobsolete(pushop)
486 486 _pushbookmark(pushop)
487 487
488 488 return pushop
489 489
490 490 # list of steps to perform discovery before push
491 491 pushdiscoveryorder = []
492 492
493 493 # Mapping between step name and function
494 494 #
495 495 # This exists to help extensions wrap steps if necessary
496 496 pushdiscoverymapping = {}
497 497
498 498 def pushdiscovery(stepname):
499 499 """decorator for function performing discovery before push
500 500
501 501 The function is added to the step -> function mapping and appended to the
502 502 list of steps. Beware that decorated functions will be added in order (this
503 503 may matter).
504 504
505 505 You can only use this decorator for a new step; if you want to wrap a step
506 506 from an extension, change the pushdiscoverymapping dictionary directly."""
507 507 def dec(func):
508 508 assert stepname not in pushdiscoverymapping
509 509 pushdiscoverymapping[stepname] = func
510 510 pushdiscoveryorder.append(stepname)
511 511 return func
512 512 return dec
513 513
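# Editor's note: a stand-alone sketch of the registry pattern used by the
# decorator above (hypothetical step name; the real decorator also feeds
# _pushdiscovery() below, which walks the order list):
order, mapping = [], {}

def register_sketch(stepname):
    def dec(func):
        assert stepname not in mapping
        mapping[stepname] = func
        order.append(stepname)
        return func
    return dec

@register_sketch('mydata')
def _discover_mydata(pushop):  # hypothetical discovery step
    pass

assert order == ['mydata'] and mapping['mydata'] is _discover_mydata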
514 514 def _pushdiscovery(pushop):
515 515 """Run all discovery steps"""
516 516 for stepname in pushdiscoveryorder:
517 517 step = pushdiscoverymapping[stepname]
518 518 step(pushop)
519 519
520 520 @pushdiscovery('changeset')
521 521 def _pushdiscoverychangeset(pushop):
522 522 """discover the changeset that need to be pushed"""
523 523 fci = discovery.findcommonincoming
524 524 if pushop.revs:
525 525 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force,
526 526 ancestorsof=pushop.revs)
527 527 else:
528 528 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
529 529 common, inc, remoteheads = commoninc
530 530 fco = discovery.findcommonoutgoing
531 531 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
532 532 commoninc=commoninc, force=pushop.force)
533 533 pushop.outgoing = outgoing
534 534 pushop.remoteheads = remoteheads
535 535 pushop.incoming = inc
536 536
537 537 @pushdiscovery('phase')
538 538 def _pushdiscoveryphase(pushop):
539 539 """discover the phase that needs to be pushed
540 540
541 541 (computed for both success and failure case for changesets push)"""
542 542 outgoing = pushop.outgoing
543 543 unfi = pushop.repo.unfiltered()
544 544 remotephases = pushop.remote.listkeys('phases')
545 545 if (pushop.ui.configbool('ui', '_usedassubrepo')
546 546 and remotephases # server supports phases
547 547 and not pushop.outgoing.missing # no changesets to be pushed
548 548 and remotephases.get('publishing', False)):
549 549 # When:
550 550 # - this is a subrepo push
551 551 # - and the remote supports phases
552 552 # - and no changesets are to be pushed
553 553 # - and remote is publishing
554 554 # We may be in issue 3781 case!
555 555 # We drop the possible phase synchronisation done by
556 556 # courtesy to publish changesets possibly locally draft
557 557 # on the remote.
558 558 pushop.outdatedphases = []
559 559 pushop.fallbackoutdatedphases = []
560 560 return
561 561
562 562 pushop.remotephases = phases.remotephasessummary(pushop.repo,
563 563 pushop.fallbackheads,
564 564 remotephases)
565 565 droots = pushop.remotephases.draftroots
566 566
567 567 extracond = ''
568 568 if not pushop.remotephases.publishing:
569 569 extracond = ' and public()'
570 570 revset = 'heads((%%ln::%%ln) %s)' % extracond
571 571 # Get the list of all revs draft on remote but public here.
572 572 # XXX Beware that the revset breaks if droots is not strictly
573 573 # XXX roots; we may want to ensure it is, but that is costly
574 574 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
575 575 if not outgoing.missing:
576 576 future = fallback
577 577 else:
578 578 # adds changeset we are going to push as draft
579 579 #
580 580 # should not be necessary for publishing servers, but because of an
581 581 # issue fixed in xxxxx we have to do it anyway.
582 582 fdroots = list(unfi.set('roots(%ln + %ln::)',
583 583 outgoing.missing, droots))
584 584 fdroots = [f.node() for f in fdroots]
585 585 future = list(unfi.set(revset, fdroots, pushop.futureheads))
586 586 pushop.outdatedphases = future
587 587 pushop.fallbackoutdatedphases = fallback
588 588
589 589 @pushdiscovery('obsmarker')
590 590 def _pushdiscoveryobsmarkers(pushop):
591 591 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
592 592 and pushop.repo.obsstore
593 593 and 'obsolete' in pushop.remote.listkeys('namespaces')):
594 594 repo = pushop.repo
595 595 # very naive computation, which can be quite expensive on big repos.
596 596 # However: evolution is currently slow on them anyway.
597 597 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
598 598 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
599 599
600 600 @pushdiscovery('bookmarks')
601 601 def _pushdiscoverybookmarks(pushop):
602 602 ui = pushop.ui
603 603 repo = pushop.repo.unfiltered()
604 604 remote = pushop.remote
605 605 ui.debug("checking for updated bookmarks\n")
606 606 ancestors = ()
607 607 if pushop.revs:
608 608 revnums = map(repo.changelog.rev, pushop.revs)
609 609 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
610 610 remotebookmark = remote.listkeys('bookmarks')
611 611
612 612 explicit = set([repo._bookmarks.expandname(bookmark)
613 613 for bookmark in pushop.bookmarks])
614 614
615 615 remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
616 616 comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)
617 617
618 618 def safehex(x):
619 619 if x is None:
620 620 return x
621 621 return hex(x)
622 622
623 623 def hexifycompbookmarks(bookmarks):
624 624 for b, scid, dcid in bookmarks:
625 625 yield b, safehex(scid), safehex(dcid)
626 626
627 627 comp = [hexifycompbookmarks(marks) for marks in comp]
628 628 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
629 629
630 630 for b, scid, dcid in advsrc:
631 631 if b in explicit:
632 632 explicit.remove(b)
633 633 if not ancestors or repo[scid].rev() in ancestors:
634 634 pushop.outbookmarks.append((b, dcid, scid))
635 635 # search added bookmark
636 636 for b, scid, dcid in addsrc:
637 637 if b in explicit:
638 638 explicit.remove(b)
639 639 pushop.outbookmarks.append((b, '', scid))
640 640 # search for overwritten bookmark
641 641 for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
642 642 if b in explicit:
643 643 explicit.remove(b)
644 644 pushop.outbookmarks.append((b, dcid, scid))
645 645 # search for bookmark to delete
646 646 for b, scid, dcid in adddst:
647 647 if b in explicit:
648 648 explicit.remove(b)
649 649 # treat as "deleted locally"
650 650 pushop.outbookmarks.append((b, dcid, ''))
651 651 # identical bookmarks shouldn't get reported
652 652 for b, scid, dcid in same:
653 653 if b in explicit:
654 654 explicit.remove(b)
655 655
656 656 if explicit:
657 657 explicit = sorted(explicit)
658 658 # we should probably list all of them
659 659 ui.warn(_('bookmark %s does not exist on the local '
660 660 'or remote repository!\n') % explicit[0])
661 661 pushop.bkresult = 2
662 662
663 663 pushop.outbookmarks.sort()
664 664
665 665 def _pushcheckoutgoing(pushop):
666 666 outgoing = pushop.outgoing
667 667 unfi = pushop.repo.unfiltered()
668 668 if not outgoing.missing:
669 669 # nothing to push
670 670 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
671 671 return False
672 672 # something to push
673 673 if not pushop.force:
674 674 # if repo.obsstore == False --> no obsolete
675 675 # then, save the iteration
676 676 if unfi.obsstore:
677 677 # these messages are defined here because of the 80-char line limit
678 678 mso = _("push includes obsolete changeset: %s!")
679 679 mspd = _("push includes phase-divergent changeset: %s!")
680 680 mscd = _("push includes content-divergent changeset: %s!")
681 681 mst = {"orphan": _("push includes orphan changeset: %s!"),
682 682 "phase-divergent": mspd,
683 683 "content-divergent": mscd}
684 684 # If we are to push and there is at least one
685 685 # obsolete or unstable changeset in missing, at
686 686 # least one of the missingheads will be obsolete or
687 687 # unstable. So checking heads only is ok
688 688 for node in outgoing.missingheads:
689 689 ctx = unfi[node]
690 690 if ctx.obsolete():
691 691 raise error.Abort(mso % ctx)
692 692 elif ctx.isunstable():
693 693 # TODO print more than one instability in the abort
694 694 # message
695 695 raise error.Abort(mst[ctx.instabilities()[0]] % ctx)
696 696
697 697 discovery.checkheads(pushop)
698 698 return True
699 699
700 700 # List of names of steps to perform for an outgoing bundle2, order matters.
701 701 b2partsgenorder = []
702 702
703 703 # Mapping between step name and function
704 704 #
705 705 # This exists to help extensions wrap steps if necessary
706 706 b2partsgenmapping = {}
707 707
708 708 def b2partsgenerator(stepname, idx=None):
709 709 """decorator for function generating bundle2 part
710 710
711 711 The function is added to the step -> function mapping and appended to the
712 712 list of steps. Beware that decorated functions will be added in order
713 713 (this may matter).
714 714
715 715 You can only use this decorator for new steps; if you want to wrap a step
716 716 from an extension, modify the b2partsgenmapping dictionary directly."""
717 717 def dec(func):
718 718 assert stepname not in b2partsgenmapping
719 719 b2partsgenmapping[stepname] = func
720 720 if idx is None:
721 721 b2partsgenorder.append(stepname)
722 722 else:
723 723 b2partsgenorder.insert(idx, stepname)
724 724 return func
725 725 return dec
726 726
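# Editor's note: unlike pushdiscovery(), this decorator takes an optional
# ``idx`` so a part generator can be spliced at a fixed position in
# b2partsgenorder. A stand-alone sketch of that ordering behaviour
# (hypothetical step names):
order = ['changeset', 'phase']

def insert_step_sketch(stepname, idx=None):
    if idx is None:
        order.append(stepname)
    else:
        order.insert(idx, stepname)

insert_step_sketch('check-phases', idx=0)
assert order == ['check-phases', 'changeset', 'phase']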
727 727 def _pushb2ctxcheckheads(pushop, bundler):
728 728 """Generate race condition checking parts
729 729
730 730 Exists as an independent function to aid extensions
731 731 """
732 732 # * 'force' does not check for push races,
733 733 # * if we don't push anything, there is nothing to check.
734 734 if not pushop.force and pushop.outgoing.missingheads:
735 735 allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
736 736 emptyremote = pushop.pushbranchmap is None
737 737 if not allowunrelated or emptyremote:
738 738 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
739 739 else:
740 740 affected = set()
741 741 for branch, heads in pushop.pushbranchmap.iteritems():
742 742 remoteheads, newheads, unsyncedheads, discardedheads = heads
743 743 if remoteheads is not None:
744 744 remote = set(remoteheads)
745 745 affected |= set(discardedheads) & remote
746 746 affected |= remote - set(newheads)
747 747 if affected:
748 748 data = iter(sorted(affected))
749 749 bundler.newpart('check:updated-heads', data=data)
750 750
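A toy run of the 'check:updated-heads' selection above (short strings stand in for 20-byte node ids)::

    # a branchmap entry: (remoteheads, newheads, unsyncedheads, discardedheads)
    remoteheads = {'a', 'b'}
    newheads = {'b', 'c'}
    discardedheads = {'a'}
    affected = (discardedheads & remoteheads) | (remoteheads - newheads)
    assert affected == {'a'}  # only heads the push actually touches get checked
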
751 751 def _pushing(pushop):
752 752 """return True if we are pushing anything"""
753 753 return bool(pushop.outgoing.missing
754 754 or pushop.outdatedphases
755 755 or pushop.outobsmarkers
756 756 or pushop.outbookmarks)
757 757
758 758 @b2partsgenerator('check-bookmarks')
759 759 def _pushb2checkbookmarks(pushop, bundler):
760 760 """insert bookmark move checking"""
761 761 if not _pushing(pushop) or pushop.force:
762 762 return
763 763 b2caps = bundle2.bundle2caps(pushop.remote)
764 764 hasbookmarkcheck = 'bookmarks' in b2caps
765 765 if not (pushop.outbookmarks and hasbookmarkcheck):
766 766 return
767 767 data = []
768 768 for book, old, new in pushop.outbookmarks:
769 769 old = bin(old)
770 770 data.append((book, old))
771 771 checkdata = bookmod.binaryencode(data)
772 772 bundler.newpart('check:bookmarks', data=checkdata)
773 773
774 774 @b2partsgenerator('check-phases')
775 775 def _pushb2checkphases(pushop, bundler):
776 776 """insert phase move checking"""
777 777 if not _pushing(pushop) or pushop.force:
778 778 return
779 779 b2caps = bundle2.bundle2caps(pushop.remote)
780 780 hasphaseheads = 'heads' in b2caps.get('phases', ())
781 781 if pushop.remotephases is not None and hasphaseheads:
782 782 # check that the remote phase has not changed
783 783 checks = [[] for p in phases.allphases]
784 784 checks[phases.public].extend(pushop.remotephases.publicheads)
785 785 checks[phases.draft].extend(pushop.remotephases.draftroots)
786 786 if any(checks):
787 787 for nodes in checks:
788 788 nodes.sort()
789 789 checkdata = phases.binaryencode(checks)
790 790 bundler.newpart('check:phases', data=checkdata)
791 791
792 792 @b2partsgenerator('changeset')
793 793 def _pushb2ctx(pushop, bundler):
794 794 """handle changegroup push through bundle2
795 795
796 796 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
797 797 """
798 798 if 'changesets' in pushop.stepsdone:
799 799 return
800 800 pushop.stepsdone.add('changesets')
801 801 # Send known heads to the server for race detection.
802 802 if not _pushcheckoutgoing(pushop):
803 803 return
804 804 pushop.repo.prepushoutgoinghooks(pushop)
805 805
806 806 _pushb2ctxcheckheads(pushop, bundler)
807 807
808 808 b2caps = bundle2.bundle2caps(pushop.remote)
809 809 version = '01'
810 810 cgversions = b2caps.get('changegroup')
811 811 if cgversions: # 3.1 and 3.2 ship with an empty value
812 812 cgversions = [v for v in cgversions
813 813 if v in changegroup.supportedoutgoingversions(
814 814 pushop.repo)]
815 815 if not cgversions:
816 816 raise ValueError(_('no common changegroup version'))
817 817 version = max(cgversions)
818 818 cgstream = changegroup.makestream(pushop.repo, pushop.outgoing, version,
819 819 'push')
820 820 cgpart = bundler.newpart('changegroup', data=cgstream)
821 821 if cgversions:
822 822 cgpart.addparam('version', version)
823 823 if 'treemanifest' in pushop.repo.requirements:
824 824 cgpart.addparam('treemanifest', '1')
825 825 def handlereply(op):
826 826 """extract addchangegroup returns from server reply"""
827 827 cgreplies = op.records.getreplies(cgpart.id)
828 828 assert len(cgreplies['changegroup']) == 1
829 829 pushop.cgresult = cgreplies['changegroup'][0]['return']
830 830 return handlereply
831 831
832 832 @b2partsgenerator('phase')
833 833 def _pushb2phases(pushop, bundler):
834 834 """handle phase push through bundle2"""
835 835 if 'phases' in pushop.stepsdone:
836 836 return
837 837 b2caps = bundle2.bundle2caps(pushop.remote)
838 838 ui = pushop.repo.ui
839 839
840 840 legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
841 841 haspushkey = 'pushkey' in b2caps
842 842 hasphaseheads = 'heads' in b2caps.get('phases', ())
843 843
844 844 if hasphaseheads and not legacyphase:
845 845 return _pushb2phaseheads(pushop, bundler)
846 846 elif haspushkey:
847 847 return _pushb2phasespushkey(pushop, bundler)
848 848
849 849 def _pushb2phaseheads(pushop, bundler):
850 850 """push phase information through a bundle2 - binary part"""
851 851 pushop.stepsdone.add('phases')
852 852 if pushop.outdatedphases:
853 853 updates = [[] for p in phases.allphases]
854 854 updates[0].extend(h.node() for h in pushop.outdatedphases)
855 855 phasedata = phases.binaryencode(updates)
856 856 bundler.newpart('phase-heads', data=phasedata)
857 857
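The 'phase-heads' payload built by phases.binaryencode is a flat sequence of fixed-size records. A hedged decoding sketch, assuming each record is a big-endian int32 phase followed by a 20-byte node (verify against phases.py before relying on it)::

    import struct

    _entry = struct.Struct('>i20s')  # assumed record layout: phase, node

    def decodephaseheads(data):
        """yield (phase, node) pairs from a phase-heads payload (sketch)"""
        for offset in range(0, len(data), _entry.size):
            yield _entry.unpack_from(data, offset)
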
858 858 def _pushb2phasespushkey(pushop, bundler):
859 859 """push phase information through a bundle2 - pushkey part"""
860 860 pushop.stepsdone.add('phases')
861 861 part2node = []
862 862
863 863 def handlefailure(pushop, exc):
864 864 targetid = int(exc.partid)
865 865 for partid, node in part2node:
866 866 if partid == targetid:
867 867 raise error.Abort(_('updating %s to public failed') % node)
868 868
869 869 enc = pushkey.encode
870 870 for newremotehead in pushop.outdatedphases:
871 871 part = bundler.newpart('pushkey')
872 872 part.addparam('namespace', enc('phases'))
873 873 part.addparam('key', enc(newremotehead.hex()))
874 874 part.addparam('old', enc('%d' % phases.draft))
875 875 part.addparam('new', enc('%d' % phases.public))
876 876 part2node.append((part.id, newremotehead))
877 877 pushop.pkfailcb[part.id] = handlefailure
878 878
879 879 def handlereply(op):
880 880 for partid, node in part2node:
881 881 partrep = op.records.getreplies(partid)
882 882 results = partrep['pushkey']
883 883 assert len(results) <= 1
884 884 msg = None
885 885 if not results:
886 886 msg = _('server ignored update of %s to public!\n') % node
887 887 elif not int(results[0]['return']):
888 888 msg = _('updating %s to public failed!\n') % node
889 889 if msg is not None:
890 890 pushop.ui.warn(msg)
891 891 return handlereply
892 892
893 893 @b2partsgenerator('obsmarkers')
894 894 def _pushb2obsmarkers(pushop, bundler):
895 895 if 'obsmarkers' in pushop.stepsdone:
896 896 return
897 897 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
898 898 if obsolete.commonversion(remoteversions) is None:
899 899 return
900 900 pushop.stepsdone.add('obsmarkers')
901 901 if pushop.outobsmarkers:
902 902 markers = sorted(pushop.outobsmarkers)
903 903 bundle2.buildobsmarkerspart(bundler, markers)
904 904
905 905 @b2partsgenerator('bookmarks')
906 906 def _pushb2bookmarks(pushop, bundler):
907 907 """handle bookmark push through bundle2"""
908 908 if 'bookmarks' in pushop.stepsdone:
909 909 return
910 910 b2caps = bundle2.bundle2caps(pushop.remote)
911 911
912 912 legacy = pushop.repo.ui.configlist('devel', 'legacy.exchange')
913 913 legacybooks = 'bookmarks' in legacy
914 914
915 915 if not legacybooks and 'bookmarks' in b2caps:
916 916 return _pushb2bookmarkspart(pushop, bundler)
917 917 elif 'pushkey' in b2caps:
918 918 return _pushb2bookmarkspushkey(pushop, bundler)
919 919
920 920 def _bmaction(old, new):
921 921 """small utility for bookmark pushing"""
922 922 if not old:
923 923 return 'export'
924 924 elif not new:
925 925 return 'delete'
926 926 return 'update'
927 927
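A quick self-check of the mapping above::

    assert _bmaction('', 'c0ffee') == 'export'    # no old value: new bookmark
    assert _bmaction('c0ffee', '') == 'delete'    # no new value: removal
    assert _bmaction('c0ffee', 'decade') == 'update'
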
928 928 def _pushb2bookmarkspart(pushop, bundler):
929 929 pushop.stepsdone.add('bookmarks')
930 930 if not pushop.outbookmarks:
931 931 return
932 932
933 933 allactions = []
934 934 data = []
935 935 for book, old, new in pushop.outbookmarks:
936 936 new = bin(new)
937 937 data.append((book, new))
938 938 allactions.append((book, _bmaction(old, new)))
939 939 checkdata = bookmod.binaryencode(data)
940 940 bundler.newpart('bookmarks', data=checkdata)
941 941
942 942 def handlereply(op):
943 943 ui = pushop.ui
944 944 # the part was applied successfully; report each bookmark action
945 945 for book, action in allactions:
946 946 ui.status(bookmsgmap[action][0] % book)
947 947
948 948 return handlereply
949 949
950 950 def _pushb2bookmarkspushkey(pushop, bundler):
951 951 pushop.stepsdone.add('bookmarks')
952 952 part2book = []
953 953 enc = pushkey.encode
954 954
955 955 def handlefailure(pushop, exc):
956 956 targetid = int(exc.partid)
957 957 for partid, book, action in part2book:
958 958 if partid == targetid:
959 959 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
960 960 # we should not be called for parts we did not generate
961 961 assert False
962 962
963 963 for book, old, new in pushop.outbookmarks:
964 964 part = bundler.newpart('pushkey')
965 965 part.addparam('namespace', enc('bookmarks'))
966 966 part.addparam('key', enc(book))
967 967 part.addparam('old', enc(old))
968 968 part.addparam('new', enc(new))
969 969 action = 'update'
970 970 if not old:
971 971 action = 'export'
972 972 elif not new:
973 973 action = 'delete'
974 974 part2book.append((part.id, book, action))
975 975 pushop.pkfailcb[part.id] = handlefailure
976 976
977 977 def handlereply(op):
978 978 ui = pushop.ui
979 979 for partid, book, action in part2book:
980 980 partrep = op.records.getreplies(partid)
981 981 results = partrep['pushkey']
982 982 assert len(results) <= 1
983 983 if not results:
984 984 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
985 985 else:
986 986 ret = int(results[0]['return'])
987 987 if ret:
988 988 ui.status(bookmsgmap[action][0] % book)
989 989 else:
990 990 ui.warn(bookmsgmap[action][1] % book)
991 991 if pushop.bkresult is not None:
992 992 pushop.bkresult = 1
993 993 return handlereply
994 994
995 995 @b2partsgenerator('pushvars', idx=0)
996 996 def _getbundlesendvars(pushop, bundler):
997 997 '''send shellvars via bundle2'''
998 998 pushvars = pushop.pushvars
999 999 if pushvars:
1000 1000 shellvars = {}
1001 1001 for raw in pushvars:
1002 1002 if '=' not in raw:
1003 1003 msg = ("unable to parse variable '%s', should follow "
1004 1004 "'KEY=VALUE' or 'KEY=' format")
1005 1005 raise error.Abort(msg % raw)
1006 1006 k, v = raw.split('=', 1)
1007 1007 shellvars[k] = v
1008 1008
1009 1009 part = bundler.newpart('pushvars')
1010 1010
1011 1011 for key, value in shellvars.iteritems():
1012 1012 part.addparam(key, value, mandatory=False)
1013 1013
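For context, these variables originate from 'hg push --pushvars "KEY=VALUE"' on the command line. Assuming the server opts in and exposes them to hooks with an HG_USERVAR_ prefix (our recollection; verify against the pushvars part handler), a server-side hook could read them like this::

    import os

    # sketch of a server-side hook body; the variable name is hypothetical
    if os.environ.get('HG_USERVAR_DEBUG') == '1':
        pass  # enable extra diagnostics for this push
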
1014 1014 def _pushbundle2(pushop):
1015 1015 """push data to the remote using bundle2
1016 1016
1017 1017 The only currently supported type of data is changegroup but this will
1018 1018 evolve in the future."""
1019 1019 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
1020 1020 pushback = (pushop.trmanager
1021 1021 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
1022 1022
1023 1023 # create reply capability
1024 1024 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
1025 1025 allowpushback=pushback,
1026 1026 role='client'))
1027 1027 bundler.newpart('replycaps', data=capsblob)
1028 1028 replyhandlers = []
1029 1029 for partgenname in b2partsgenorder:
1030 1030 partgen = b2partsgenmapping[partgenname]
1031 1031 ret = partgen(pushop, bundler)
1032 1032 if callable(ret):
1033 1033 replyhandlers.append(ret)
1034 1034 # do not push if nothing to push
1035 1035 if bundler.nbparts <= 1:
1036 1036 return
1037 1037 stream = util.chunkbuffer(bundler.getchunks())
1038 1038 try:
1039 1039 try:
1040 1040 reply = pushop.remote.unbundle(
1041 1041 stream, ['force'], pushop.remote.url())
1042 1042 except error.BundleValueError as exc:
1043 1043 raise error.Abort(_('missing support for %s') % exc)
1044 1044 try:
1045 1045 trgetter = None
1046 1046 if pushback:
1047 1047 trgetter = pushop.trmanager.transaction
1048 1048 op = bundle2.processbundle(pushop.repo, reply, trgetter)
1049 1049 except error.BundleValueError as exc:
1050 1050 raise error.Abort(_('missing support for %s') % exc)
1051 1051 except bundle2.AbortFromPart as exc:
1052 1052 pushop.ui.status(_('remote: %s\n') % exc)
1053 1053 if exc.hint is not None:
1054 1054 pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
1055 1055 raise error.Abort(_('push failed on remote'))
1056 1056 except error.PushkeyFailed as exc:
1057 1057 partid = int(exc.partid)
1058 1058 if partid not in pushop.pkfailcb:
1059 1059 raise
1060 1060 pushop.pkfailcb[partid](pushop, exc)
1061 1061 for rephand in replyhandlers:
1062 1062 rephand(op)
1063 1063
1064 1064 def _pushchangeset(pushop):
1065 1065 """Make the actual push of changeset bundle to remote repo"""
1066 1066 if 'changesets' in pushop.stepsdone:
1067 1067 return
1068 1068 pushop.stepsdone.add('changesets')
1069 1069 if not _pushcheckoutgoing(pushop):
1070 1070 return
1071 1071
1072 1072 # Should have verified this in push().
1073 1073 assert pushop.remote.capable('unbundle')
1074 1074
1075 1075 pushop.repo.prepushoutgoinghooks(pushop)
1076 1076 outgoing = pushop.outgoing
1077 1077 # TODO: get bundlecaps from remote
1078 1078 bundlecaps = None
1079 1079 # create a changegroup from local
1080 1080 if pushop.revs is None and not (outgoing.excluded
1081 1081 or pushop.repo.changelog.filteredrevs):
1082 1082 # push everything,
1083 1083 # use the fast path, no race possible on push
1084 1084 cg = changegroup.makechangegroup(pushop.repo, outgoing, '01', 'push',
1085 1085 fastpath=True, bundlecaps=bundlecaps)
1086 1086 else:
1087 1087 cg = changegroup.makechangegroup(pushop.repo, outgoing, '01',
1088 1088 'push', bundlecaps=bundlecaps)
1089 1089
1090 1090 # apply changegroup to remote
1091 1091 # local repo finds heads on server, finds out what
1092 1092 # revs it must push. once revs transferred, if server
1093 1093 # finds it has different heads (someone else won
1094 1094 # commit/push race), server aborts.
1095 1095 if pushop.force:
1096 1096 remoteheads = ['force']
1097 1097 else:
1098 1098 remoteheads = pushop.remoteheads
1099 1099 # ssh: return remote's addchangegroup()
1100 1100 # http: return remote's addchangegroup() or 0 for error
1101 1101 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
1102 1102 pushop.repo.url())
1103 1103
1104 1104 def _pushsyncphase(pushop):
1105 1105 """synchronise phase information locally and remotely"""
1106 1106 cheads = pushop.commonheads
1107 1107 # even when we don't push, exchanging phase data is useful
1108 1108 remotephases = pushop.remote.listkeys('phases')
1109 1109 if (pushop.ui.configbool('ui', '_usedassubrepo')
1110 1110 and remotephases # server supports phases
1111 1111 and pushop.cgresult is None # nothing was pushed
1112 1112 and remotephases.get('publishing', False)):
1113 1113 # When:
1114 1114 # - this is a subrepo push
1115 1115 # - and the remote supports phases
1116 1116 # - and no changeset was pushed
1117 1117 # - and the remote is publishing
1118 1118 # We may be in the issue 3871 case!
1119 1119 # We drop the detailed phase data returned as a courtesy and
1120 1120 # treat the remote as plainly publishing, so that changesets
1121 1121 # possibly still draft locally get published.
1122 1122 remotephases = {'publishing': 'True'}
1123 1123 if not remotephases: # old server or public-only reply from a non-publishing one
1124 1124 _localphasemove(pushop, cheads)
1125 1125 # don't push any phase data as there is nothing to push
1126 1126 else:
1127 1127 ana = phases.analyzeremotephases(pushop.repo, cheads,
1128 1128 remotephases)
1129 1129 pheads, droots = ana
1130 1130 ### Apply remote phase on local
1131 1131 if remotephases.get('publishing', False):
1132 1132 _localphasemove(pushop, cheads)
1133 1133 else: # publish = False
1134 1134 _localphasemove(pushop, pheads)
1135 1135 _localphasemove(pushop, cheads, phases.draft)
1136 1136 ### Apply local phase on remote
1137 1137
1138 1138 if pushop.cgresult:
1139 1139 if 'phases' in pushop.stepsdone:
1140 1140 # phases already pushed though bundle2
1141 1141 return
1142 1142 outdated = pushop.outdatedphases
1143 1143 else:
1144 1144 outdated = pushop.fallbackoutdatedphases
1145 1145
1146 1146 pushop.stepsdone.add('phases')
1147 1147
1148 1148 # filter heads already turned public by the push
1149 1149 outdated = [c for c in outdated if c.node() not in pheads]
1150 1150 # fallback to independent pushkey command
1151 1151 for newremotehead in outdated:
1152 1152 r = pushop.remote.pushkey('phases',
1153 1153 newremotehead.hex(),
1154 1154 str(phases.draft),
1155 1155 str(phases.public))
1156 1156 if not r:
1157 1157 pushop.ui.warn(_('updating %s to public failed!\n')
1158 1158 % newremotehead)
1159 1159
1160 1160 def _localphasemove(pushop, nodes, phase=phases.public):
1161 1161 """move <nodes> to <phase> in the local source repo"""
1162 1162 if pushop.trmanager:
1163 1163 phases.advanceboundary(pushop.repo,
1164 1164 pushop.trmanager.transaction(),
1165 1165 phase,
1166 1166 nodes)
1167 1167 else:
1168 1168 # repo is not locked, do not change any phases!
1169 1169 # Inform the user that phases should have been moved when
1170 1170 # applicable.
1171 1171 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
1172 1172 phasestr = phases.phasenames[phase]
1173 1173 if actualmoves:
1174 1174 pushop.ui.status(_('cannot lock source repo, skipping '
1175 1175 'local %s phase update\n') % phasestr)
1176 1176
1177 1177 def _pushobsolete(pushop):
1178 1178 """utility function to push obsolete markers to a remote"""
1179 1179 if 'obsmarkers' in pushop.stepsdone:
1180 1180 return
1181 1181 repo = pushop.repo
1182 1182 remote = pushop.remote
1183 1183 pushop.stepsdone.add('obsmarkers')
1184 1184 if pushop.outobsmarkers:
1185 1185 pushop.ui.debug('try to push obsolete markers to remote\n')
1186 1186 rslts = []
1187 1187 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
1188 1188 for key in sorted(remotedata, reverse=True):
1189 1189 # reverse sort to ensure we end with dump0
1190 1190 data = remotedata[key]
1191 1191 rslts.append(remote.pushkey('obsolete', key, '', data))
1192 1192 if [r for r in rslts if not r]:
1193 1193 msg = _('failed to push some obsolete markers!\n')
1194 1194 repo.ui.warn(msg)
1195 1195
1196 1196 def _pushbookmark(pushop):
1197 1197 """Update bookmark position on remote"""
1198 1198 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
1199 1199 return
1200 1200 pushop.stepsdone.add('bookmarks')
1201 1201 ui = pushop.ui
1202 1202 remote = pushop.remote
1203 1203
1204 1204 for b, old, new in pushop.outbookmarks:
1205 1205 action = 'update'
1206 1206 if not old:
1207 1207 action = 'export'
1208 1208 elif not new:
1209 1209 action = 'delete'
1210 1210 if remote.pushkey('bookmarks', b, old, new):
1211 1211 ui.status(bookmsgmap[action][0] % b)
1212 1212 else:
1213 1213 ui.warn(bookmsgmap[action][1] % b)
1214 1214 # discovery may have set the value from an invalid entry
1215 1215 if pushop.bkresult is not None:
1216 1216 pushop.bkresult = 1
1217 1217
1218 1218 class pulloperation(object):
1219 1219 """An object that represents a single pull operation
1220 1220
1221 1221 Its purpose is to carry pull-related state and very common operations.
1222 1222
1223 1223 A new one should be created at the beginning of each pull and discarded
1224 1224 afterward.
1225 1225 """
1226 1226
1227 1227 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
1228 1228 remotebookmarks=None, streamclonerequested=None):
1229 1229 # repo we pull into
1230 1230 self.repo = repo
1231 1231 # repo we pull from
1232 1232 self.remote = remote
1233 1233 # revisions we try to pull (None means "all")
1234 1234 self.heads = heads
1235 1235 # bookmarks pulled explicitly
1236 1236 self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
1237 1237 for bookmark in bookmarks]
1238 1238 # do we force pull?
1239 1239 self.force = force
1240 1240 # whether a streaming clone was requested
1241 1241 self.streamclonerequested = streamclonerequested
1242 1242 # transaction manager
1243 1243 self.trmanager = None
1244 1244 # set of common changesets between local and remote before pull
1245 1245 self.common = None
1246 1246 # set of pulled heads
1247 1247 self.rheads = None
1248 1248 # list of missing changesets to fetch remotely
1249 1249 self.fetch = None
1250 1250 # remote bookmarks data
1251 1251 self.remotebookmarks = remotebookmarks
1252 1252 # result of changegroup pulling (used as return code by pull)
1253 1253 self.cgresult = None
1254 1254 # set of steps already done
1255 1255 self.stepsdone = set()
1256 1256 # Whether we attempted a clone from pre-generated bundles.
1257 1257 self.clonebundleattempted = False
1258 1258
1259 1259 @util.propertycache
1260 1260 def pulledsubset(self):
1261 1261 """heads of the set of changesets targeted by the pull"""
1262 1262 # compute target subset
1263 1263 if self.heads is None:
1264 1264 # We pulled everything possible
1265 1265 # sync on everything common
1266 1266 c = set(self.common)
1267 1267 ret = list(self.common)
1268 1268 for n in self.rheads:
1269 1269 if n not in c:
1270 1270 ret.append(n)
1271 1271 return ret
1272 1272 else:
1273 1273 # We pulled a specific subset
1274 1274 # sync on this subset
1275 1275 return self.heads
1276 1276
1277 1277 @util.propertycache
1278 1278 def canusebundle2(self):
1279 1279 return not _forcebundle1(self)
1280 1280
1281 1281 @util.propertycache
1282 1282 def remotebundle2caps(self):
1283 1283 return bundle2.bundle2caps(self.remote)
1284 1284
1285 1285 def gettransaction(self):
1286 1286 # deprecated; talk to trmanager directly
1287 1287 return self.trmanager.transaction()
1288 1288
1289 1289 class transactionmanager(util.transactional):
1290 1290 """An object to manage the life cycle of a transaction
1291 1291
1292 1292 It creates the transaction on demand and calls the appropriate hooks when
1293 1293 closing the transaction."""
1294 1294 def __init__(self, repo, source, url):
1295 1295 self.repo = repo
1296 1296 self.source = source
1297 1297 self.url = url
1298 1298 self._tr = None
1299 1299
1300 1300 def transaction(self):
1301 1301 """Return an open transaction object, constructing if necessary"""
1302 1302 if not self._tr:
1303 1303 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
1304 1304 self._tr = self.repo.transaction(trname)
1305 1305 self._tr.hookargs['source'] = self.source
1306 1306 self._tr.hookargs['url'] = self.url
1307 1307 return self._tr
1308 1308
1309 1309 def close(self):
1310 1310 """close transaction if created"""
1311 1311 if self._tr is not None:
1312 1312 self._tr.close()
1313 1313
1314 1314 def release(self):
1315 1315 """release transaction if created"""
1316 1316 if self._tr is not None:
1317 1317 self._tr.release()
1318 1318
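Since util.transactional supplies the context-manager protocol (close() runs on success, release() always runs and rolls back anything left open), callers typically drive this class the way pull() does below; a compressed sketch::

    tm = transactionmanager(repo, 'pull', remote.url())
    with repo.wlock(), repo.lock(), tm:
        tr = tm.transaction()  # created lazily on first use
        # ... apply incoming data under tr ...
    # leaving the block without an exception commits via close()
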
1319 1319 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
1320 1320 streamclonerequested=None):
1321 1321 """Fetch repository data from a remote.
1322 1322
1323 1323 This is the main function used to retrieve data from a remote repository.
1324 1324
1325 1325 ``repo`` is the local repository to clone into.
1326 1326 ``remote`` is a peer instance.
1327 1327 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
1328 1328 default) means to pull everything from the remote.
1329 1329 ``bookmarks`` is an iterable of bookmarks requesting to be pulled. By
1330 1330 default, all remote bookmarks are pulled.
1331 1331 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
1332 1332 initialization.
1333 1333 ``streamclonerequested`` is a boolean indicating whether a "streaming
1334 1334 clone" is requested. A "streaming clone" is essentially a raw file copy
1335 1335 of revlogs from the server. This only works when the local repository is
1336 1336 empty. The default value of ``None`` means to respect the server
1337 1337 configuration for preferring stream clones.
1338 1338
1339 1339 Returns the ``pulloperation`` created for this pull.
1340 1340 """
1341 1341 if opargs is None:
1342 1342 opargs = {}
1343 1343 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
1344 1344 streamclonerequested=streamclonerequested,
1345 1345 **pycompat.strkwargs(opargs))
1346 1346
1347 1347 peerlocal = pullop.remote.local()
1348 1348 if peerlocal:
1349 1349 missing = set(peerlocal.requirements) - pullop.repo.supported
1350 1350 if missing:
1351 1351 msg = _("required features are not"
1352 1352 " supported in the destination:"
1353 1353 " %s") % (', '.join(sorted(missing)))
1354 1354 raise error.Abort(msg)
1355 1355
1356 1356 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
1357 1357 with repo.wlock(), repo.lock(), pullop.trmanager:
1358 1358 # This should ideally be in _pullbundle2(). However, it needs to run
1359 1359 # before discovery to avoid extra work.
1360 1360 _maybeapplyclonebundle(pullop)
1361 1361 streamclone.maybeperformlegacystreamclone(pullop)
1362 1362 _pulldiscovery(pullop)
1363 1363 if pullop.canusebundle2:
1364 1364 _pullbundle2(pullop)
1365 1365 _pullchangeset(pullop)
1366 1366 _pullphase(pullop)
1367 1367 _pullbookmarks(pullop)
1368 1368 _pullobsolete(pullop)
1369 1369
1370 1370 # storing remotenames
1371 1371 if repo.ui.configbool('experimental', 'remotenames'):
1372 1372 logexchange.pullremotenames(repo, remote)
1373 1373
1374 1374 return pullop
1375 1375
1376 1376 # list of steps to perform discovery before pull
1377 1377 pulldiscoveryorder = []
1378 1378
1379 1379 # Mapping between step name and function
1380 1380 #
1381 1381 # This exists to help extensions wrap steps if necessary
1382 1382 pulldiscoverymapping = {}
1383 1383
1384 1384 def pulldiscovery(stepname):
1385 1385 """decorator for function performing discovery before pull
1386 1386
1387 1387 The function is added to the step -> function mapping and appended to the
1388 1388 list of steps. Beware that decorated functions will be added in order (this
1389 1389 may matter).
1390 1390
1391 1391 You can only use this decorator for a new step; if you want to wrap a step
1392 1392 from an extension, change the pulldiscoverymapping dictionary directly."""
1393 1393 def dec(func):
1394 1394 assert stepname not in pulldiscoverymapping
1395 1395 pulldiscoverymapping[stepname] = func
1396 1396 pulldiscoveryorder.append(stepname)
1397 1397 return func
1398 1398 return dec
1399 1399
1400 1400 def _pulldiscovery(pullop):
1401 1401 """Run all discovery steps"""
1402 1402 for stepname in pulldiscoveryorder:
1403 1403 step = pulldiscoverymapping[stepname]
1404 1404 step(pullop)
1405 1405
1406 1406 @pulldiscovery('b1:bookmarks')
1407 1407 def _pullbookmarkbundle1(pullop):
1408 1408 """fetch bookmark data in bundle1 case
1409 1409
1410 1410 If not using bundle2, we have to fetch bookmarks before changeset
1411 1411 discovery to reduce the chance and impact of race conditions."""
1412 1412 if pullop.remotebookmarks is not None:
1413 1413 return
1414 1414 if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
1415 1415 # all known bundle2 servers now support listkeys, but let's be nice
1416 1416 # with new implementations.
1417 1417 return
1418 1418 books = pullop.remote.listkeys('bookmarks')
1419 1419 pullop.remotebookmarks = bookmod.unhexlifybookmarks(books)
1420 1420
1421 1421
1422 1422 @pulldiscovery('changegroup')
1423 1423 def _pulldiscoverychangegroup(pullop):
1424 1424 """discovery phase for the pull
1425 1425
1426 1426 Currently handles changeset discovery only; it will change to handle all
1427 1427 discovery at some point."""
1428 1428 tmp = discovery.findcommonincoming(pullop.repo,
1429 1429 pullop.remote,
1430 1430 heads=pullop.heads,
1431 1431 force=pullop.force)
1432 1432 common, fetch, rheads = tmp
1433 1433 nm = pullop.repo.unfiltered().changelog.nodemap
1434 1434 if fetch and rheads:
1435 1435 # If a remote head is filtered locally, put it back in common.
1436 1436 #
1437 1437 # This is a hackish solution to catch most of the "common but
1438 1438 # locally hidden" situations. We do not perform discovery on an
1439 1439 # unfiltered repository because it ends up doing a pathological
1440 1440 # amount of round trips for a huge amount of changesets we do
1441 1441 # not care about.
1442 1442 # If a set of such "common but filtered" changesets exists on the
1443 1443 # server but does not include a remote head, we won't detect it.
1444 1444 scommon = set(common)
1445 1445 for n in rheads:
1446 1446 if n in nm:
1447 1447 if n not in scommon:
1448 1448 common.append(n)
1449 1449 if set(rheads).issubset(set(common)):
1450 1450 fetch = []
1451 1451 pullop.common = common
1452 1452 pullop.fetch = fetch
1453 1453 pullop.rheads = rheads
1454 1454
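A toy run of the fix-up above (strings stand in for binary node ids)::

    common, rheads = ['a'], ['b']
    nm = {'a': 0, 'b': 1}  # local nodemap: both nodes exist locally
    scommon = set(common)
    for n in rheads:
        if n in nm and n not in scommon:
            common.append(n)  # locally-filtered remote head goes back to common
    fetch = [] if set(rheads).issubset(set(common)) else ['something']
    assert common == ['a', 'b'] and fetch == []
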
1455 1455 def _pullbundle2(pullop):
1456 1456 """pull data using bundle2
1457 1457
1458 1458 For now, the only supported data is the changegroup."""
1459 1459 kwargs = {'bundlecaps': caps20to10(pullop.repo, role='client')}
1460 1460
1461 1461 # make ui easier to access
1462 1462 ui = pullop.repo.ui
1463 1463
1464 1464 # At the moment we don't do stream clones over bundle2. If that is
1465 1465 # implemented then here's where the check for that will go.
1466 1466 streaming = streamclone.canperformstreamclone(pullop, bundle2=True)[0]
1467 1467
1468 1468 # declare pull perimeters
1469 1469 kwargs['common'] = pullop.common
1470 1470 kwargs['heads'] = pullop.heads or pullop.rheads
1471 1471
1472 1472 if streaming:
1473 1473 kwargs['cg'] = False
1474 1474 kwargs['stream'] = True
1475 1475 pullop.stepsdone.add('changegroup')
1476 1476 pullop.stepsdone.add('phases')
1477 1477
1478 1478 else:
1479 1479 # pulling changegroup
1480 1480 pullop.stepsdone.add('changegroup')
1481 1481
1482 1482 kwargs['cg'] = pullop.fetch
1483 1483
1484 1484 legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
1485 1485 hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
1486 1486 if (not legacyphase and hasbinaryphase):
1487 1487 kwargs['phases'] = True
1488 1488 pullop.stepsdone.add('phases')
1489 1489
1490 1490 if 'listkeys' in pullop.remotebundle2caps:
1491 1491 if 'phases' not in pullop.stepsdone:
1492 1492 kwargs['listkeys'] = ['phases']
1493 1493
1494 1494 bookmarksrequested = False
1495 1495 legacybookmark = 'bookmarks' in ui.configlist('devel', 'legacy.exchange')
1496 1496 hasbinarybook = 'bookmarks' in pullop.remotebundle2caps
1497 1497
1498 1498 if pullop.remotebookmarks is not None:
1499 1499 pullop.stepsdone.add('request-bookmarks')
1500 1500
1501 1501 if ('request-bookmarks' not in pullop.stepsdone
1502 1502 and pullop.remotebookmarks is None
1503 1503 and not legacybookmark and hasbinarybook):
1504 1504 kwargs['bookmarks'] = True
1505 1505 bookmarksrequested = True
1506 1506
1507 1507 if 'listkeys' in pullop.remotebundle2caps:
1508 1508 if 'request-bookmarks' not in pullop.stepsdone:
1509 1509 # make sure to always include bookmark data when migrating
1510 1510 # `hg incoming --bundle` to using this function.
1511 1511 pullop.stepsdone.add('request-bookmarks')
1512 1512 kwargs.setdefault('listkeys', []).append('bookmarks')
1513 1513
1514 1514 # If this is a full pull / clone and the server supports the clone bundles
1515 1515 # feature, tell the server whether we attempted a clone bundle. The
1516 1516 # presence of this flag indicates the client supports clone bundles. This
1517 1517 # will enable the server to treat clients that support clone bundles
1518 1518 # differently from those that don't.
1519 1519 if (pullop.remote.capable('clonebundles')
1520 1520 and pullop.heads is None and list(pullop.common) == [nullid]):
1521 1521 kwargs['cbattempted'] = pullop.clonebundleattempted
1522 1522
1523 1523 if streaming:
1524 1524 pullop.repo.ui.status(_('streaming all changes\n'))
1525 1525 elif not pullop.fetch:
1526 1526 pullop.repo.ui.status(_("no changes found\n"))
1527 1527 pullop.cgresult = 0
1528 1528 else:
1529 1529 if pullop.heads is None and list(pullop.common) == [nullid]:
1530 1530 pullop.repo.ui.status(_("requesting all changes\n"))
1531 1531 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1532 1532 remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
1533 1533 if obsolete.commonversion(remoteversions) is not None:
1534 1534 kwargs['obsmarkers'] = True
1535 1535 pullop.stepsdone.add('obsmarkers')
1536 1536 _pullbundle2extraprepare(pullop, kwargs)
1537 1537 bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
1538 1538 try:
1539 1539 op = bundle2.bundleoperation(pullop.repo, pullop.gettransaction)
1540 1540 op.modes['bookmarks'] = 'records'
1541 1541 bundle2.processbundle(pullop.repo, bundle, op=op)
1542 1542 except bundle2.AbortFromPart as exc:
1543 1543 pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
1544 1544 raise error.Abort(_('pull failed on remote'), hint=exc.hint)
1545 1545 except error.BundleValueError as exc:
1546 1546 raise error.Abort(_('missing support for %s') % exc)
1547 1547
1548 1548 if pullop.fetch:
1549 1549 pullop.cgresult = bundle2.combinechangegroupresults(op)
1550 1550
1551 1551 # processing phases change
1552 1552 for namespace, value in op.records['listkeys']:
1553 1553 if namespace == 'phases':
1554 1554 _pullapplyphases(pullop, value)
1555 1555
1556 1556 # processing bookmark update
1557 1557 if bookmarksrequested:
1558 1558 books = {}
1559 1559 for record in op.records['bookmarks']:
1560 1560 books[record['bookmark']] = record["node"]
1561 1561 pullop.remotebookmarks = books
1562 1562 else:
1563 1563 for namespace, value in op.records['listkeys']:
1564 1564 if namespace == 'bookmarks':
1565 1565 pullop.remotebookmarks = bookmod.unhexlifybookmarks(value)
1566 1566
1567 1567 # bookmark data were either already there or pulled in the bundle
1568 1568 if pullop.remotebookmarks is not None:
1569 1569 _pullbookmarks(pullop)
1570 1570
1571 1571 def _pullbundle2extraprepare(pullop, kwargs):
1572 1572 """hook function so that extensions can extend the getbundle call"""
1573 1573
1574 1574 def _pullchangeset(pullop):
1575 1575 """pull changeset from unbundle into the local repo"""
1576 1576 # We delay opening the transaction as late as possible so we don't
1577 1577 # open it for nothing, and so we don't break future useful
1578 1578 # rollback calls
1579 1579 if 'changegroup' in pullop.stepsdone:
1580 1580 return
1581 1581 pullop.stepsdone.add('changegroup')
1582 1582 if not pullop.fetch:
1583 1583 pullop.repo.ui.status(_("no changes found\n"))
1584 1584 pullop.cgresult = 0
1585 1585 return
1586 1586 tr = pullop.gettransaction()
1587 1587 if pullop.heads is None and list(pullop.common) == [nullid]:
1588 1588 pullop.repo.ui.status(_("requesting all changes\n"))
1589 1589 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1590 1590 # issue1320, avoid a race if remote changed after discovery
1591 1591 pullop.heads = pullop.rheads
1592 1592
1593 1593 if pullop.remote.capable('getbundle'):
1594 1594 # TODO: get bundlecaps from remote
1595 1595 cg = pullop.remote.getbundle('pull', common=pullop.common,
1596 1596 heads=pullop.heads or pullop.rheads)
1597 1597 elif pullop.heads is None:
1598 1598 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1599 1599 elif not pullop.remote.capable('changegroupsubset'):
1600 1600 raise error.Abort(_("partial pull cannot be done because "
1601 1601 "other repository doesn't support "
1602 1602 "changegroupsubset."))
1603 1603 else:
1604 1604 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1605 1605 bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
1606 1606 pullop.remote.url())
1607 1607 pullop.cgresult = bundle2.combinechangegroupresults(bundleop)
1608 1608
1609 1609 def _pullphase(pullop):
1610 1610 # Get remote phases data from remote
1611 1611 if 'phases' in pullop.stepsdone:
1612 1612 return
1613 1613 remotephases = pullop.remote.listkeys('phases')
1614 1614 _pullapplyphases(pullop, remotephases)
1615 1615
1616 1616 def _pullapplyphases(pullop, remotephases):
1617 1617 """apply phase movement from observed remote state"""
1618 1618 if 'phases' in pullop.stepsdone:
1619 1619 return
1620 1620 pullop.stepsdone.add('phases')
1621 1621 publishing = bool(remotephases.get('publishing', False))
1622 1622 if remotephases and not publishing:
1623 1623 # remote is new and non-publishing
1624 1624 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1625 1625 pullop.pulledsubset,
1626 1626 remotephases)
1627 1627 dheads = pullop.pulledsubset
1628 1628 else:
1629 1629 # Remote is old or publishing; all common changesets
1630 1630 # should be seen as public
1631 1631 pheads = pullop.pulledsubset
1632 1632 dheads = []
1633 1633 unfi = pullop.repo.unfiltered()
1634 1634 phase = unfi._phasecache.phase
1635 1635 rev = unfi.changelog.nodemap.get
1636 1636 public = phases.public
1637 1637 draft = phases.draft
1638 1638
1639 1639 # exclude changesets already public locally and update the others
1640 1640 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1641 1641 if pheads:
1642 1642 tr = pullop.gettransaction()
1643 1643 phases.advanceboundary(pullop.repo, tr, public, pheads)
1644 1644
1645 1645 # exclude changesets already draft locally and update the others
1646 1646 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1647 1647 if dheads:
1648 1648 tr = pullop.gettransaction()
1649 1649 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1650 1650
1651 1651 def _pullbookmarks(pullop):
1652 1652 """process the remote bookmark information to update the local one"""
1653 1653 if 'bookmarks' in pullop.stepsdone:
1654 1654 return
1655 1655 pullop.stepsdone.add('bookmarks')
1656 1656 repo = pullop.repo
1657 1657 remotebookmarks = pullop.remotebookmarks
1658 1658 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1659 1659 pullop.remote.url(),
1660 1660 pullop.gettransaction,
1661 1661 explicit=pullop.explicitbookmarks)
1662 1662
1663 1663 def _pullobsolete(pullop):
1664 1664 """utility function to pull obsolete markers from a remote
1665 1665
1666 1666 `gettransaction` is a function that returns the pull transaction, creating
1667 1667 one if necessary. We return the transaction to inform the calling code that
1668 1668 a new transaction has been created (when applicable).
1669 1669
1670 1670 Exists mostly to allow overriding for experimentation purposes"""
1671 1671 if 'obsmarkers' in pullop.stepsdone:
1672 1672 return
1673 1673 pullop.stepsdone.add('obsmarkers')
1674 1674 tr = None
1675 1675 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1676 1676 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1677 1677 remoteobs = pullop.remote.listkeys('obsolete')
1678 1678 if 'dump0' in remoteobs:
1679 1679 tr = pullop.gettransaction()
1680 1680 markers = []
1681 1681 for key in sorted(remoteobs, reverse=True):
1682 1682 if key.startswith('dump'):
1683 1683 data = util.b85decode(remoteobs[key])
1684 1684 version, newmarks = obsolete._readmarkers(data)
1685 1685 markers += newmarks
1686 1686 if markers:
1687 1687 pullop.repo.obsstore.add(tr, markers)
1688 1688 pullop.repo.invalidatevolatilesets()
1689 1689 return tr
1690 1690
1691 1691 def caps20to10(repo, role):
1692 1692 """return a set with appropriate options to use bundle20 during getbundle"""
1693 1693 caps = {'HG20'}
1694 1694 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role=role))
1695 1695 caps.add('bundle2=' + urlreq.quote(capsblob))
1696 1696 return caps
1697 1697
1698 1698 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1699 1699 getbundle2partsorder = []
1700 1700
1701 1701 # Mapping between step name and function
1702 1702 #
1703 1703 # This exists to help extensions wrap steps if necessary
1704 1704 getbundle2partsmapping = {}
1705 1705
1706 1706 def getbundle2partsgenerator(stepname, idx=None):
1707 1707 """decorator for function generating bundle2 part for getbundle
1708 1708
1709 1709 The function is added to the step -> function mapping and appended to the
1710 1710 list of steps. Beware that decorated functions will be added in order
1711 1711 (this may matter).
1712 1712
1713 1713 You can only use this decorator for new steps; if you want to wrap a step
1714 1714 from an extension, modify the getbundle2partsmapping dictionary directly."""
1715 1715 def dec(func):
1716 1716 assert stepname not in getbundle2partsmapping
1717 1717 getbundle2partsmapping[stepname] = func
1718 1718 if idx is None:
1719 1719 getbundle2partsorder.append(stepname)
1720 1720 else:
1721 1721 getbundle2partsorder.insert(idx, stepname)
1722 1722 return func
1723 1723 return dec
1724 1724
1725 1725 def bundle2requested(bundlecaps):
1726 1726 if bundlecaps is not None:
1727 1727 return any(cap.startswith('HG2') for cap in bundlecaps)
1728 1728 return False
1729 1729
1730 1730 def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
1731 1731 **kwargs):
1732 1732 """Return chunks constituting a bundle's raw data.
1733 1733
1734 1734 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1735 1735 passed.
1736 1736
1737 1737 Returns a 2-tuple of a dict with metadata about the generated bundle
1738 1738 and an iterator over raw chunks (of varying sizes).
1739 1739 """
1740 1740 kwargs = pycompat.byteskwargs(kwargs)
1741 1741 info = {}
1742 1742 usebundle2 = bundle2requested(bundlecaps)
1743 1743 # bundle10 case
1744 1744 if not usebundle2:
1745 1745 if bundlecaps and not kwargs.get('cg', True):
1746 1746 raise ValueError(_('request for bundle10 must include changegroup'))
1747 1747
1748 1748 if kwargs:
1749 1749 raise ValueError(_('unsupported getbundle arguments: %s')
1750 1750 % ', '.join(sorted(kwargs.keys())))
1751 1751 outgoing = _computeoutgoing(repo, heads, common)
1752 1752 info['bundleversion'] = 1
1753 1753 return info, changegroup.makestream(repo, outgoing, '01', source,
1754 1754 bundlecaps=bundlecaps)
1755 1755
1756 1756 # bundle20 case
1757 1757 info['bundleversion'] = 2
1758 1758 b2caps = {}
1759 1759 for bcaps in bundlecaps:
1760 1760 if bcaps.startswith('bundle2='):
1761 1761 blob = urlreq.unquote(bcaps[len('bundle2='):])
1762 1762 b2caps.update(bundle2.decodecaps(blob))
1763 1763 bundler = bundle2.bundle20(repo.ui, b2caps)
1764 1764
1765 1765 kwargs['heads'] = heads
1766 1766 kwargs['common'] = common
1767 1767
1768 1768 for name in getbundle2partsorder:
1769 1769 func = getbundle2partsmapping[name]
1770 1770 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1771 1771 **pycompat.strkwargs(kwargs))
1772 1772
1773 1773 info['prefercompressed'] = bundler.prefercompressed
1774 1774
1775 1775 return info, bundler.getchunks()
1776 1776
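A hedged sketch of how a caller consumes this function (the driver below is ours, not an API defined here)::

    info, gen = getbundlechunks(repo, 'serve', heads=None, common=None,
                                bundlecaps=None)
    assert info['bundleversion'] == 1  # no HG2x capability given -> bundle10
    for chunk in gen:
        pass  # stream each raw chunk to the wire or into a file
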
1777 1777 @getbundle2partsgenerator('stream2')
1778 1778 def _getbundlestream2(bundler, repo, source, bundlecaps=None,
1779 1779 b2caps=None, heads=None, common=None, **kwargs):
1780 1780 if not kwargs.get('stream', False):
1781 1781 return
1782 1782
1783 1783 if not streamclone.allowservergeneration(repo):
1784 1784 raise error.Abort(_('stream data requested but server does not allow '
1785 1785 'this feature'),
1786 1786 hint=_('well-behaved clients should not be '
1787 1787 'requesting stream data from servers not '
1788 1788 'advertising it; the client may be buggy'))
1789 1789
1790 1790 # Stream clones don't compress well. And compression undermines a
1791 1791 # goal of stream clones, which is to be fast. Communicate the desire
1792 1792 # to avoid compression to consumers of the bundle.
1793 1793 bundler.prefercompressed = False
1794 1794
1795 1795 filecount, bytecount, it = streamclone.generatev2(repo)
1796 requirements = ' '.join(sorted(repo.requirements))
1796 requirements = _formatrequirementsspec(repo.requirements)
1797 1797 part = bundler.newpart('stream2', data=it)
1798 1798 part.addparam('bytecount', '%d' % bytecount, mandatory=True)
1799 1799 part.addparam('filecount', '%d' % filecount, mandatory=True)
1800 1800 part.addparam('requirements', requirements, mandatory=True)
1801 1801
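The hunk above is the fix this changeset is about: the stream part's 'requirements' parameter is now serialized by _formatrequirementsspec instead of a plain space-join. Assuming that helper url-quotes a comma-joined sorted list (worth double-checking in this module), the difference looks roughly like::

    reqs = {'revlogv1', 'store'}
    ' '.join(sorted(reqs))         # old form: 'revlogv1 store'
    # assumed new form: urlreq.quote(','.join(sorted(reqs)))
    #                   -> 'revlogv1%2Cstore'
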
1802 1802 @getbundle2partsgenerator('changegroup')
1803 1803 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1804 1804 b2caps=None, heads=None, common=None, **kwargs):
1805 1805 """add a changegroup part to the requested bundle"""
1806 1806 cgstream = None
1807 1807 if kwargs.get(r'cg', True):
1808 1808 # build changegroup bundle here.
1809 1809 version = '01'
1810 1810 cgversions = b2caps.get('changegroup')
1811 1811 if cgversions: # 3.1 and 3.2 ship with an empty value
1812 1812 cgversions = [v for v in cgversions
1813 1813 if v in changegroup.supportedoutgoingversions(repo)]
1814 1814 if not cgversions:
1815 1815 raise ValueError(_('no common changegroup version'))
1816 1816 version = max(cgversions)
1817 1817 outgoing = _computeoutgoing(repo, heads, common)
1818 1818 if outgoing.missing:
1819 1819 cgstream = changegroup.makestream(repo, outgoing, version, source,
1820 1820 bundlecaps=bundlecaps)
1821 1821
1822 1822 if cgstream:
1823 1823 part = bundler.newpart('changegroup', data=cgstream)
1824 1824 if cgversions:
1825 1825 part.addparam('version', version)
1826 1826 part.addparam('nbchanges', '%d' % len(outgoing.missing),
1827 1827 mandatory=False)
1828 1828 if 'treemanifest' in repo.requirements:
1829 1829 part.addparam('treemanifest', '1')
1830 1830
1831 1831 @getbundle2partsgenerator('bookmarks')
1832 1832 def _getbundlebookmarkpart(bundler, repo, source, bundlecaps=None,
1833 1833 b2caps=None, **kwargs):
1834 1834 """add a bookmark part to the requested bundle"""
1835 1835 if not kwargs.get(r'bookmarks', False):
1836 1836 return
1837 1837 if 'bookmarks' not in b2caps:
1838 1838 raise ValueError(_('no common bookmarks exchange method'))
1839 1839 books = bookmod.listbinbookmarks(repo)
1840 1840 data = bookmod.binaryencode(books)
1841 1841 if data:
1842 1842 bundler.newpart('bookmarks', data=data)
1843 1843
1844 1844 @getbundle2partsgenerator('listkeys')
1845 1845 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1846 1846 b2caps=None, **kwargs):
1847 1847 """add parts containing listkeys namespaces to the requested bundle"""
1848 1848 listkeys = kwargs.get(r'listkeys', ())
1849 1849 for namespace in listkeys:
1850 1850 part = bundler.newpart('listkeys')
1851 1851 part.addparam('namespace', namespace)
1852 1852 keys = repo.listkeys(namespace).items()
1853 1853 part.data = pushkey.encodekeys(keys)
1854 1854
1855 1855 @getbundle2partsgenerator('obsmarkers')
1856 1856 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1857 1857 b2caps=None, heads=None, **kwargs):
1858 1858 """add an obsolescence markers part to the requested bundle"""
1859 1859 if kwargs.get(r'obsmarkers', False):
1860 1860 if heads is None:
1861 1861 heads = repo.heads()
1862 1862 subset = [c.node() for c in repo.set('::%ln', heads)]
1863 1863 markers = repo.obsstore.relevantmarkers(subset)
1864 1864 markers = sorted(markers)
1865 1865 bundle2.buildobsmarkerspart(bundler, markers)
1866 1866
1867 1867 @getbundle2partsgenerator('phases')
1868 1868 def _getbundlephasespart(bundler, repo, source, bundlecaps=None,
1869 1869 b2caps=None, heads=None, **kwargs):
1870 1870 """add phase heads part to the requested bundle"""
1871 1871 if kwargs.get(r'phases', False):
1872 1872 if 'heads' not in b2caps.get('phases', ()):
1873 1873 raise ValueError(_('no common phases exchange method'))
1874 1874 if heads is None:
1875 1875 heads = repo.heads()
1876 1876
1877 1877 headsbyphase = collections.defaultdict(set)
1878 1878 if repo.publishing():
1879 1879 headsbyphase[phases.public] = heads
1880 1880 else:
1881 1881 # find the appropriate heads to move
1882 1882
1883 1883 phase = repo._phasecache.phase
1884 1884 node = repo.changelog.node
1885 1885 rev = repo.changelog.rev
1886 1886 for h in heads:
1887 1887 headsbyphase[phase(repo, rev(h))].add(h)
1888 1888 seenphases = list(headsbyphase.keys())
1889 1889
1890 1890 # We do not handle anything but public and draft phases for now
1891 1891 if seenphases:
1892 1892 assert max(seenphases) <= phases.draft
1893 1893
1894 1894 # if client is pulling non-public changesets, we need to find
1895 1895 # intermediate public heads.
1896 1896 draftheads = headsbyphase.get(phases.draft, set())
1897 1897 if draftheads:
1898 1898 publicheads = headsbyphase.get(phases.public, set())
1899 1899
1900 1900 revset = 'heads(only(%ln, %ln) and public())'
1901 1901 extraheads = repo.revs(revset, draftheads, publicheads)
1902 1902 for r in extraheads:
1903 1903 headsbyphase[phases.public].add(node(r))
1904 1904
1905 1905 # transform data in a format used by the encoding function
1906 1906 phasemapping = []
1907 1907 for phase in phases.allphases:
1908 1908 phasemapping.append(sorted(headsbyphase[phase]))
1909 1909
1910 1910 # generate the actual part
1911 1911 phasedata = phases.binaryencode(phasemapping)
1912 1912 bundler.newpart('phase-heads', data=phasedata)
1913 1913
1914 1914 @getbundle2partsgenerator('hgtagsfnodes')
1915 1915 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1916 1916 b2caps=None, heads=None, common=None,
1917 1917 **kwargs):
1918 1918 """Transfer the .hgtags filenodes mapping.
1919 1919
1920 1920 Only values for heads in this bundle will be transferred.
1921 1921
1922 1922 The part data consists of pairs of 20 byte changeset node and .hgtags
1923 1923 filenodes raw values.
1924 1924 """
1925 1925 # Don't send unless:
1926 1926 # - changeset are being exchanged,
1927 1927 # - the client supports it.
1928 1928 if not (kwargs.get(r'cg', True) and 'hgtagsfnodes' in b2caps):
1929 1929 return
1930 1930
1931 1931 outgoing = _computeoutgoing(repo, heads, common)
1932 1932 bundle2.addparttagsfnodescache(repo, bundler, outgoing)
1933 1933
1934 1934 def check_heads(repo, their_heads, context):
1935 1935 """check if the heads of a repo have been modified
1936 1936
1937 1937 Used by peer for unbundling.
1938 1938 """
1939 1939 heads = repo.heads()
1940 1940 heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
1941 1941 if not (their_heads == ['force'] or their_heads == heads or
1942 1942 their_heads == ['hashed', heads_hash]):
1943 1943 # someone else committed/pushed/unbundled while we
1944 1944 # were transferring data
1945 1945 raise error.PushRaced('repository changed while %s - '
1946 1946 'please try again' % context)
1947 1947
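A small illustration of the 'hashed' form accepted above (byte strings, as elsewhere in this module)::

    import hashlib

    heads = sorted(['\x11' * 20, '\x22' * 20])  # pretend these are repo.heads()
    their_heads = ['hashed', hashlib.sha1(''.join(heads)).digest()]
    # check_heads(repo, their_heads, 'uploading changes') passes as long
    # as the repository's heads did not change in the meantime
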
1948 1948 def unbundle(repo, cg, heads, source, url):
1949 1949 """Apply a bundle to a repo.
1950 1950
1951 1951 This function makes sure the repo is locked during the application and has
1952 1952 a mechanism to check that no push race occurred between the creation of the
1953 1953 bundle and its application.
1954 1954
1955 1955 If the push was raced, a PushRaced exception is raised."""
1956 1956 r = 0
1957 1957 # need a transaction when processing a bundle2 stream
1958 1958 # [wlock, lock, tr] - needs to be an array so nested functions can modify it
1959 1959 lockandtr = [None, None, None]
1960 1960 recordout = None
1961 1961 # quick fix for output mismatch with bundle2 in 3.4
1962 1962 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
1963 1963 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1964 1964 captureoutput = True
1965 1965 try:
1966 1966 # note: outside bundle1, 'heads' is expected to be empty and this
1967 1967 # 'check_heads' call will be a no-op
1968 1968 check_heads(repo, heads, 'uploading changes')
1969 1969 # push can proceed
1970 1970 if not isinstance(cg, bundle2.unbundle20):
1971 1971 # legacy case: bundle1 (changegroup 01)
1972 1972 txnname = "\n".join([source, util.hidepassword(url)])
1973 1973 with repo.lock(), repo.transaction(txnname) as tr:
1974 1974 op = bundle2.applybundle(repo, cg, tr, source, url)
1975 1975 r = bundle2.combinechangegroupresults(op)
1976 1976 else:
1977 1977 r = None
1978 1978 try:
1979 1979 def gettransaction():
1980 1980 if not lockandtr[2]:
1981 1981 lockandtr[0] = repo.wlock()
1982 1982 lockandtr[1] = repo.lock()
1983 1983 lockandtr[2] = repo.transaction(source)
1984 1984 lockandtr[2].hookargs['source'] = source
1985 1985 lockandtr[2].hookargs['url'] = url
1986 1986 lockandtr[2].hookargs['bundle2'] = '1'
1987 1987 return lockandtr[2]
1988 1988
1989 1989 # Do greedy locking by default until we're satisfied with lazy
1990 1990 # locking.
1991 1991 if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
1992 1992 gettransaction()
1993 1993
1994 1994 op = bundle2.bundleoperation(repo, gettransaction,
1995 1995 captureoutput=captureoutput)
1996 1996 try:
1997 1997 op = bundle2.processbundle(repo, cg, op=op)
1998 1998 finally:
1999 1999 r = op.reply
2000 2000 if captureoutput and r is not None:
2001 2001 repo.ui.pushbuffer(error=True, subproc=True)
2002 2002 def recordout(output):
2003 2003 r.newpart('output', data=output, mandatory=False)
2004 2004 if lockandtr[2] is not None:
2005 2005 lockandtr[2].close()
2006 2006 except BaseException as exc:
2007 2007 exc.duringunbundle2 = True
2008 2008 if captureoutput and r is not None:
2009 2009 parts = exc._bundle2salvagedoutput = r.salvageoutput()
2010 2010 def recordout(output):
2011 2011 part = bundle2.bundlepart('output', data=output,
2012 2012 mandatory=False)
2013 2013 parts.append(part)
2014 2014 raise
2015 2015 finally:
2016 2016 lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
2017 2017 if recordout is not None:
2018 2018 recordout(repo.ui.popbuffer())
2019 2019 return r
2020 2020
2021 2021 def _maybeapplyclonebundle(pullop):
2022 2022 """Apply a clone bundle from a remote, if possible."""
2023 2023
2024 2024 repo = pullop.repo
2025 2025 remote = pullop.remote
2026 2026
2027 2027 if not repo.ui.configbool('ui', 'clonebundles'):
2028 2028 return
2029 2029
2030 2030 # Only run if local repo is empty.
2031 2031 if len(repo):
2032 2032 return
2033 2033
2034 2034 if pullop.heads:
2035 2035 return
2036 2036
2037 2037 if not remote.capable('clonebundles'):
2038 2038 return
2039 2039
2040 2040 res = remote._call('clonebundles')
2041 2041
2042 2042 # If we call the wire protocol command, that's good enough to record the
2043 2043 # attempt.
2044 2044 pullop.clonebundleattempted = True
2045 2045
2046 2046 entries = parseclonebundlesmanifest(repo, res)
2047 2047 if not entries:
2048 2048 repo.ui.note(_('no clone bundles available on remote; '
2049 2049 'falling back to regular clone\n'))
2050 2050 return
2051 2051
2052 2052 entries = filterclonebundleentries(
2053 2053 repo, entries, streamclonerequested=pullop.streamclonerequested)
2054 2054
2055 2055 if not entries:
2056 2056 # There is a thundering herd concern here. However, if a server
2057 2057 # operator doesn't advertise bundles appropriate for its clients,
2058 2058 # they deserve what's coming. Furthermore, from a client's
2059 2059 # perspective, no automatic fallback would mean not being able to
2060 2060 # clone!
2061 2061 repo.ui.warn(_('no compatible clone bundles available on server; '
2062 2062 'falling back to regular clone\n'))
2063 2063 repo.ui.warn(_('(you may want to report this to the server '
2064 2064 'operator)\n'))
2065 2065 return
2066 2066
2067 2067 entries = sortclonebundleentries(repo.ui, entries)
2068 2068
2069 2069 url = entries[0]['URL']
2070 2070 repo.ui.status(_('applying clone bundle from %s\n') % url)
2071 2071 if trypullbundlefromurl(repo.ui, repo, url):
2072 2072 repo.ui.status(_('finished applying clone bundle\n'))
2073 2073 # Bundle failed.
2074 2074 #
2075 2075 # We abort by default to avoid the thundering herd of
2076 2076 # clients flooding a server that was expecting expensive
2077 2077 # clone load to be offloaded.
2078 2078 elif repo.ui.configbool('ui', 'clonebundlefallback'):
2079 2079 repo.ui.warn(_('falling back to normal clone\n'))
2080 2080 else:
2081 2081 raise error.Abort(_('error applying bundle'),
2082 2082 hint=_('if this error persists, consider contacting '
2083 2083 'the server operator or disable clone '
2084 2084 'bundles via '
2085 2085 '"--config ui.clonebundles=false"'))
2086 2086
2087 2087 def parseclonebundlesmanifest(repo, s):
2088 2088 """Parses the raw text of a clone bundles manifest.
2089 2089
2090 2090 Returns a list of dicts. The dicts have a ``URL`` key corresponding
2091 2091 to the URL and other keys are the attributes for the entry.
2092 2092 """
2093 2093 m = []
2094 2094 for line in s.splitlines():
2095 2095 fields = line.split()
2096 2096 if not fields:
2097 2097 continue
2098 2098 attrs = {'URL': fields[0]}
2099 2099 for rawattr in fields[1:]:
2100 2100 key, value = rawattr.split('=', 1)
2101 2101 key = urlreq.unquote(key)
2102 2102 value = urlreq.unquote(value)
2103 2103 attrs[key] = value
2104 2104
2105 2105 # Parse BUNDLESPEC into components. This makes client-side
2106 2106 # preferences easier to specify since you can prefer a single
2107 2107 # component of the BUNDLESPEC.
2108 2108 if key == 'BUNDLESPEC':
2109 2109 try:
2110 2110 comp, version, params = parsebundlespec(repo, value,
2111 2111 externalnames=True)
2112 2112 attrs['COMPRESSION'] = comp
2113 2113 attrs['VERSION'] = version
2114 2114 except error.InvalidBundleSpecification:
2115 2115 pass
2116 2116 except error.UnsupportedBundleSpecification:
2117 2117 pass
2118 2118
2119 2119 m.append(attrs)
2120 2120
2121 2121 return m
2122 2122
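A worked example: a manifest line such as 'https://example.com/full.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true' would parse (assuming 'gzip-v2' is a spec this client understands) to::

    [{'URL': 'https://example.com/full.hg',
      'BUNDLESPEC': 'gzip-v2',
      'REQUIRESNI': 'true',
      'COMPRESSION': 'gzip',
      'VERSION': 'v2'}]
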
2123 2123 def filterclonebundleentries(repo, entries, streamclonerequested=False):
2124 2124 """Remove incompatible clone bundle manifest entries.
2125 2125
2126 2126 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
2127 2127 and returns a new list consisting of only the entries that this client
2128 2128 should be able to apply.
2129 2129
2130 2130 There is no guarantee we'll be able to apply all returned entries because
2131 2131 the metadata we use to filter on may be missing or wrong.
2132 2132 """
2133 2133 newentries = []
2134 2134 for entry in entries:
2135 2135 spec = entry.get('BUNDLESPEC')
2136 2136 if spec:
2137 2137 try:
2138 2138 comp, version, params = parsebundlespec(repo, spec, strict=True)
2139 2139
2140 2140 # If a stream clone was requested, filter out non-streamclone
2141 2141 # entries.
2142 2142 if streamclonerequested and (comp != 'UN' or version != 's1'):
2143 2143 repo.ui.debug('filtering %s because not a stream clone\n' %
2144 2144 entry['URL'])
2145 2145 continue
2146 2146
2147 2147 except error.InvalidBundleSpecification as e:
2148 2148 repo.ui.debug(str(e) + '\n')
2149 2149 continue
2150 2150 except error.UnsupportedBundleSpecification as e:
2151 2151 repo.ui.debug('filtering %s because unsupported bundle '
2152 2152 'spec: %s\n' % (entry['URL'], str(e)))
2153 2153 continue
2154 2154 # If we don't have a spec and a stream clone was requested, we don't
2155 2155 # know what the entry is, so don't attempt to apply it.
2156 2156 elif streamclonerequested:
2157 2157 repo.ui.debug('filtering %s because cannot determine if a stream '
2158 2158 'clone bundle\n' % entry['URL'])
2159 2159 continue
2160 2160
2161 2161 if 'REQUIRESNI' in entry and not sslutil.hassni:
2162 2162 repo.ui.debug('filtering %s because SNI not supported\n' %
2163 2163 entry['URL'])
2164 2164 continue
2165 2165
2166 2166 newentries.append(entry)
2167 2167
2168 2168 return newentries
2169 2169
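A short sketch of these filtering rules on hypothetical entries (when parsed strictly, the stream clone spec `none-packed1` yields the internal names `UN`/`s1` checked above):

    entries = [
        {'URL': 'https://cdn.example.com/full.hg', 'BUNDLESPEC': 'gzip-v2'},
        {'URL': 'https://cdn.example.com/stream.hg', 'BUNDLESPEC': 'none-packed1'},
        {'URL': 'https://cdn.example.com/mystery.hg'},  # no spec advertised
    ]
    # filterclonebundleentries(repo, entries) keeps all three entries; the
    # spec-less one survives because nothing proves it is incompatible.
    # filterclonebundleentries(repo, entries, streamclonerequested=True)
    # keeps only the 'none-packed1' entry: the gzip bundle is not a stream
    # clone, and the spec-less entry is dropped because we cannot tell
    # what it is.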
2170 2170 class clonebundleentry(object):
2171 2171 """Represents an item in a clone bundles manifest.
2172 2172
2173 2173 This rich class is needed to support sorting since sorted() in Python 3
2174 2174 doesn't support ``cmp`` and our comparison is complex enough that ``key=``
2175 2175 won't work.
2176 2176 """
2177 2177
2178 2178 def __init__(self, value, prefers):
2179 2179 self.value = value
2180 2180 self.prefers = prefers
2181 2181
2182 2182 def _cmp(self, other):
2183 2183 for prefkey, prefvalue in self.prefers:
2184 2184 avalue = self.value.get(prefkey)
2185 2185 bvalue = other.value.get(prefkey)
2186 2186
2187 2187 # Special case: b is missing the attribute and a matches it exactly.
2188 2188 if avalue is not None and bvalue is None and avalue == prefvalue:
2189 2189 return -1
2190 2190
2191 2191 # Special case: a is missing the attribute and b matches it exactly.
2192 2192 if bvalue is not None and avalue is None and bvalue == prefvalue:
2193 2193 return 1
2194 2194
2195 2195 # We can't compare unless attribute present on both.
2196 2196 if avalue is None or bvalue is None:
2197 2197 continue
2198 2198
2199 2199 # Same values should fall back to next attribute.
2200 2200 if avalue == bvalue:
2201 2201 continue
2202 2202
2203 2203 # Exact matches come first.
2204 2204 if avalue == prefvalue:
2205 2205 return -1
2206 2206 if bvalue == prefvalue:
2207 2207 return 1
2208 2208
2209 2209 # Fall back to next attribute.
2210 2210 continue
2211 2211
2212 2212 # If we got here we couldn't sort by attributes and prefers. Fall
2213 2213 # back to index order.
2214 2214 return 0
2215 2215
2216 2216 def __lt__(self, other):
2217 2217 return self._cmp(other) < 0
2218 2218
2219 2219 def __gt__(self, other):
2220 2220 return self._cmp(other) > 0
2221 2221
2222 2222 def __eq__(self, other):
2223 2223 return self._cmp(other) == 0
2224 2224
2225 2225 def __le__(self, other):
2226 2226 return self._cmp(other) <= 0
2227 2227
2228 2228 def __ge__(self, other):
2229 2229 return self._cmp(other) >= 0
2230 2230
2231 2231 def __ne__(self, other):
2232 2232 return self._cmp(other) != 0
2233 2233
2234 2234 def sortclonebundleentries(ui, entries):
2235 2235 prefers = ui.configlist('ui', 'clonebundleprefers')
2236 2236 if not prefers:
2237 2237 return list(entries)
2238 2238
2239 2239 prefers = [p.split('=', 1) for p in prefers]
2240 2240
2241 2241 items = sorted(clonebundleentry(v, prefers) for v in entries)
2242 2242 return [i.value for i in items]
2243 2243
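A small usage sketch (hypothetical entries and preference): with `ui.clonebundleprefers` set to `VERSION=packed1`, an entry matching the preferred value sorts ahead of one that lacks the attribute entirely:

    entries = [
        {'URL': 'https://cdn.example.com/full.hg', 'COMPRESSION': 'gzip'},
        {'URL': 'https://cdn.example.com/stream.hg', 'VERSION': 'packed1'},
    ]
    prefers = [('VERSION', 'packed1')]
    items = sorted(clonebundleentry(v, prefers) for v in entries)
    # The packed1 entry wins; ties would fall through to later preference
    # keys and finally to the original manifest order.
    assert [i.value['URL'] for i in items] == [
        'https://cdn.example.com/stream.hg',
        'https://cdn.example.com/full.hg',
    ]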
2244 2244 def trypullbundlefromurl(ui, repo, url):
2245 2245 """Attempt to apply a bundle from a URL."""
2246 2246 with repo.lock(), repo.transaction('bundleurl') as tr:
2247 2247 try:
2248 2248 fh = urlmod.open(ui, url)
2249 2249 cg = readbundle(ui, fh, 'stream')
2250 2250
2251 2251 if isinstance(cg, streamclone.streamcloneapplier):
2252 2252 cg.apply(repo)
2253 2253 else:
2254 2254 bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
2255 2255 return True
2256 2256 except urlerr.httperror as e:
2257 2257 ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
2258 2258 except urlerr.urlerror as e:
2259 2259 ui.warn(_('error fetching bundle: %s\n') % e.reason)
2260 2260
2261 2261 return False
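As the comment above notes, a failed clone bundle aborts by default; a client willing to absorb the cost of a regular clone can opt into automatic fallback through the `ui.clonebundlefallback` option read above, e.g. in its hgrc:

    [ui]
    clonebundlefallback = true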
@@ -1,510 +1,510
1 1 #require serve
2 2
3 3 #testcases stream-legacy stream-bundle2
4 4
5 5 #if stream-bundle2
6 6 $ cat << EOF >> $HGRCPATH
7 7 > [experimental]
8 8 > bundle2.stream = yes
9 9 > EOF
10 10 #endif
11 11
12 12 Initialize repository
13 13 the status call is to check for issue5130
14 14
15 15 $ hg init server
16 16 $ cd server
17 17 $ touch foo
18 18 $ hg -q commit -A -m initial
19 19 >>> for i in range(1024):
20 20 ... with open(str(i), 'wb') as fh:
21 21 ... fh.write(str(i))
22 22 $ hg -q commit -A -m 'add a lot of files'
23 23 $ hg st
24 24 $ hg --config server.uncompressed=false serve -p $HGPORT -d --pid-file=hg.pid
25 25 $ cat hg.pid > $DAEMON_PIDS
26 26 $ cd ..
27 27
28 28 Cannot stream clone when server.uncompressed is set
29 29
30 30 $ get-with-headers.py $LOCALIP:$HGPORT '?cmd=stream_out'
31 31 200 Script output follows
32 32
33 33 1
34 34
35 35 #if stream-legacy
36 36 $ hg debugcapabilities http://localhost:$HGPORT
37 37 Main capabilities:
38 38 batch
39 39 branchmap
40 40 $USUAL_BUNDLE2_CAPS_SERVER$
41 41 changegroupsubset
42 42 compression=$BUNDLE2_COMPRESSIONS$
43 43 getbundle
44 44 httpheader=1024
45 45 httpmediatype=0.1rx,0.1tx,0.2tx
46 46 known
47 47 lookup
48 48 pushkey
49 49 unbundle=HG10GZ,HG10BZ,HG10UN
50 50 unbundlehash
51 51 Bundle2 capabilities:
52 52 HG20
53 53 bookmarks
54 54 changegroup
55 55 01
56 56 02
57 57 digests
58 58 md5
59 59 sha1
60 60 sha512
61 61 error
62 62 abort
63 63 unsupportedcontent
64 64 pushraced
65 65 pushkey
66 66 hgtagsfnodes
67 67 listkeys
68 68 phases
69 69 heads
70 70 pushkey
71 71 remote-changegroup
72 72 http
73 73 https
74 74
75 75 $ hg clone --stream -U http://localhost:$HGPORT server-disabled
76 76 warning: stream clone requested but server has them disabled
77 77 requesting all changes
78 78 adding changesets
79 79 adding manifests
80 80 adding file changes
81 81 added 2 changesets with 1025 changes to 1025 files
82 82 new changesets 96ee1d7354c4:c17445101a72
83 83
84 84 $ get-with-headers.py $LOCALIP:$HGPORT '?cmd=getbundle' content-type --bodyfile body --hgproto 0.2 --requestheader "x-hgarg-1=bundlecaps=HG20%2Cbundle2%3DHG20%250Abookmarks%250Achangegroup%253D01%252C02%250Adigests%253Dmd5%252Csha1%252Csha512%250Aerror%253Dabort%252Cunsupportedcontent%252Cpushraced%252Cpushkey%250Ahgtagsfnodes%250Alistkeys%250Aphases%253Dheads%250Apushkey%250Aremote-changegroup%253Dhttp%252Chttps&cg=0&common=0000000000000000000000000000000000000000&heads=c17445101a72edac06facd130d14808dfbd5c7c2&stream=1"
85 85 200 Script output follows
86 86 content-type: application/mercurial-0.2
87 87
88 88
89 89 $ f --size body --hexdump --bytes 100
90 90 body: size=232
91 91 0000: 04 6e 6f 6e 65 48 47 32 30 00 00 00 00 00 00 00 |.noneHG20.......|
92 92 0010: cf 0b 45 52 52 4f 52 3a 41 42 4f 52 54 00 00 00 |..ERROR:ABORT...|
93 93 0020: 00 01 01 07 3c 04 72 6d 65 73 73 61 67 65 73 74 |....<.rmessagest|
94 94 0030: 72 65 61 6d 20 64 61 74 61 20 72 65 71 75 65 73 |ream data reques|
95 95 0040: 74 65 64 20 62 75 74 20 73 65 72 76 65 72 20 64 |ted but server d|
96 96 0050: 6f 65 73 20 6e 6f 74 20 61 6c 6c 6f 77 20 74 68 |oes not allow th|
97 97 0060: 69 73 20 66 |is f|
98 98
99 99 #endif
100 100 #if stream-bundle2
101 101 $ hg debugcapabilities http://localhost:$HGPORT
102 102 Main capabilities:
103 103 batch
104 104 branchmap
105 105 $USUAL_BUNDLE2_CAPS_SERVER$
106 106 changegroupsubset
107 107 compression=$BUNDLE2_COMPRESSIONS$
108 108 getbundle
109 109 httpheader=1024
110 110 httpmediatype=0.1rx,0.1tx,0.2tx
111 111 known
112 112 lookup
113 113 pushkey
114 114 unbundle=HG10GZ,HG10BZ,HG10UN
115 115 unbundlehash
116 116 Bundle2 capabilities:
117 117 HG20
118 118 bookmarks
119 119 changegroup
120 120 01
121 121 02
122 122 digests
123 123 md5
124 124 sha1
125 125 sha512
126 126 error
127 127 abort
128 128 unsupportedcontent
129 129 pushraced
130 130 pushkey
131 131 hgtagsfnodes
132 132 listkeys
133 133 phases
134 134 heads
135 135 pushkey
136 136 remote-changegroup
137 137 http
138 138 https
139 139
140 140 $ hg clone --stream -U http://localhost:$HGPORT server-disabled
141 141 warning: stream clone requested but server has them disabled
142 142 requesting all changes
143 143 adding changesets
144 144 adding manifests
145 145 adding file changes
146 146 added 2 changesets with 1025 changes to 1025 files
147 147 new changesets 96ee1d7354c4:c17445101a72
148 148
149 149 $ get-with-headers.py $LOCALIP:$HGPORT '?cmd=getbundle' content-type --bodyfile body --hgproto 0.2 --requestheader "x-hgarg-1=bundlecaps=HG20%2Cbundle2%3DHG20%250Abookmarks%250Achangegroup%253D01%252C02%250Adigests%253Dmd5%252Csha1%252Csha512%250Aerror%253Dabort%252Cunsupportedcontent%252Cpushraced%252Cpushkey%250Ahgtagsfnodes%250Alistkeys%250Aphases%253Dheads%250Apushkey%250Aremote-changegroup%253Dhttp%252Chttps&cg=0&common=0000000000000000000000000000000000000000&heads=c17445101a72edac06facd130d14808dfbd5c7c2&stream=1"
150 150 200 Script output follows
151 151 content-type: application/mercurial-0.2
152 152
153 153
154 154 $ f --size body --hexdump --bytes 100
155 155 body: size=232
156 156 0000: 04 6e 6f 6e 65 48 47 32 30 00 00 00 00 00 00 00 |.noneHG20.......|
157 157 0010: cf 0b 45 52 52 4f 52 3a 41 42 4f 52 54 00 00 00 |..ERROR:ABORT...|
158 158 0020: 00 01 01 07 3c 04 72 6d 65 73 73 61 67 65 73 74 |....<.rmessagest|
159 159 0030: 72 65 61 6d 20 64 61 74 61 20 72 65 71 75 65 73 |ream data reques|
160 160 0040: 74 65 64 20 62 75 74 20 73 65 72 76 65 72 20 64 |ted but server d|
161 161 0050: 6f 65 73 20 6e 6f 74 20 61 6c 6c 6f 77 20 74 68 |oes not allow th|
162 162 0060: 69 73 20 66 |is f|
163 163
164 164 #endif
165 165
166 166 $ killdaemons.py
167 167 $ cd server
168 168 $ hg serve -p $HGPORT -d --pid-file=hg.pid
169 169 $ cat hg.pid > $DAEMON_PIDS
170 170 $ cd ..
171 171
172 172 Basic clone
173 173
174 174 #if stream-legacy
175 175 $ hg clone --stream -U http://localhost:$HGPORT clone1
176 176 streaming all changes
177 177 1027 files to transfer, 96.3 KB of data
178 178 transferred 96.3 KB in * seconds (*/sec) (glob)
179 179 searching for changes
180 180 no changes found
181 181 #endif
182 182 #if stream-bundle2
183 183 $ hg clone --stream -U http://localhost:$HGPORT clone1
184 184 streaming all changes
185 185 1030 files to transfer, 96.4 KB of data
186 186 transferred 96.4 KB in * seconds (* */sec) (glob)
187 187
188 188 $ ls -1 clone1/.hg/cache
189 189 branch2-served
190 190 rbc-names-v1
191 191 rbc-revs-v1
192 192 #endif
193 193
194 194 getbundle requests with stream=1 are uncompressed
195 195
196 196 $ get-with-headers.py $LOCALIP:$HGPORT '?cmd=getbundle' content-type --bodyfile body --hgproto '0.1 0.2 comp=zlib,none' --requestheader "x-hgarg-1=bundlecaps=HG20%2Cbundle2%3DHG20%250Abookmarks%250Achangegroup%253D01%252C02%250Adigests%253Dmd5%252Csha1%252Csha512%250Aerror%253Dabort%252Cunsupportedcontent%252Cpushraced%252Cpushkey%250Ahgtagsfnodes%250Alistkeys%250Aphases%253Dheads%250Apushkey%250Aremote-changegroup%253Dhttp%252Chttps&cg=0&common=0000000000000000000000000000000000000000&heads=c17445101a72edac06facd130d14808dfbd5c7c2&stream=1"
197 197 200 Script output follows
198 198 content-type: application/mercurial-0.2
199 199
200 200
201 201 $ f --size --hex --bytes 256 body
202 body: size=112222
202 body: size=112230
203 203 0000: 04 6e 6f 6e 65 48 47 32 30 00 00 00 00 00 00 00 |.noneHG20.......|
204 0010: 68 07 53 54 52 45 41 4d 32 00 00 00 00 03 00 09 |h.STREAM2.......|
205 0020: 05 09 04 0c 2d 62 79 74 65 63 6f 75 6e 74 39 38 |....-bytecount98|
204 0010: 70 07 53 54 52 45 41 4d 32 00 00 00 00 03 00 09 |p.STREAM2.......|
205 0020: 05 09 04 0c 35 62 79 74 65 63 6f 75 6e 74 39 38 |....5bytecount98|
206 206 0030: 37 35 38 66 69 6c 65 63 6f 75 6e 74 31 30 33 30 |758filecount1030|
207 207 0040: 72 65 71 75 69 72 65 6d 65 6e 74 73 64 6f 74 65 |requirementsdote|
208 0050: 6e 63 6f 64 65 20 66 6e 63 61 63 68 65 20 67 65 |ncode fncache ge|
209 0060: 6e 65 72 61 6c 64 65 6c 74 61 20 72 65 76 6c 6f |neraldelta revlo|
210 0070: 67 76 31 20 73 74 6f 72 65 00 00 80 00 73 08 42 |gv1 store....s.B|
211 0080: 64 61 74 61 2f 30 2e 69 00 03 00 01 00 00 00 00 |data/0.i........|
212 0090: 00 00 00 02 00 00 00 01 00 00 00 00 00 00 00 01 |................|
213 00a0: ff ff ff ff ff ff ff ff 80 29 63 a0 49 d3 23 87 |.........)c.I.#.|
214 00b0: bf ce fe 56 67 92 67 2c 69 d1 ec 39 00 00 00 00 |...Vg.g,i..9....|
215 00c0: 00 00 00 00 00 00 00 00 75 30 73 08 42 64 61 74 |........u0s.Bdat|
216 00d0: 61 2f 31 2e 69 00 03 00 01 00 00 00 00 00 00 00 |a/1.i...........|
217 00e0: 02 00 00 00 01 00 00 00 00 00 00 00 01 ff ff ff |................|
218 00f0: ff ff ff ff ff f9 76 da 1d 0d f2 25 6c de 08 db |......v....%l...|
208 0050: 6e 63 6f 64 65 25 32 43 66 6e 63 61 63 68 65 25 |ncode%2Cfncache%|
209 0060: 32 43 67 65 6e 65 72 61 6c 64 65 6c 74 61 25 32 |2Cgeneraldelta%2|
210 0070: 43 72 65 76 6c 6f 67 76 31 25 32 43 73 74 6f 72 |Crevlogv1%2Cstor|
211 0080: 65 00 00 80 00 73 08 42 64 61 74 61 2f 30 2e 69 |e....s.Bdata/0.i|
212 0090: 00 03 00 01 00 00 00 00 00 00 00 02 00 00 00 01 |................|
213 00a0: 00 00 00 00 00 00 00 01 ff ff ff ff ff ff ff ff |................|
214 00b0: 80 29 63 a0 49 d3 23 87 bf ce fe 56 67 92 67 2c |.)c.I.#....Vg.g,|
215 00c0: 69 d1 ec 39 00 00 00 00 00 00 00 00 00 00 00 00 |i..9............|
216 00d0: 75 30 73 08 42 64 61 74 61 2f 31 2e 69 00 03 00 |u0s.Bdata/1.i...|
217 00e0: 01 00 00 00 00 00 00 00 02 00 00 00 01 00 00 00 |................|
218 00f0: 00 00 00 00 01 ff ff ff ff ff ff ff ff f9 76 da |..............v.|
219 219
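The hexdump above captures the fix under test: the stream2 part's `requirements` parameter is now sent as a urlquoted, comma-separated list (`dotencode%2Cfncache%2C...`) instead of a space-separated one. Each of the four separators grows from one byte to three, which matches the body growing from 112222 to 112230 bytes and the part header size moving from 0x68 to 0x70. A minimal sketch of the encoding, using the standard library in place of Mercurial's vendored `urlreq`:

    try:
        from urllib.parse import quote  # Python 3
    except ImportError:
        from urllib import quote  # Python 2

    requirements = ['dotencode', 'fncache', 'generaldelta', 'revlogv1', 'store']
    # Commas are outside quote()'s default safe set, so they are escaped,
    # matching the %2C separators visible in the hexdump above.
    print(quote(','.join(requirements)))
    # dotencode%2Cfncache%2Cgeneraldelta%2Crevlogv1%2Cstore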
220 220 --uncompressed is an alias to --stream
221 221
222 222 #if stream-legacy
223 223 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
224 224 streaming all changes
225 225 1027 files to transfer, 96.3 KB of data
226 226 transferred 96.3 KB in * seconds (*/sec) (glob)
227 227 searching for changes
228 228 no changes found
229 229 #endif
230 230 #if stream-bundle2
231 231 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
232 232 streaming all changes
233 233 1030 files to transfer, 96.4 KB of data
234 234 transferred 96.4 KB in * seconds (* */sec) (glob)
235 235 #endif
236 236
237 237 Clone with background file closing enabled
238 238
239 239 #if stream-legacy
240 240 $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --stream -U http://localhost:$HGPORT clone-background | grep -v adding
241 241 using http://localhost:$HGPORT/
242 242 sending capabilities command
243 243 sending branchmap command
244 244 streaming all changes
245 245 sending stream_out command
246 246 1027 files to transfer, 96.3 KB of data
247 247 starting 4 threads for background file closing
248 248 transferred 96.3 KB in * seconds (*/sec) (glob)
249 249 query 1; heads
250 250 sending batch command
251 251 searching for changes
252 252 all remote heads known locally
253 253 no changes found
254 254 sending getbundle command
255 255 bundle2-input-bundle: with-transaction
256 256 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
257 257 bundle2-input-part: "phase-heads" supported
258 258 bundle2-input-part: total payload size 24
259 259 bundle2-input-bundle: 1 parts total
260 260 checking for updated bookmarks
261 261 #endif
262 262 #if stream-bundle2
263 263 $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --stream -U http://localhost:$HGPORT clone-background | grep -v adding
264 264 using http://localhost:$HGPORT/
265 265 sending capabilities command
266 266 query 1; heads
267 267 sending batch command
268 268 streaming all changes
269 269 sending getbundle command
270 270 bundle2-input-bundle: with-transaction
271 271 bundle2-input-part: "stream2" (params: 3 mandatory) supported
272 272 applying stream bundle
273 273 1030 files to transfer, 96.4 KB of data
274 274 starting 4 threads for background file closing
275 275 starting 4 threads for background file closing
276 276 transferred 96.4 KB in * seconds (* */sec) (glob)
277 277 bundle2-input-part: total payload size 112077
278 278 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
279 279 bundle2-input-bundle: 1 parts total
280 280 checking for updated bookmarks
281 281 #endif
282 282
283 283 Cannot stream clone when there are secret changesets
284 284
285 285 $ hg -R server phase --force --secret -r tip
286 286 $ hg clone --stream -U http://localhost:$HGPORT secret-denied
287 287 warning: stream clone requested but server has them disabled
288 288 requesting all changes
289 289 adding changesets
290 290 adding manifests
291 291 adding file changes
292 292 added 1 changesets with 1 changes to 1 files
293 293 new changesets 96ee1d7354c4
294 294
295 295 $ killdaemons.py
296 296
297 297 Streaming of secrets can be overridden by server config
298 298
299 299 $ cd server
300 300 $ hg serve --config server.uncompressedallowsecret=true -p $HGPORT -d --pid-file=hg.pid
301 301 $ cat hg.pid > $DAEMON_PIDS
302 302 $ cd ..
303 303
304 304 #if stream-legacy
305 305 $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
306 306 streaming all changes
307 307 1027 files to transfer, 96.3 KB of data
308 308 transferred 96.3 KB in * seconds (*/sec) (glob)
309 309 searching for changes
310 310 no changes found
311 311 #endif
312 312 #if stream-bundle2
313 313 $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
314 314 streaming all changes
315 315 1030 files to transfer, 96.4 KB of data
316 316 transferred 96.4 KB in * seconds (* */sec) (glob)
317 317 #endif
318 318
319 319 $ killdaemons.py
320 320
321 321 Verify interaction between preferuncompressed and secret presence
322 322
323 323 $ cd server
324 324 $ hg serve --config server.preferuncompressed=true -p $HGPORT -d --pid-file=hg.pid
325 325 $ cat hg.pid > $DAEMON_PIDS
326 326 $ cd ..
327 327
328 328 $ hg clone -U http://localhost:$HGPORT preferuncompressed-secret
329 329 requesting all changes
330 330 adding changesets
331 331 adding manifests
332 332 adding file changes
333 333 added 1 changesets with 1 changes to 1 files
334 334 new changesets 96ee1d7354c4
335 335
336 336 $ killdaemons.py
337 337
338 338 Clone not allowed when full bundles disabled and can't serve secrets
339 339
340 340 $ cd server
341 341 $ hg serve --config server.disablefullbundle=true -p $HGPORT -d --pid-file=hg.pid
342 342 $ cat hg.pid > $DAEMON_PIDS
343 343 $ cd ..
344 344
345 345 $ hg clone --stream http://localhost:$HGPORT secret-full-disabled
346 346 warning: stream clone requested but server has them disabled
347 347 requesting all changes
348 348 remote: abort: server has pull-based clones disabled
349 349 abort: pull failed on remote
350 350 (remove --pull if specified or upgrade Mercurial)
351 351 [255]
352 352
353 353 Local stream clone with secrets involved
354 354 (This is just a test of behavior: if you have access to the repo's files,
355 355 there is no security boundary, so preventing a clone here isn't important.)
356 356
357 357 $ hg clone -U --stream server local-secret
358 358 warning: stream clone requested but server has them disabled
359 359 requesting all changes
360 360 adding changesets
361 361 adding manifests
362 362 adding file changes
363 363 added 1 changesets with 1 changes to 1 files
364 364 new changesets 96ee1d7354c4
365 365
366 366 Stream clone while repo is changing:
367 367
368 368 $ mkdir changing
369 369 $ cd changing
370 370
371 371 extension for delaying the server process so we can reliably modify the repo
372 372 while cloning
373 373
374 374 $ cat > delayer.py <<EOF
375 375 > import time
376 376 > from mercurial import extensions, vfs
377 377 > def __call__(orig, self, path, *args, **kwargs):
378 378 > if path == 'data/f1.i':
379 379 > time.sleep(2)
380 380 > return orig(self, path, *args, **kwargs)
381 381 > extensions.wrapfunction(vfs.vfs, '__call__', __call__)
382 382 > EOF
383 383
384 384 prepare a repo with a small and a big file to cover both code paths in emitrevlogdata
385 385
386 386 $ hg init repo
387 387 $ touch repo/f1
388 388 $ $TESTDIR/seq.py 50000 > repo/f2
389 389 $ hg -R repo ci -Aqm "0"
390 390 $ hg serve -R repo -p $HGPORT1 -d --pid-file=hg.pid --config extensions.delayer=delayer.py
391 391 $ cat hg.pid >> $DAEMON_PIDS
392 392
393 393 clone while modifying the repo between stat'ing a file under the write lock
394 394 and actually serving the file content
395 395
396 396 $ hg clone -q --stream -U http://localhost:$HGPORT1 clone &
397 397 $ sleep 1
398 398 $ echo >> repo/f1
399 399 $ echo >> repo/f2
400 400 $ hg -R repo ci -m "1"
401 401 $ wait
402 402 $ hg -R clone id
403 403 000000000000
404 404 $ cd ..
405 405
406 406 Stream repository with bookmarks
407 407 --------------------------------
408 408
409 409 (revert introduction of secret changeset)
410 410
411 411 $ hg -R server phase --draft 'secret()'
412 412
413 413 add a bookmark
414 414
415 415 $ hg -R server bookmark -r tip some-bookmark
416 416
417 417 clone it
418 418
419 419 #if stream-legacy
420 420 $ hg clone --stream http://localhost:$HGPORT with-bookmarks
421 421 streaming all changes
422 422 1027 files to transfer, 96.3 KB of data
423 423 transferred 96.3 KB in * seconds (*) (glob)
424 424 searching for changes
425 425 no changes found
426 426 updating to branch default
427 427 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
428 428 #endif
429 429 #if stream-bundle2
430 430 $ hg clone --stream http://localhost:$HGPORT with-bookmarks
431 431 streaming all changes
432 432 1033 files to transfer, 96.6 KB of data
433 433 transferred 96.6 KB in * seconds (* */sec) (glob)
434 434 updating to branch default
435 435 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
436 436 #endif
437 437 $ hg -R with-bookmarks bookmarks
438 438 some-bookmark 1:c17445101a72
439 439
440 440 Stream repository with phases
441 441 -----------------------------
442 442
443 443 Clone as publishing
444 444
445 445 $ hg -R server phase -r 'all()'
446 446 0: draft
447 447 1: draft
448 448
449 449 #if stream-legacy
450 450 $ hg clone --stream http://localhost:$HGPORT phase-publish
451 451 streaming all changes
452 452 1027 files to transfer, 96.3 KB of data
453 453 transferred 96.3 KB in * seconds (*) (glob)
454 454 searching for changes
455 455 no changes found
456 456 updating to branch default
457 457 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
458 458 #endif
459 459 #if stream-bundle2
460 460 $ hg clone --stream http://localhost:$HGPORT phase-publish
461 461 streaming all changes
462 462 1033 files to transfer, 96.6 KB of data
463 463 transferred 96.6 KB in * seconds (* */sec) (glob)
464 464 updating to branch default
465 465 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
466 466 #endif
467 467 $ hg -R phase-publish phase -r 'all()'
468 468 0: public
469 469 1: public
470 470
471 471 Clone as non publishing
472 472
473 473 $ cat << EOF >> server/.hg/hgrc
474 474 > [phases]
475 475 > publish = False
476 476 > EOF
477 477 $ killdaemons.py
478 478 $ hg -R server serve -p $HGPORT -d --pid-file=hg.pid
479 479 $ cat hg.pid > $DAEMON_PIDS
480 480
481 481 #if stream-legacy
482 482
483 483 With v1 of the stream protocol, changesets are always cloned as public. This
484 484 makes stream v1 unsuitable for non-publishing repositories.
485 485
486 486 $ hg clone --stream http://localhost:$HGPORT phase-no-publish
487 487 streaming all changes
488 488 1027 files to transfer, 96.3 KB of data
489 489 transferred 96.3 KB in * seconds (*) (glob)
490 490 searching for changes
491 491 no changes found
492 492 updating to branch default
493 493 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
494 494 $ hg -R phase-no-publish phase -r 'all()'
495 495 0: public
496 496 1: public
497 497 #endif
498 498 #if stream-bundle2
499 499 $ hg clone --stream http://localhost:$HGPORT phase-no-publish
500 500 streaming all changes
501 501 1034 files to transfer, 96.7 KB of data
502 502 transferred 96.7 KB in * seconds (* */sec) (glob)
503 503 updating to branch default
504 504 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
505 505 $ hg -R phase-no-publish phase -r 'all()'
506 506 0: draft
507 507 1: draft
508 508 #endif
509 509
510 510 $ killdaemons.py