##// END OF EJS Templates
bundle2: only grab a transaction when 'phase-heads' affect the repository...
Boris Feld -
r34322:4ef472b9 default
parent child Browse files
Show More
@@ -1,1921 +1,1921 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 preferedchunksize = 4096
183 183
184 184 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
185 185
186 186 def outdebug(ui, message):
187 187 """debug regarding output stream (bundling)"""
188 188 if ui.configbool('devel', 'bundle2.debug'):
189 189 ui.debug('bundle2-output: %s\n' % message)
190 190
191 191 def indebug(ui, message):
192 192 """debug on input stream (unbundling)"""
193 193 if ui.configbool('devel', 'bundle2.debug'):
194 194 ui.debug('bundle2-input: %s\n' % message)
195 195
196 196 def validateparttype(parttype):
197 197 """raise ValueError if a parttype contains invalid character"""
198 198 if _parttypeforbidden.search(parttype):
199 199 raise ValueError(parttype)
200 200
201 201 def _makefpartparamsizes(nbparams):
202 202 """return a struct format to read part parameter sizes
203 203
204 204 The number parameters is variable so we need to build that format
205 205 dynamically.
206 206 """
207 207 return '>'+('BB'*nbparams)
208 208
209 209 parthandlermapping = {}
210 210
211 211 def parthandler(parttype, params=()):
212 212 """decorator that register a function as a bundle2 part handler
213 213
214 214 eg::
215 215
216 216 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
217 217 def myparttypehandler(...):
218 218 '''process a part of type "my part".'''
219 219 ...
220 220 """
221 221 validateparttype(parttype)
222 222 def _decorator(func):
223 223 lparttype = parttype.lower() # enforce lower case matching.
224 224 assert lparttype not in parthandlermapping
225 225 parthandlermapping[lparttype] = func
226 226 func.params = frozenset(params)
227 227 return func
228 228 return _decorator
229 229
230 230 class unbundlerecords(object):
231 231 """keep record of what happens during and unbundle
232 232
233 233 New records are added using `records.add('cat', obj)`. Where 'cat' is a
234 234 category of record and obj is an arbitrary object.
235 235
236 236 `records['cat']` will return all entries of this category 'cat'.
237 237
238 238 Iterating on the object itself will yield `('category', obj)` tuples
239 239 for all entries.
240 240
241 241 All iterations happens in chronological order.
242 242 """
243 243
244 244 def __init__(self):
245 245 self._categories = {}
246 246 self._sequences = []
247 247 self._replies = {}
248 248
249 249 def add(self, category, entry, inreplyto=None):
250 250 """add a new record of a given category.
251 251
252 252 The entry can then be retrieved in the list returned by
253 253 self['category']."""
254 254 self._categories.setdefault(category, []).append(entry)
255 255 self._sequences.append((category, entry))
256 256 if inreplyto is not None:
257 257 self.getreplies(inreplyto).add(category, entry)
258 258
259 259 def getreplies(self, partid):
260 260 """get the records that are replies to a specific part"""
261 261 return self._replies.setdefault(partid, unbundlerecords())
262 262
263 263 def __getitem__(self, cat):
264 264 return tuple(self._categories.get(cat, ()))
265 265
266 266 def __iter__(self):
267 267 return iter(self._sequences)
268 268
269 269 def __len__(self):
270 270 return len(self._sequences)
271 271
272 272 def __nonzero__(self):
273 273 return bool(self._sequences)
274 274
275 275 __bool__ = __nonzero__
276 276
277 277 class bundleoperation(object):
278 278 """an object that represents a single bundling process
279 279
280 280 Its purpose is to carry unbundle-related objects and states.
281 281
282 282 A new object should be created at the beginning of each bundle processing.
283 283 The object is to be returned by the processing function.
284 284
285 285 The object has very little content now it will ultimately contain:
286 286 * an access to the repo the bundle is applied to,
287 287 * a ui object,
288 288 * a way to retrieve a transaction to add changes to the repo,
289 289 * a way to record the result of processing each part,
290 290 * a way to construct a bundle response when applicable.
291 291 """
292 292
293 293 def __init__(self, repo, transactiongetter, captureoutput=True):
294 294 self.repo = repo
295 295 self.ui = repo.ui
296 296 self.records = unbundlerecords()
297 297 self.reply = None
298 298 self.captureoutput = captureoutput
299 299 self.hookargs = {}
300 300 self._gettransaction = transactiongetter
301 301
302 302 def gettransaction(self):
303 303 transaction = self._gettransaction()
304 304
305 305 if self.hookargs:
306 306 # the ones added to the transaction supercede those added
307 307 # to the operation.
308 308 self.hookargs.update(transaction.hookargs)
309 309 transaction.hookargs = self.hookargs
310 310
311 311 # mark the hookargs as flushed. further attempts to add to
312 312 # hookargs will result in an abort.
313 313 self.hookargs = None
314 314
315 315 return transaction
316 316
317 317 def addhookargs(self, hookargs):
318 318 if self.hookargs is None:
319 319 raise error.ProgrammingError('attempted to add hookargs to '
320 320 'operation after transaction started')
321 321 self.hookargs.update(hookargs)
322 322
323 323 class TransactionUnavailable(RuntimeError):
324 324 pass
325 325
326 326 def _notransaction():
327 327 """default method to get a transaction while processing a bundle
328 328
329 329 Raise an exception to highlight the fact that no transaction was expected
330 330 to be created"""
331 331 raise TransactionUnavailable()
332 332
333 333 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
334 334 # transform me into unbundler.apply() as soon as the freeze is lifted
335 335 if isinstance(unbundler, unbundle20):
336 336 tr.hookargs['bundle2'] = '1'
337 337 if source is not None and 'source' not in tr.hookargs:
338 338 tr.hookargs['source'] = source
339 339 if url is not None and 'url' not in tr.hookargs:
340 340 tr.hookargs['url'] = url
341 341 return processbundle(repo, unbundler, lambda: tr)
342 342 else:
343 343 # the transactiongetter won't be used, but we might as well set it
344 344 op = bundleoperation(repo, lambda: tr)
345 345 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
346 346 return op
347 347
348 348 class partiterator(object):
349 349 def __init__(self, repo, op, unbundler):
350 350 self.repo = repo
351 351 self.op = op
352 352 self.unbundler = unbundler
353 353 self.iterator = None
354 354 self.count = 0
355 355 self.current = None
356 356
357 357 def __enter__(self):
358 358 def func():
359 359 itr = enumerate(self.unbundler.iterparts())
360 360 for count, p in itr:
361 361 self.count = count
362 362 self.current = p
363 363 yield p
364 364 p.seek(0, 2)
365 365 self.current = None
366 366 self.iterator = func()
367 367 return self.iterator
368 368
369 369 def __exit__(self, type, exc, tb):
370 370 if not self.iterator:
371 371 return
372 372
373 373 if exc:
374 374 # If exiting or interrupted, do not attempt to seek the stream in
375 375 # the finally block below. This makes abort faster.
376 376 if (self.current and
377 377 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
378 378 # consume the part content to not corrupt the stream.
379 379 self.current.seek(0, 2)
380 380
381 381 # Any exceptions seeking to the end of the bundle at this point are
382 382 # almost certainly related to the underlying stream being bad.
383 383 # And, chances are that the exception we're handling is related to
384 384 # getting in that bad state. So, we swallow the seeking error and
385 385 # re-raise the original error.
386 386 seekerror = False
387 387 try:
388 388 for part in self.iterator:
389 389 # consume the bundle content
390 390 part.seek(0, 2)
391 391 except Exception:
392 392 seekerror = True
393 393
394 394 # Small hack to let caller code distinguish exceptions from bundle2
395 395 # processing from processing the old format. This is mostly needed
396 396 # to handle different return codes to unbundle according to the type
397 397 # of bundle. We should probably clean up or drop this return code
398 398 # craziness in a future version.
399 399 exc.duringunbundle2 = True
400 400 salvaged = []
401 401 replycaps = None
402 402 if self.op.reply is not None:
403 403 salvaged = self.op.reply.salvageoutput()
404 404 replycaps = self.op.reply.capabilities
405 405 exc._replycaps = replycaps
406 406 exc._bundle2salvagedoutput = salvaged
407 407
408 408 # Re-raising from a variable loses the original stack. So only use
409 409 # that form if we need to.
410 410 if seekerror:
411 411 raise exc
412 412
413 413 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
414 414 self.count)
415 415
416 416 def processbundle(repo, unbundler, transactiongetter=None, op=None):
417 417 """This function process a bundle, apply effect to/from a repo
418 418
419 419 It iterates over each part then searches for and uses the proper handling
420 420 code to process the part. Parts are processed in order.
421 421
422 422 Unknown Mandatory part will abort the process.
423 423
424 424 It is temporarily possible to provide a prebuilt bundleoperation to the
425 425 function. This is used to ensure output is properly propagated in case of
426 426 an error during the unbundling. This output capturing part will likely be
427 427 reworked and this ability will probably go away in the process.
428 428 """
429 429 if op is None:
430 430 if transactiongetter is None:
431 431 transactiongetter = _notransaction
432 432 op = bundleoperation(repo, transactiongetter)
433 433 # todo:
434 434 # - replace this is a init function soon.
435 435 # - exception catching
436 436 unbundler.params
437 437 if repo.ui.debugflag:
438 438 msg = ['bundle2-input-bundle:']
439 439 if unbundler.params:
440 440 msg.append(' %i params' % len(unbundler.params))
441 441 if op._gettransaction is None or op._gettransaction is _notransaction:
442 442 msg.append(' no-transaction')
443 443 else:
444 444 msg.append(' with-transaction')
445 445 msg.append('\n')
446 446 repo.ui.debug(''.join(msg))
447 447
448 448 processparts(repo, op, unbundler)
449 449
450 450 return op
451 451
452 452 def processparts(repo, op, unbundler):
453 453 with partiterator(repo, op, unbundler) as parts:
454 454 for part in parts:
455 455 _processpart(op, part)
456 456
457 457 def _processchangegroup(op, cg, tr, source, url, **kwargs):
458 458 ret = cg.apply(op.repo, tr, source, url, **kwargs)
459 459 op.records.add('changegroup', {
460 460 'return': ret,
461 461 })
462 462 return ret
463 463
464 464 def _gethandler(op, part):
465 465 status = 'unknown' # used by debug output
466 466 try:
467 467 handler = parthandlermapping.get(part.type)
468 468 if handler is None:
469 469 status = 'unsupported-type'
470 470 raise error.BundleUnknownFeatureError(parttype=part.type)
471 471 indebug(op.ui, 'found a handler for part %s' % part.type)
472 472 unknownparams = part.mandatorykeys - handler.params
473 473 if unknownparams:
474 474 unknownparams = list(unknownparams)
475 475 unknownparams.sort()
476 476 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
477 477 raise error.BundleUnknownFeatureError(parttype=part.type,
478 478 params=unknownparams)
479 479 status = 'supported'
480 480 except error.BundleUnknownFeatureError as exc:
481 481 if part.mandatory: # mandatory parts
482 482 raise
483 483 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
484 484 return # skip to part processing
485 485 finally:
486 486 if op.ui.debugflag:
487 487 msg = ['bundle2-input-part: "%s"' % part.type]
488 488 if not part.mandatory:
489 489 msg.append(' (advisory)')
490 490 nbmp = len(part.mandatorykeys)
491 491 nbap = len(part.params) - nbmp
492 492 if nbmp or nbap:
493 493 msg.append(' (params:')
494 494 if nbmp:
495 495 msg.append(' %i mandatory' % nbmp)
496 496 if nbap:
497 497 msg.append(' %i advisory' % nbmp)
498 498 msg.append(')')
499 499 msg.append(' %s\n' % status)
500 500 op.ui.debug(''.join(msg))
501 501
502 502 return handler
503 503
504 504 def _processpart(op, part):
505 505 """process a single part from a bundle
506 506
507 507 The part is guaranteed to have been fully consumed when the function exits
508 508 (even if an exception is raised)."""
509 509 handler = _gethandler(op, part)
510 510 if handler is None:
511 511 return
512 512
513 513 # handler is called outside the above try block so that we don't
514 514 # risk catching KeyErrors from anything other than the
515 515 # parthandlermapping lookup (any KeyError raised by handler()
516 516 # itself represents a defect of a different variety).
517 517 output = None
518 518 if op.captureoutput and op.reply is not None:
519 519 op.ui.pushbuffer(error=True, subproc=True)
520 520 output = ''
521 521 try:
522 522 handler(op, part)
523 523 finally:
524 524 if output is not None:
525 525 output = op.ui.popbuffer()
526 526 if output:
527 527 outpart = op.reply.newpart('output', data=output,
528 528 mandatory=False)
529 529 outpart.addparam(
530 530 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
531 531
532 532 def decodecaps(blob):
533 533 """decode a bundle2 caps bytes blob into a dictionary
534 534
535 535 The blob is a list of capabilities (one per line)
536 536 Capabilities may have values using a line of the form::
537 537
538 538 capability=value1,value2,value3
539 539
540 540 The values are always a list."""
541 541 caps = {}
542 542 for line in blob.splitlines():
543 543 if not line:
544 544 continue
545 545 if '=' not in line:
546 546 key, vals = line, ()
547 547 else:
548 548 key, vals = line.split('=', 1)
549 549 vals = vals.split(',')
550 550 key = urlreq.unquote(key)
551 551 vals = [urlreq.unquote(v) for v in vals]
552 552 caps[key] = vals
553 553 return caps
554 554
555 555 def encodecaps(caps):
556 556 """encode a bundle2 caps dictionary into a bytes blob"""
557 557 chunks = []
558 558 for ca in sorted(caps):
559 559 vals = caps[ca]
560 560 ca = urlreq.quote(ca)
561 561 vals = [urlreq.quote(v) for v in vals]
562 562 if vals:
563 563 ca = "%s=%s" % (ca, ','.join(vals))
564 564 chunks.append(ca)
565 565 return '\n'.join(chunks)
566 566
567 567 bundletypes = {
568 568 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
569 569 # since the unification ssh accepts a header but there
570 570 # is no capability signaling it.
571 571 "HG20": (), # special-cased below
572 572 "HG10UN": ("HG10UN", 'UN'),
573 573 "HG10BZ": ("HG10", 'BZ'),
574 574 "HG10GZ": ("HG10GZ", 'GZ'),
575 575 }
576 576
577 577 # hgweb uses this list to communicate its preferred type
578 578 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
579 579
580 580 class bundle20(object):
581 581 """represent an outgoing bundle2 container
582 582
583 583 Use the `addparam` method to add stream level parameter. and `newpart` to
584 584 populate it. Then call `getchunks` to retrieve all the binary chunks of
585 585 data that compose the bundle2 container."""
586 586
587 587 _magicstring = 'HG20'
588 588
589 589 def __init__(self, ui, capabilities=()):
590 590 self.ui = ui
591 591 self._params = []
592 592 self._parts = []
593 593 self.capabilities = dict(capabilities)
594 594 self._compengine = util.compengines.forbundletype('UN')
595 595 self._compopts = None
596 596
597 597 def setcompression(self, alg, compopts=None):
598 598 """setup core part compression to <alg>"""
599 599 if alg in (None, 'UN'):
600 600 return
601 601 assert not any(n.lower() == 'compression' for n, v in self._params)
602 602 self.addparam('Compression', alg)
603 603 self._compengine = util.compengines.forbundletype(alg)
604 604 self._compopts = compopts
605 605
606 606 @property
607 607 def nbparts(self):
608 608 """total number of parts added to the bundler"""
609 609 return len(self._parts)
610 610
611 611 # methods used to defines the bundle2 content
612 612 def addparam(self, name, value=None):
613 613 """add a stream level parameter"""
614 614 if not name:
615 615 raise ValueError(r'empty parameter name')
616 616 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
617 617 raise ValueError(r'non letter first character: %s' % name)
618 618 self._params.append((name, value))
619 619
620 620 def addpart(self, part):
621 621 """add a new part to the bundle2 container
622 622
623 623 Parts contains the actual applicative payload."""
624 624 assert part.id is None
625 625 part.id = len(self._parts) # very cheap counter
626 626 self._parts.append(part)
627 627
628 628 def newpart(self, typeid, *args, **kwargs):
629 629 """create a new part and add it to the containers
630 630
631 631 As the part is directly added to the containers. For now, this means
632 632 that any failure to properly initialize the part after calling
633 633 ``newpart`` should result in a failure of the whole bundling process.
634 634
635 635 You can still fall back to manually create and add if you need better
636 636 control."""
637 637 part = bundlepart(typeid, *args, **kwargs)
638 638 self.addpart(part)
639 639 return part
640 640
641 641 # methods used to generate the bundle2 stream
642 642 def getchunks(self):
643 643 if self.ui.debugflag:
644 644 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
645 645 if self._params:
646 646 msg.append(' (%i params)' % len(self._params))
647 647 msg.append(' %i parts total\n' % len(self._parts))
648 648 self.ui.debug(''.join(msg))
649 649 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
650 650 yield self._magicstring
651 651 param = self._paramchunk()
652 652 outdebug(self.ui, 'bundle parameter: %s' % param)
653 653 yield _pack(_fstreamparamsize, len(param))
654 654 if param:
655 655 yield param
656 656 for chunk in self._compengine.compressstream(self._getcorechunk(),
657 657 self._compopts):
658 658 yield chunk
659 659
660 660 def _paramchunk(self):
661 661 """return a encoded version of all stream parameters"""
662 662 blocks = []
663 663 for par, value in self._params:
664 664 par = urlreq.quote(par)
665 665 if value is not None:
666 666 value = urlreq.quote(value)
667 667 par = '%s=%s' % (par, value)
668 668 blocks.append(par)
669 669 return ' '.join(blocks)
670 670
671 671 def _getcorechunk(self):
672 672 """yield chunk for the core part of the bundle
673 673
674 674 (all but headers and parameters)"""
675 675 outdebug(self.ui, 'start of parts')
676 676 for part in self._parts:
677 677 outdebug(self.ui, 'bundle part: "%s"' % part.type)
678 678 for chunk in part.getchunks(ui=self.ui):
679 679 yield chunk
680 680 outdebug(self.ui, 'end of bundle')
681 681 yield _pack(_fpartheadersize, 0)
682 682
683 683
684 684 def salvageoutput(self):
685 685 """return a list with a copy of all output parts in the bundle
686 686
687 687 This is meant to be used during error handling to make sure we preserve
688 688 server output"""
689 689 salvaged = []
690 690 for part in self._parts:
691 691 if part.type.startswith('output'):
692 692 salvaged.append(part.copy())
693 693 return salvaged
694 694
695 695
696 696 class unpackermixin(object):
697 697 """A mixin to extract bytes and struct data from a stream"""
698 698
699 699 def __init__(self, fp):
700 700 self._fp = fp
701 701
702 702 def _unpack(self, format):
703 703 """unpack this struct format from the stream
704 704
705 705 This method is meant for internal usage by the bundle2 protocol only.
706 706 They directly manipulate the low level stream including bundle2 level
707 707 instruction.
708 708
709 709 Do not use it to implement higher-level logic or methods."""
710 710 data = self._readexact(struct.calcsize(format))
711 711 return _unpack(format, data)
712 712
713 713 def _readexact(self, size):
714 714 """read exactly <size> bytes from the stream
715 715
716 716 This method is meant for internal usage by the bundle2 protocol only.
717 717 They directly manipulate the low level stream including bundle2 level
718 718 instruction.
719 719
720 720 Do not use it to implement higher-level logic or methods."""
721 721 return changegroup.readexactly(self._fp, size)
722 722
723 723 def getunbundler(ui, fp, magicstring=None):
724 724 """return a valid unbundler object for a given magicstring"""
725 725 if magicstring is None:
726 726 magicstring = changegroup.readexactly(fp, 4)
727 727 magic, version = magicstring[0:2], magicstring[2:4]
728 728 if magic != 'HG':
729 729 ui.debug(
730 730 "error: invalid magic: %r (version %r), should be 'HG'\n"
731 731 % (magic, version))
732 732 raise error.Abort(_('not a Mercurial bundle'))
733 733 unbundlerclass = formatmap.get(version)
734 734 if unbundlerclass is None:
735 735 raise error.Abort(_('unknown bundle version %s') % version)
736 736 unbundler = unbundlerclass(ui, fp)
737 737 indebug(ui, 'start processing of %s stream' % magicstring)
738 738 return unbundler
739 739
740 740 class unbundle20(unpackermixin):
741 741 """interpret a bundle2 stream
742 742
743 743 This class is fed with a binary stream and yields parts through its
744 744 `iterparts` methods."""
745 745
746 746 _magicstring = 'HG20'
747 747
748 748 def __init__(self, ui, fp):
749 749 """If header is specified, we do not read it out of the stream."""
750 750 self.ui = ui
751 751 self._compengine = util.compengines.forbundletype('UN')
752 752 self._compressed = None
753 753 super(unbundle20, self).__init__(fp)
754 754
755 755 @util.propertycache
756 756 def params(self):
757 757 """dictionary of stream level parameters"""
758 758 indebug(self.ui, 'reading bundle2 stream parameters')
759 759 params = {}
760 760 paramssize = self._unpack(_fstreamparamsize)[0]
761 761 if paramssize < 0:
762 762 raise error.BundleValueError('negative bundle param size: %i'
763 763 % paramssize)
764 764 if paramssize:
765 765 params = self._readexact(paramssize)
766 766 params = self._processallparams(params)
767 767 return params
768 768
769 769 def _processallparams(self, paramsblock):
770 770 """"""
771 771 params = util.sortdict()
772 772 for p in paramsblock.split(' '):
773 773 p = p.split('=', 1)
774 774 p = [urlreq.unquote(i) for i in p]
775 775 if len(p) < 2:
776 776 p.append(None)
777 777 self._processparam(*p)
778 778 params[p[0]] = p[1]
779 779 return params
780 780
781 781
782 782 def _processparam(self, name, value):
783 783 """process a parameter, applying its effect if needed
784 784
785 785 Parameter starting with a lower case letter are advisory and will be
786 786 ignored when unknown. Those starting with an upper case letter are
787 787 mandatory and will this function will raise a KeyError when unknown.
788 788
789 789 Note: no option are currently supported. Any input will be either
790 790 ignored or failing.
791 791 """
792 792 if not name:
793 793 raise ValueError(r'empty parameter name')
794 794 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
795 795 raise ValueError(r'non letter first character: %s' % name)
796 796 try:
797 797 handler = b2streamparamsmap[name.lower()]
798 798 except KeyError:
799 799 if name[0:1].islower():
800 800 indebug(self.ui, "ignoring unknown parameter %s" % name)
801 801 else:
802 802 raise error.BundleUnknownFeatureError(params=(name,))
803 803 else:
804 804 handler(self, name, value)
805 805
806 806 def _forwardchunks(self):
807 807 """utility to transfer a bundle2 as binary
808 808
809 809 This is made necessary by the fact the 'getbundle' command over 'ssh'
810 810 have no way to know then the reply end, relying on the bundle to be
811 811 interpreted to know its end. This is terrible and we are sorry, but we
812 812 needed to move forward to get general delta enabled.
813 813 """
814 814 yield self._magicstring
815 815 assert 'params' not in vars(self)
816 816 paramssize = self._unpack(_fstreamparamsize)[0]
817 817 if paramssize < 0:
818 818 raise error.BundleValueError('negative bundle param size: %i'
819 819 % paramssize)
820 820 yield _pack(_fstreamparamsize, paramssize)
821 821 if paramssize:
822 822 params = self._readexact(paramssize)
823 823 self._processallparams(params)
824 824 yield params
825 825 assert self._compengine.bundletype == 'UN'
826 826 # From there, payload might need to be decompressed
827 827 self._fp = self._compengine.decompressorreader(self._fp)
828 828 emptycount = 0
829 829 while emptycount < 2:
830 830 # so we can brainlessly loop
831 831 assert _fpartheadersize == _fpayloadsize
832 832 size = self._unpack(_fpartheadersize)[0]
833 833 yield _pack(_fpartheadersize, size)
834 834 if size:
835 835 emptycount = 0
836 836 else:
837 837 emptycount += 1
838 838 continue
839 839 if size == flaginterrupt:
840 840 continue
841 841 elif size < 0:
842 842 raise error.BundleValueError('negative chunk size: %i')
843 843 yield self._readexact(size)
844 844
845 845
846 846 def iterparts(self):
847 847 """yield all parts contained in the stream"""
848 848 # make sure param have been loaded
849 849 self.params
850 850 # From there, payload need to be decompressed
851 851 self._fp = self._compengine.decompressorreader(self._fp)
852 852 indebug(self.ui, 'start extraction of bundle2 parts')
853 853 headerblock = self._readpartheader()
854 854 while headerblock is not None:
855 855 part = unbundlepart(self.ui, headerblock, self._fp)
856 856 yield part
857 857 # Seek to the end of the part to force it's consumption so the next
858 858 # part can be read. But then seek back to the beginning so the
859 859 # code consuming this generator has a part that starts at 0.
860 860 part.seek(0, 2)
861 861 part.seek(0)
862 862 headerblock = self._readpartheader()
863 863 indebug(self.ui, 'end of bundle2 stream')
864 864
865 865 def _readpartheader(self):
866 866 """reads a part header size and return the bytes blob
867 867
868 868 returns None if empty"""
869 869 headersize = self._unpack(_fpartheadersize)[0]
870 870 if headersize < 0:
871 871 raise error.BundleValueError('negative part header size: %i'
872 872 % headersize)
873 873 indebug(self.ui, 'part header size: %i' % headersize)
874 874 if headersize:
875 875 return self._readexact(headersize)
876 876 return None
877 877
878 878 def compressed(self):
879 879 self.params # load params
880 880 return self._compressed
881 881
882 882 def close(self):
883 883 """close underlying file"""
884 884 if util.safehasattr(self._fp, 'close'):
885 885 return self._fp.close()
886 886
887 887 formatmap = {'20': unbundle20}
888 888
889 889 b2streamparamsmap = {}
890 890
891 891 def b2streamparamhandler(name):
892 892 """register a handler for a stream level parameter"""
893 893 def decorator(func):
894 894 assert name not in formatmap
895 895 b2streamparamsmap[name] = func
896 896 return func
897 897 return decorator
898 898
899 899 @b2streamparamhandler('compression')
900 900 def processcompression(unbundler, param, value):
901 901 """read compression parameter and install payload decompression"""
902 902 if value not in util.compengines.supportedbundletypes:
903 903 raise error.BundleUnknownFeatureError(params=(param,),
904 904 values=(value,))
905 905 unbundler._compengine = util.compengines.forbundletype(value)
906 906 if value is not None:
907 907 unbundler._compressed = True
908 908
909 909 class bundlepart(object):
910 910 """A bundle2 part contains application level payload
911 911
912 912 The part `type` is used to route the part to the application level
913 913 handler.
914 914
915 915 The part payload is contained in ``part.data``. It could be raw bytes or a
916 916 generator of byte chunks.
917 917
918 918 You can add parameters to the part using the ``addparam`` method.
919 919 Parameters can be either mandatory (default) or advisory. Remote side
920 920 should be able to safely ignore the advisory ones.
921 921
922 922 Both data and parameters cannot be modified after the generation has begun.
923 923 """
924 924
925 925 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
926 926 data='', mandatory=True):
927 927 validateparttype(parttype)
928 928 self.id = None
929 929 self.type = parttype
930 930 self._data = data
931 931 self._mandatoryparams = list(mandatoryparams)
932 932 self._advisoryparams = list(advisoryparams)
933 933 # checking for duplicated entries
934 934 self._seenparams = set()
935 935 for pname, __ in self._mandatoryparams + self._advisoryparams:
936 936 if pname in self._seenparams:
937 937 raise error.ProgrammingError('duplicated params: %s' % pname)
938 938 self._seenparams.add(pname)
939 939 # status of the part's generation:
940 940 # - None: not started,
941 941 # - False: currently generated,
942 942 # - True: generation done.
943 943 self._generated = None
944 944 self.mandatory = mandatory
945 945
946 946 def __repr__(self):
947 947 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
948 948 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
949 949 % (cls, id(self), self.id, self.type, self.mandatory))
950 950
951 951 def copy(self):
952 952 """return a copy of the part
953 953
954 954 The new part have the very same content but no partid assigned yet.
955 955 Parts with generated data cannot be copied."""
956 956 assert not util.safehasattr(self.data, 'next')
957 957 return self.__class__(self.type, self._mandatoryparams,
958 958 self._advisoryparams, self._data, self.mandatory)
959 959
960 960 # methods used to defines the part content
961 961 @property
962 962 def data(self):
963 963 return self._data
964 964
965 965 @data.setter
966 966 def data(self, data):
967 967 if self._generated is not None:
968 968 raise error.ReadOnlyPartError('part is being generated')
969 969 self._data = data
970 970
971 971 @property
972 972 def mandatoryparams(self):
973 973 # make it an immutable tuple to force people through ``addparam``
974 974 return tuple(self._mandatoryparams)
975 975
976 976 @property
977 977 def advisoryparams(self):
978 978 # make it an immutable tuple to force people through ``addparam``
979 979 return tuple(self._advisoryparams)
980 980
981 981 def addparam(self, name, value='', mandatory=True):
982 982 """add a parameter to the part
983 983
984 984 If 'mandatory' is set to True, the remote handler must claim support
985 985 for this parameter or the unbundling will be aborted.
986 986
987 987 The 'name' and 'value' cannot exceed 255 bytes each.
988 988 """
989 989 if self._generated is not None:
990 990 raise error.ReadOnlyPartError('part is being generated')
991 991 if name in self._seenparams:
992 992 raise ValueError('duplicated params: %s' % name)
993 993 self._seenparams.add(name)
994 994 params = self._advisoryparams
995 995 if mandatory:
996 996 params = self._mandatoryparams
997 997 params.append((name, value))
998 998
999 999 # methods used to generates the bundle2 stream
1000 1000 def getchunks(self, ui):
1001 1001 if self._generated is not None:
1002 1002 raise error.ProgrammingError('part can only be consumed once')
1003 1003 self._generated = False
1004 1004
1005 1005 if ui.debugflag:
1006 1006 msg = ['bundle2-output-part: "%s"' % self.type]
1007 1007 if not self.mandatory:
1008 1008 msg.append(' (advisory)')
1009 1009 nbmp = len(self.mandatoryparams)
1010 1010 nbap = len(self.advisoryparams)
1011 1011 if nbmp or nbap:
1012 1012 msg.append(' (params:')
1013 1013 if nbmp:
1014 1014 msg.append(' %i mandatory' % nbmp)
1015 1015 if nbap:
1016 1016 msg.append(' %i advisory' % nbmp)
1017 1017 msg.append(')')
1018 1018 if not self.data:
1019 1019 msg.append(' empty payload')
1020 1020 elif (util.safehasattr(self.data, 'next')
1021 1021 or util.safehasattr(self.data, '__next__')):
1022 1022 msg.append(' streamed payload')
1023 1023 else:
1024 1024 msg.append(' %i bytes payload' % len(self.data))
1025 1025 msg.append('\n')
1026 1026 ui.debug(''.join(msg))
1027 1027
1028 1028 #### header
1029 1029 if self.mandatory:
1030 1030 parttype = self.type.upper()
1031 1031 else:
1032 1032 parttype = self.type.lower()
1033 1033 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1034 1034 ## parttype
1035 1035 header = [_pack(_fparttypesize, len(parttype)),
1036 1036 parttype, _pack(_fpartid, self.id),
1037 1037 ]
1038 1038 ## parameters
1039 1039 # count
1040 1040 manpar = self.mandatoryparams
1041 1041 advpar = self.advisoryparams
1042 1042 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1043 1043 # size
1044 1044 parsizes = []
1045 1045 for key, value in manpar:
1046 1046 parsizes.append(len(key))
1047 1047 parsizes.append(len(value))
1048 1048 for key, value in advpar:
1049 1049 parsizes.append(len(key))
1050 1050 parsizes.append(len(value))
1051 1051 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1052 1052 header.append(paramsizes)
1053 1053 # key, value
1054 1054 for key, value in manpar:
1055 1055 header.append(key)
1056 1056 header.append(value)
1057 1057 for key, value in advpar:
1058 1058 header.append(key)
1059 1059 header.append(value)
1060 1060 ## finalize header
1061 1061 try:
1062 1062 headerchunk = ''.join(header)
1063 1063 except TypeError:
1064 1064 raise TypeError(r'Found a non-bytes trying to '
1065 1065 r'build bundle part header: %r' % header)
1066 1066 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1067 1067 yield _pack(_fpartheadersize, len(headerchunk))
1068 1068 yield headerchunk
1069 1069 ## payload
1070 1070 try:
1071 1071 for chunk in self._payloadchunks():
1072 1072 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1073 1073 yield _pack(_fpayloadsize, len(chunk))
1074 1074 yield chunk
1075 1075 except GeneratorExit:
1076 1076 # GeneratorExit means that nobody is listening for our
1077 1077 # results anyway, so just bail quickly rather than trying
1078 1078 # to produce an error part.
1079 1079 ui.debug('bundle2-generatorexit\n')
1080 1080 raise
1081 1081 except BaseException as exc:
1082 1082 bexc = util.forcebytestr(exc)
1083 1083 # backup exception data for later
1084 1084 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1085 1085 % bexc)
1086 1086 tb = sys.exc_info()[2]
1087 1087 msg = 'unexpected error: %s' % bexc
1088 1088 interpart = bundlepart('error:abort', [('message', msg)],
1089 1089 mandatory=False)
1090 1090 interpart.id = 0
1091 1091 yield _pack(_fpayloadsize, -1)
1092 1092 for chunk in interpart.getchunks(ui=ui):
1093 1093 yield chunk
1094 1094 outdebug(ui, 'closing payload chunk')
1095 1095 # abort current part payload
1096 1096 yield _pack(_fpayloadsize, 0)
1097 1097 pycompat.raisewithtb(exc, tb)
1098 1098 # end of payload
1099 1099 outdebug(ui, 'closing payload chunk')
1100 1100 yield _pack(_fpayloadsize, 0)
1101 1101 self._generated = True
1102 1102
1103 1103 def _payloadchunks(self):
1104 1104 """yield chunks of a the part payload
1105 1105
1106 1106 Exists to handle the different methods to provide data to a part."""
1107 1107 # we only support fixed size data now.
1108 1108 # This will be improved in the future.
1109 1109 if (util.safehasattr(self.data, 'next')
1110 1110 or util.safehasattr(self.data, '__next__')):
1111 1111 buff = util.chunkbuffer(self.data)
1112 1112 chunk = buff.read(preferedchunksize)
1113 1113 while chunk:
1114 1114 yield chunk
1115 1115 chunk = buff.read(preferedchunksize)
1116 1116 elif len(self.data):
1117 1117 yield self.data
1118 1118
1119 1119
1120 1120 flaginterrupt = -1
1121 1121
1122 1122 class interrupthandler(unpackermixin):
1123 1123 """read one part and process it with restricted capability
1124 1124
1125 1125 This allows to transmit exception raised on the producer size during part
1126 1126 iteration while the consumer is reading a part.
1127 1127
1128 1128 Part processed in this manner only have access to a ui object,"""
1129 1129
1130 1130 def __init__(self, ui, fp):
1131 1131 super(interrupthandler, self).__init__(fp)
1132 1132 self.ui = ui
1133 1133
1134 1134 def _readpartheader(self):
1135 1135 """reads a part header size and return the bytes blob
1136 1136
1137 1137 returns None if empty"""
1138 1138 headersize = self._unpack(_fpartheadersize)[0]
1139 1139 if headersize < 0:
1140 1140 raise error.BundleValueError('negative part header size: %i'
1141 1141 % headersize)
1142 1142 indebug(self.ui, 'part header size: %i\n' % headersize)
1143 1143 if headersize:
1144 1144 return self._readexact(headersize)
1145 1145 return None
1146 1146
1147 1147 def __call__(self):
1148 1148
1149 1149 self.ui.debug('bundle2-input-stream-interrupt:'
1150 1150 ' opening out of band context\n')
1151 1151 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1152 1152 headerblock = self._readpartheader()
1153 1153 if headerblock is None:
1154 1154 indebug(self.ui, 'no part found during interruption.')
1155 1155 return
1156 1156 part = unbundlepart(self.ui, headerblock, self._fp)
1157 1157 op = interruptoperation(self.ui)
1158 1158 hardabort = False
1159 1159 try:
1160 1160 _processpart(op, part)
1161 1161 except (SystemExit, KeyboardInterrupt):
1162 1162 hardabort = True
1163 1163 raise
1164 1164 finally:
1165 1165 if not hardabort:
1166 1166 part.seek(0, 2)
1167 1167 self.ui.debug('bundle2-input-stream-interrupt:'
1168 1168 ' closing out of band context\n')
1169 1169
1170 1170 class interruptoperation(object):
1171 1171 """A limited operation to be use by part handler during interruption
1172 1172
1173 1173 It only have access to an ui object.
1174 1174 """
1175 1175
1176 1176 def __init__(self, ui):
1177 1177 self.ui = ui
1178 1178 self.reply = None
1179 1179 self.captureoutput = False
1180 1180
1181 1181 @property
1182 1182 def repo(self):
1183 1183 raise error.ProgrammingError('no repo access from stream interruption')
1184 1184
1185 1185 def gettransaction(self):
1186 1186 raise TransactionUnavailable('no repo access from stream interruption')
1187 1187
1188 1188 class unbundlepart(unpackermixin):
1189 1189 """a bundle part read from a bundle"""
1190 1190
1191 1191 def __init__(self, ui, header, fp):
1192 1192 super(unbundlepart, self).__init__(fp)
1193 1193 self._seekable = (util.safehasattr(fp, 'seek') and
1194 1194 util.safehasattr(fp, 'tell'))
1195 1195 self.ui = ui
1196 1196 # unbundle state attr
1197 1197 self._headerdata = header
1198 1198 self._headeroffset = 0
1199 1199 self._initialized = False
1200 1200 self.consumed = False
1201 1201 # part data
1202 1202 self.id = None
1203 1203 self.type = None
1204 1204 self.mandatoryparams = None
1205 1205 self.advisoryparams = None
1206 1206 self.params = None
1207 1207 self.mandatorykeys = ()
1208 1208 self._payloadstream = None
1209 1209 self._readheader()
1210 1210 self._mandatory = None
1211 1211 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1212 1212 self._pos = 0
1213 1213
1214 1214 def _fromheader(self, size):
1215 1215 """return the next <size> byte from the header"""
1216 1216 offset = self._headeroffset
1217 1217 data = self._headerdata[offset:(offset + size)]
1218 1218 self._headeroffset = offset + size
1219 1219 return data
1220 1220
1221 1221 def _unpackheader(self, format):
1222 1222 """read given format from header
1223 1223
1224 1224 This automatically compute the size of the format to read."""
1225 1225 data = self._fromheader(struct.calcsize(format))
1226 1226 return _unpack(format, data)
1227 1227
1228 1228 def _initparams(self, mandatoryparams, advisoryparams):
1229 1229 """internal function to setup all logic related parameters"""
1230 1230 # make it read only to prevent people touching it by mistake.
1231 1231 self.mandatoryparams = tuple(mandatoryparams)
1232 1232 self.advisoryparams = tuple(advisoryparams)
1233 1233 # user friendly UI
1234 1234 self.params = util.sortdict(self.mandatoryparams)
1235 1235 self.params.update(self.advisoryparams)
1236 1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1237 1237
1238 1238 def _payloadchunks(self, chunknum=0):
1239 1239 '''seek to specified chunk and start yielding data'''
1240 1240 if len(self._chunkindex) == 0:
1241 1241 assert chunknum == 0, 'Must start with chunk 0'
1242 1242 self._chunkindex.append((0, self._tellfp()))
1243 1243 else:
1244 1244 assert chunknum < len(self._chunkindex), \
1245 1245 'Unknown chunk %d' % chunknum
1246 1246 self._seekfp(self._chunkindex[chunknum][1])
1247 1247
1248 1248 pos = self._chunkindex[chunknum][0]
1249 1249 payloadsize = self._unpack(_fpayloadsize)[0]
1250 1250 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1251 1251 while payloadsize:
1252 1252 if payloadsize == flaginterrupt:
1253 1253 # interruption detection, the handler will now read a
1254 1254 # single part and process it.
1255 1255 interrupthandler(self.ui, self._fp)()
1256 1256 elif payloadsize < 0:
1257 1257 msg = 'negative payload chunk size: %i' % payloadsize
1258 1258 raise error.BundleValueError(msg)
1259 1259 else:
1260 1260 result = self._readexact(payloadsize)
1261 1261 chunknum += 1
1262 1262 pos += payloadsize
1263 1263 if chunknum == len(self._chunkindex):
1264 1264 self._chunkindex.append((pos, self._tellfp()))
1265 1265 yield result
1266 1266 payloadsize = self._unpack(_fpayloadsize)[0]
1267 1267 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1268 1268
1269 1269 def _findchunk(self, pos):
1270 1270 '''for a given payload position, return a chunk number and offset'''
1271 1271 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1272 1272 if ppos == pos:
1273 1273 return chunk, 0
1274 1274 elif ppos > pos:
1275 1275 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1276 1276 raise ValueError('Unknown chunk')
1277 1277
1278 1278 def _readheader(self):
1279 1279 """read the header and setup the object"""
1280 1280 typesize = self._unpackheader(_fparttypesize)[0]
1281 1281 self.type = self._fromheader(typesize)
1282 1282 indebug(self.ui, 'part type: "%s"' % self.type)
1283 1283 self.id = self._unpackheader(_fpartid)[0]
1284 1284 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1285 1285 # extract mandatory bit from type
1286 1286 self.mandatory = (self.type != self.type.lower())
1287 1287 self.type = self.type.lower()
1288 1288 ## reading parameters
1289 1289 # param count
1290 1290 mancount, advcount = self._unpackheader(_fpartparamcount)
1291 1291 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1292 1292 # param size
1293 1293 fparamsizes = _makefpartparamsizes(mancount + advcount)
1294 1294 paramsizes = self._unpackheader(fparamsizes)
1295 1295 # make it a list of couple again
1296 1296 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1297 1297 # split mandatory from advisory
1298 1298 mansizes = paramsizes[:mancount]
1299 1299 advsizes = paramsizes[mancount:]
1300 1300 # retrieve param value
1301 1301 manparams = []
1302 1302 for key, value in mansizes:
1303 1303 manparams.append((self._fromheader(key), self._fromheader(value)))
1304 1304 advparams = []
1305 1305 for key, value in advsizes:
1306 1306 advparams.append((self._fromheader(key), self._fromheader(value)))
1307 1307 self._initparams(manparams, advparams)
1308 1308 ## part payload
1309 1309 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1310 1310 # we read the data, tell it
1311 1311 self._initialized = True
1312 1312
1313 1313 def read(self, size=None):
1314 1314 """read payload data"""
1315 1315 if not self._initialized:
1316 1316 self._readheader()
1317 1317 if size is None:
1318 1318 data = self._payloadstream.read()
1319 1319 else:
1320 1320 data = self._payloadstream.read(size)
1321 1321 self._pos += len(data)
1322 1322 if size is None or len(data) < size:
1323 1323 if not self.consumed and self._pos:
1324 1324 self.ui.debug('bundle2-input-part: total payload size %i\n'
1325 1325 % self._pos)
1326 1326 self.consumed = True
1327 1327 return data
1328 1328
1329 1329 def tell(self):
1330 1330 return self._pos
1331 1331
1332 1332 def seek(self, offset, whence=0):
1333 1333 if whence == 0:
1334 1334 newpos = offset
1335 1335 elif whence == 1:
1336 1336 newpos = self._pos + offset
1337 1337 elif whence == 2:
1338 1338 if not self.consumed:
1339 1339 self.read()
1340 1340 newpos = self._chunkindex[-1][0] - offset
1341 1341 else:
1342 1342 raise ValueError('Unknown whence value: %r' % (whence,))
1343 1343
1344 1344 if newpos > self._chunkindex[-1][0] and not self.consumed:
1345 1345 self.read()
1346 1346 if not 0 <= newpos <= self._chunkindex[-1][0]:
1347 1347 raise ValueError('Offset out of range')
1348 1348
1349 1349 if self._pos != newpos:
1350 1350 chunk, internaloffset = self._findchunk(newpos)
1351 1351 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1352 1352 adjust = self.read(internaloffset)
1353 1353 if len(adjust) != internaloffset:
1354 1354 raise error.Abort(_('Seek failed\n'))
1355 1355 self._pos = newpos
1356 1356
1357 1357 def _seekfp(self, offset, whence=0):
1358 1358 """move the underlying file pointer
1359 1359
1360 1360 This method is meant for internal usage by the bundle2 protocol only.
1361 1361 They directly manipulate the low level stream including bundle2 level
1362 1362 instruction.
1363 1363
1364 1364 Do not use it to implement higher-level logic or methods."""
1365 1365 if self._seekable:
1366 1366 return self._fp.seek(offset, whence)
1367 1367 else:
1368 1368 raise NotImplementedError(_('File pointer is not seekable'))
1369 1369
1370 1370 def _tellfp(self):
1371 1371 """return the file offset, or None if file is not seekable
1372 1372
1373 1373 This method is meant for internal usage by the bundle2 protocol only.
1374 1374 They directly manipulate the low level stream including bundle2 level
1375 1375 instruction.
1376 1376
1377 1377 Do not use it to implement higher-level logic or methods."""
1378 1378 if self._seekable:
1379 1379 try:
1380 1380 return self._fp.tell()
1381 1381 except IOError as e:
1382 1382 if e.errno == errno.ESPIPE:
1383 1383 self._seekable = False
1384 1384 else:
1385 1385 raise
1386 1386 return None
1387 1387
1388 1388 # These are only the static capabilities.
1389 1389 # Check the 'getrepocaps' function for the rest.
1390 1390 capabilities = {'HG20': (),
1391 1391 'error': ('abort', 'unsupportedcontent', 'pushraced',
1392 1392 'pushkey'),
1393 1393 'listkeys': (),
1394 1394 'pushkey': (),
1395 1395 'digests': tuple(sorted(util.DIGESTS.keys())),
1396 1396 'remote-changegroup': ('http', 'https'),
1397 1397 'hgtagsfnodes': (),
1398 1398 }
1399 1399
1400 1400 def getrepocaps(repo, allowpushback=False):
1401 1401 """return the bundle2 capabilities for a given repo
1402 1402
1403 1403 Exists to allow extensions (like evolution) to mutate the capabilities.
1404 1404 """
1405 1405 caps = capabilities.copy()
1406 1406 caps['changegroup'] = tuple(sorted(
1407 1407 changegroup.supportedincomingversions(repo)))
1408 1408 if obsolete.isenabled(repo, obsolete.exchangeopt):
1409 1409 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1410 1410 caps['obsmarkers'] = supportedformat
1411 1411 if allowpushback:
1412 1412 caps['pushback'] = ()
1413 1413 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1414 1414 if cpmode == 'check-related':
1415 1415 caps['checkheads'] = ('related',)
1416 1416 return caps
1417 1417
1418 1418 def bundle2caps(remote):
1419 1419 """return the bundle capabilities of a peer as dict"""
1420 1420 raw = remote.capable('bundle2')
1421 1421 if not raw and raw != '':
1422 1422 return {}
1423 1423 capsblob = urlreq.unquote(remote.capable('bundle2'))
1424 1424 return decodecaps(capsblob)
1425 1425
1426 1426 def obsmarkersversion(caps):
1427 1427 """extract the list of supported obsmarkers versions from a bundle2caps dict
1428 1428 """
1429 1429 obscaps = caps.get('obsmarkers', ())
1430 1430 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1431 1431
1432 1432 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1433 1433 vfs=None, compression=None, compopts=None):
1434 1434 if bundletype.startswith('HG10'):
1435 1435 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1436 1436 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1437 1437 compression=compression, compopts=compopts)
1438 1438 elif not bundletype.startswith('HG20'):
1439 1439 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1440 1440
1441 1441 caps = {}
1442 1442 if 'obsolescence' in opts:
1443 1443 caps['obsmarkers'] = ('V1',)
1444 1444 bundle = bundle20(ui, caps)
1445 1445 bundle.setcompression(compression, compopts)
1446 1446 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1447 1447 chunkiter = bundle.getchunks()
1448 1448
1449 1449 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1450 1450
1451 1451 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1452 1452 # We should eventually reconcile this logic with the one behind
1453 1453 # 'exchange.getbundle2partsgenerator'.
1454 1454 #
1455 1455 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1456 1456 # different right now. So we keep them separated for now for the sake of
1457 1457 # simplicity.
1458 1458
1459 1459 # we always want a changegroup in such bundle
1460 1460 cgversion = opts.get('cg.version')
1461 1461 if cgversion is None:
1462 1462 cgversion = changegroup.safeversion(repo)
1463 1463 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1464 1464 part = bundler.newpart('changegroup', data=cg.getchunks())
1465 1465 part.addparam('version', cg.version)
1466 1466 if 'clcount' in cg.extras:
1467 1467 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1468 1468 mandatory=False)
1469 1469 if opts.get('phases') and repo.revs('%ln and secret()',
1470 1470 outgoing.missingheads):
1471 1471 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1472 1472
1473 1473 addparttagsfnodescache(repo, bundler, outgoing)
1474 1474
1475 1475 if opts.get('obsolescence', False):
1476 1476 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1477 1477 buildobsmarkerspart(bundler, obsmarkers)
1478 1478
1479 1479 if opts.get('phases', False):
1480 1480 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1481 1481 phasedata = phases.binaryencode(headsbyphase)
1482 1482 bundler.newpart('phase-heads', data=phasedata)
1483 1483
1484 1484 def addparttagsfnodescache(repo, bundler, outgoing):
1485 1485 # we include the tags fnode cache for the bundle changeset
1486 1486 # (as an optional parts)
1487 1487 cache = tags.hgtagsfnodescache(repo.unfiltered())
1488 1488 chunks = []
1489 1489
1490 1490 # .hgtags fnodes are only relevant for head changesets. While we could
1491 1491 # transfer values for all known nodes, there will likely be little to
1492 1492 # no benefit.
1493 1493 #
1494 1494 # We don't bother using a generator to produce output data because
1495 1495 # a) we only have 40 bytes per head and even esoteric numbers of heads
1496 1496 # consume little memory (1M heads is 40MB) b) we don't want to send the
1497 1497 # part if we don't have entries and knowing if we have entries requires
1498 1498 # cache lookups.
1499 1499 for node in outgoing.missingheads:
1500 1500 # Don't compute missing, as this may slow down serving.
1501 1501 fnode = cache.getfnode(node, computemissing=False)
1502 1502 if fnode is not None:
1503 1503 chunks.extend([node, fnode])
1504 1504
1505 1505 if chunks:
1506 1506 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1507 1507
1508 1508 def buildobsmarkerspart(bundler, markers):
1509 1509 """add an obsmarker part to the bundler with <markers>
1510 1510
1511 1511 No part is created if markers is empty.
1512 1512 Raises ValueError if the bundler doesn't support any known obsmarker format.
1513 1513 """
1514 1514 if not markers:
1515 1515 return None
1516 1516
1517 1517 remoteversions = obsmarkersversion(bundler.capabilities)
1518 1518 version = obsolete.commonversion(remoteversions)
1519 1519 if version is None:
1520 1520 raise ValueError('bundler does not support common obsmarker format')
1521 1521 stream = obsolete.encodemarkers(markers, True, version=version)
1522 1522 return bundler.newpart('obsmarkers', data=stream)
1523 1523
1524 1524 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1525 1525 compopts=None):
1526 1526 """Write a bundle file and return its filename.
1527 1527
1528 1528 Existing files will not be overwritten.
1529 1529 If no filename is specified, a temporary file is created.
1530 1530 bz2 compression can be turned off.
1531 1531 The bundle file will be deleted in case of errors.
1532 1532 """
1533 1533
1534 1534 if bundletype == "HG20":
1535 1535 bundle = bundle20(ui)
1536 1536 bundle.setcompression(compression, compopts)
1537 1537 part = bundle.newpart('changegroup', data=cg.getchunks())
1538 1538 part.addparam('version', cg.version)
1539 1539 if 'clcount' in cg.extras:
1540 1540 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1541 1541 mandatory=False)
1542 1542 chunkiter = bundle.getchunks()
1543 1543 else:
1544 1544 # compression argument is only for the bundle2 case
1545 1545 assert compression is None
1546 1546 if cg.version != '01':
1547 1547 raise error.Abort(_('old bundle types only supports v1 '
1548 1548 'changegroups'))
1549 1549 header, comp = bundletypes[bundletype]
1550 1550 if comp not in util.compengines.supportedbundletypes:
1551 1551 raise error.Abort(_('unknown stream compression type: %s')
1552 1552 % comp)
1553 1553 compengine = util.compengines.forbundletype(comp)
1554 1554 def chunkiter():
1555 1555 yield header
1556 1556 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1557 1557 yield chunk
1558 1558 chunkiter = chunkiter()
1559 1559
1560 1560 # parse the changegroup data, otherwise we will block
1561 1561 # in case of sshrepo because we don't know the end of the stream
1562 1562 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1563 1563
1564 1564 def combinechangegroupresults(op):
1565 1565 """logic to combine 0 or more addchangegroup results into one"""
1566 1566 results = [r.get('return', 0)
1567 1567 for r in op.records['changegroup']]
1568 1568 changedheads = 0
1569 1569 result = 1
1570 1570 for ret in results:
1571 1571 # If any changegroup result is 0, return 0
1572 1572 if ret == 0:
1573 1573 result = 0
1574 1574 break
1575 1575 if ret < -1:
1576 1576 changedheads += ret + 1
1577 1577 elif ret > 1:
1578 1578 changedheads += ret - 1
1579 1579 if changedheads > 0:
1580 1580 result = 1 + changedheads
1581 1581 elif changedheads < 0:
1582 1582 result = -1 + changedheads
1583 1583 return result
1584 1584
1585 1585 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1586 1586 'targetphase'))
1587 1587 def handlechangegroup(op, inpart):
1588 1588 """apply a changegroup part on the repo
1589 1589
1590 1590 This is a very early implementation that will massive rework before being
1591 1591 inflicted to any end-user.
1592 1592 """
1593 1593 tr = op.gettransaction()
1594 1594 unpackerversion = inpart.params.get('version', '01')
1595 1595 # We should raise an appropriate exception here
1596 1596 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1597 1597 # the source and url passed here are overwritten by the one contained in
1598 1598 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1599 1599 nbchangesets = None
1600 1600 if 'nbchanges' in inpart.params:
1601 1601 nbchangesets = int(inpart.params.get('nbchanges'))
1602 1602 if ('treemanifest' in inpart.params and
1603 1603 'treemanifest' not in op.repo.requirements):
1604 1604 if len(op.repo.changelog) != 0:
1605 1605 raise error.Abort(_(
1606 1606 "bundle contains tree manifests, but local repo is "
1607 1607 "non-empty and does not use tree manifests"))
1608 1608 op.repo.requirements.add('treemanifest')
1609 1609 op.repo._applyopenerreqs()
1610 1610 op.repo._writerequirements()
1611 1611 extrakwargs = {}
1612 1612 targetphase = inpart.params.get('targetphase')
1613 1613 if targetphase is not None:
1614 1614 extrakwargs['targetphase'] = int(targetphase)
1615 1615 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1616 1616 expectedtotal=nbchangesets, **extrakwargs)
1617 1617 if op.reply is not None:
1618 1618 # This is definitely not the final form of this
1619 1619 # return. But one need to start somewhere.
1620 1620 part = op.reply.newpart('reply:changegroup', mandatory=False)
1621 1621 part.addparam(
1622 1622 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1623 1623 part.addparam('return', '%i' % ret, mandatory=False)
1624 1624 assert not inpart.read()
1625 1625
1626 1626 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1627 1627 ['digest:%s' % k for k in util.DIGESTS.keys()])
1628 1628 @parthandler('remote-changegroup', _remotechangegroupparams)
1629 1629 def handleremotechangegroup(op, inpart):
1630 1630 """apply a bundle10 on the repo, given an url and validation information
1631 1631
1632 1632 All the information about the remote bundle to import are given as
1633 1633 parameters. The parameters include:
1634 1634 - url: the url to the bundle10.
1635 1635 - size: the bundle10 file size. It is used to validate what was
1636 1636 retrieved by the client matches the server knowledge about the bundle.
1637 1637 - digests: a space separated list of the digest types provided as
1638 1638 parameters.
1639 1639 - digest:<digest-type>: the hexadecimal representation of the digest with
1640 1640 that name. Like the size, it is used to validate what was retrieved by
1641 1641 the client matches what the server knows about the bundle.
1642 1642
1643 1643 When multiple digest types are given, all of them are checked.
1644 1644 """
1645 1645 try:
1646 1646 raw_url = inpart.params['url']
1647 1647 except KeyError:
1648 1648 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1649 1649 parsed_url = util.url(raw_url)
1650 1650 if parsed_url.scheme not in capabilities['remote-changegroup']:
1651 1651 raise error.Abort(_('remote-changegroup does not support %s urls') %
1652 1652 parsed_url.scheme)
1653 1653
1654 1654 try:
1655 1655 size = int(inpart.params['size'])
1656 1656 except ValueError:
1657 1657 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1658 1658 % 'size')
1659 1659 except KeyError:
1660 1660 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1661 1661
1662 1662 digests = {}
1663 1663 for typ in inpart.params.get('digests', '').split():
1664 1664 param = 'digest:%s' % typ
1665 1665 try:
1666 1666 value = inpart.params[param]
1667 1667 except KeyError:
1668 1668 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1669 1669 param)
1670 1670 digests[typ] = value
1671 1671
1672 1672 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1673 1673
1674 1674 tr = op.gettransaction()
1675 1675 from . import exchange
1676 1676 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1677 1677 if not isinstance(cg, changegroup.cg1unpacker):
1678 1678 raise error.Abort(_('%s: not a bundle version 1.0') %
1679 1679 util.hidepassword(raw_url))
1680 1680 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1681 1681 if op.reply is not None:
1682 1682 # This is definitely not the final form of this
1683 1683 # return. But one need to start somewhere.
1684 1684 part = op.reply.newpart('reply:changegroup')
1685 1685 part.addparam(
1686 1686 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1687 1687 part.addparam('return', '%i' % ret, mandatory=False)
1688 1688 try:
1689 1689 real_part.validate()
1690 1690 except error.Abort as e:
1691 1691 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1692 1692 (util.hidepassword(raw_url), str(e)))
1693 1693 assert not inpart.read()
1694 1694
1695 1695 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1696 1696 def handlereplychangegroup(op, inpart):
1697 1697 ret = int(inpart.params['return'])
1698 1698 replyto = int(inpart.params['in-reply-to'])
1699 1699 op.records.add('changegroup', {'return': ret}, replyto)
1700 1700
1701 1701 @parthandler('check:heads')
1702 1702 def handlecheckheads(op, inpart):
1703 1703 """check that head of the repo did not change
1704 1704
1705 1705 This is used to detect a push race when using unbundle.
1706 1706 This replaces the "heads" argument of unbundle."""
1707 1707 h = inpart.read(20)
1708 1708 heads = []
1709 1709 while len(h) == 20:
1710 1710 heads.append(h)
1711 1711 h = inpart.read(20)
1712 1712 assert not h
1713 1713 # Trigger a transaction so that we are guaranteed to have the lock now.
1714 1714 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1715 1715 op.gettransaction()
1716 1716 if sorted(heads) != sorted(op.repo.heads()):
1717 1717 raise error.PushRaced('repository changed while pushing - '
1718 1718 'please try again')
1719 1719
1720 1720 @parthandler('check:updated-heads')
1721 1721 def handlecheckupdatedheads(op, inpart):
1722 1722 """check for race on the heads touched by a push
1723 1723
1724 1724 This is similar to 'check:heads' but focus on the heads actually updated
1725 1725 during the push. If other activities happen on unrelated heads, it is
1726 1726 ignored.
1727 1727
1728 1728 This allow server with high traffic to avoid push contention as long as
1729 1729 unrelated parts of the graph are involved."""
1730 1730 h = inpart.read(20)
1731 1731 heads = []
1732 1732 while len(h) == 20:
1733 1733 heads.append(h)
1734 1734 h = inpart.read(20)
1735 1735 assert not h
1736 1736 # trigger a transaction so that we are guaranteed to have the lock now.
1737 1737 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1738 1738 op.gettransaction()
1739 1739
1740 1740 currentheads = set()
1741 1741 for ls in op.repo.branchmap().itervalues():
1742 1742 currentheads.update(ls)
1743 1743
1744 1744 for h in heads:
1745 1745 if h not in currentheads:
1746 1746 raise error.PushRaced('repository changed while pushing - '
1747 1747 'please try again')
1748 1748
1749 1749 @parthandler('output')
1750 1750 def handleoutput(op, inpart):
1751 1751 """forward output captured on the server to the client"""
1752 1752 for line in inpart.read().splitlines():
1753 1753 op.ui.status(_('remote: %s\n') % line)
1754 1754
1755 1755 @parthandler('replycaps')
1756 1756 def handlereplycaps(op, inpart):
1757 1757 """Notify that a reply bundle should be created
1758 1758
1759 1759 The payload contains the capabilities information for the reply"""
1760 1760 caps = decodecaps(inpart.read())
1761 1761 if op.reply is None:
1762 1762 op.reply = bundle20(op.ui, caps)
1763 1763
1764 1764 class AbortFromPart(error.Abort):
1765 1765 """Sub-class of Abort that denotes an error from a bundle2 part."""
1766 1766
1767 1767 @parthandler('error:abort', ('message', 'hint'))
1768 1768 def handleerrorabort(op, inpart):
1769 1769 """Used to transmit abort error over the wire"""
1770 1770 raise AbortFromPart(inpart.params['message'],
1771 1771 hint=inpart.params.get('hint'))
1772 1772
1773 1773 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1774 1774 'in-reply-to'))
1775 1775 def handleerrorpushkey(op, inpart):
1776 1776 """Used to transmit failure of a mandatory pushkey over the wire"""
1777 1777 kwargs = {}
1778 1778 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1779 1779 value = inpart.params.get(name)
1780 1780 if value is not None:
1781 1781 kwargs[name] = value
1782 1782 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1783 1783
1784 1784 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1785 1785 def handleerrorunsupportedcontent(op, inpart):
1786 1786 """Used to transmit unknown content error over the wire"""
1787 1787 kwargs = {}
1788 1788 parttype = inpart.params.get('parttype')
1789 1789 if parttype is not None:
1790 1790 kwargs['parttype'] = parttype
1791 1791 params = inpart.params.get('params')
1792 1792 if params is not None:
1793 1793 kwargs['params'] = params.split('\0')
1794 1794
1795 1795 raise error.BundleUnknownFeatureError(**kwargs)
1796 1796
1797 1797 @parthandler('error:pushraced', ('message',))
1798 1798 def handleerrorpushraced(op, inpart):
1799 1799 """Used to transmit push race error over the wire"""
1800 1800 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1801 1801
1802 1802 @parthandler('listkeys', ('namespace',))
1803 1803 def handlelistkeys(op, inpart):
1804 1804 """retrieve pushkey namespace content stored in a bundle2"""
1805 1805 namespace = inpart.params['namespace']
1806 1806 r = pushkey.decodekeys(inpart.read())
1807 1807 op.records.add('listkeys', (namespace, r))
1808 1808
1809 1809 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1810 1810 def handlepushkey(op, inpart):
1811 1811 """process a pushkey request"""
1812 1812 dec = pushkey.decode
1813 1813 namespace = dec(inpart.params['namespace'])
1814 1814 key = dec(inpart.params['key'])
1815 1815 old = dec(inpart.params['old'])
1816 1816 new = dec(inpart.params['new'])
1817 1817 # Grab the transaction to ensure that we have the lock before performing the
1818 1818 # pushkey.
1819 1819 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1820 1820 op.gettransaction()
1821 1821 ret = op.repo.pushkey(namespace, key, old, new)
1822 1822 record = {'namespace': namespace,
1823 1823 'key': key,
1824 1824 'old': old,
1825 1825 'new': new}
1826 1826 op.records.add('pushkey', record)
1827 1827 if op.reply is not None:
1828 1828 rpart = op.reply.newpart('reply:pushkey')
1829 1829 rpart.addparam(
1830 1830 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1831 1831 rpart.addparam('return', '%i' % ret, mandatory=False)
1832 1832 if inpart.mandatory and not ret:
1833 1833 kwargs = {}
1834 1834 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1835 1835 if key in inpart.params:
1836 1836 kwargs[key] = inpart.params[key]
1837 1837 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1838 1838
1839 1839 @parthandler('phase-heads')
1840 1840 def handlephases(op, inpart):
1841 1841 """apply phases from bundle part to repo"""
1842 1842 headsbyphase = phases.binarydecode(inpart)
1843 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1843 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1844 1844 op.records.add('phase-heads', {})
1845 1845
1846 1846 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1847 1847 def handlepushkeyreply(op, inpart):
1848 1848 """retrieve the result of a pushkey request"""
1849 1849 ret = int(inpart.params['return'])
1850 1850 partid = int(inpart.params['in-reply-to'])
1851 1851 op.records.add('pushkey', {'return': ret}, partid)
1852 1852
1853 1853 @parthandler('obsmarkers')
1854 1854 def handleobsmarker(op, inpart):
1855 1855 """add a stream of obsmarkers to the repo"""
1856 1856 tr = op.gettransaction()
1857 1857 markerdata = inpart.read()
1858 1858 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1859 1859 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1860 1860 % len(markerdata))
1861 1861 # The mergemarkers call will crash if marker creation is not enabled.
1862 1862 # we want to avoid this if the part is advisory.
1863 1863 if not inpart.mandatory and op.repo.obsstore.readonly:
1864 1864 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1865 1865 return
1866 1866 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1867 1867 op.repo.invalidatevolatilesets()
1868 1868 if new:
1869 1869 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1870 1870 op.records.add('obsmarkers', {'new': new})
1871 1871 if op.reply is not None:
1872 1872 rpart = op.reply.newpart('reply:obsmarkers')
1873 1873 rpart.addparam(
1874 1874 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1875 1875 rpart.addparam('new', '%i' % new, mandatory=False)
1876 1876
1877 1877
1878 1878 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1879 1879 def handleobsmarkerreply(op, inpart):
1880 1880 """retrieve the result of a pushkey request"""
1881 1881 ret = int(inpart.params['new'])
1882 1882 partid = int(inpart.params['in-reply-to'])
1883 1883 op.records.add('obsmarkers', {'new': ret}, partid)
1884 1884
1885 1885 @parthandler('hgtagsfnodes')
1886 1886 def handlehgtagsfnodes(op, inpart):
1887 1887 """Applies .hgtags fnodes cache entries to the local repo.
1888 1888
1889 1889 Payload is pairs of 20 byte changeset nodes and filenodes.
1890 1890 """
1891 1891 # Grab the transaction so we ensure that we have the lock at this point.
1892 1892 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1893 1893 op.gettransaction()
1894 1894 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1895 1895
1896 1896 count = 0
1897 1897 while True:
1898 1898 node = inpart.read(20)
1899 1899 fnode = inpart.read(20)
1900 1900 if len(node) < 20 or len(fnode) < 20:
1901 1901 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1902 1902 break
1903 1903 cache.setfnode(node, fnode)
1904 1904 count += 1
1905 1905
1906 1906 cache.write()
1907 1907 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1908 1908
1909 1909 @parthandler('pushvars')
1910 1910 def bundle2getvars(op, part):
1911 1911 '''unbundle a bundle2 containing shellvars on the server'''
1912 1912 # An option to disable unbundling on server-side for security reasons
1913 1913 if op.ui.configbool('push', 'pushvars.server'):
1914 1914 hookargs = {}
1915 1915 for key, value in part.advisoryparams:
1916 1916 key = key.upper()
1917 1917 # We want pushed variables to have USERVAR_ prepended so we know
1918 1918 # they came from the --pushvar flag.
1919 1919 key = "USERVAR_" + key
1920 1920 hookargs[key] = value
1921 1921 op.addhookargs(hookargs)
@@ -1,627 +1,634 b''
1 1 """ Mercurial phases support code
2 2
3 3 ---
4 4
5 5 Copyright 2011 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
6 6 Logilab SA <contact@logilab.fr>
7 7 Augie Fackler <durin42@gmail.com>
8 8
9 9 This software may be used and distributed according to the terms
10 10 of the GNU General Public License version 2 or any later version.
11 11
12 12 ---
13 13
14 14 This module implements most phase logic in mercurial.
15 15
16 16
17 17 Basic Concept
18 18 =============
19 19
20 20 A 'changeset phase' is an indicator that tells us how a changeset is
21 21 manipulated and communicated. The details of each phase is described
22 22 below, here we describe the properties they have in common.
23 23
24 24 Like bookmarks, phases are not stored in history and thus are not
25 25 permanent and leave no audit trail.
26 26
27 27 First, no changeset can be in two phases at once. Phases are ordered,
28 28 so they can be considered from lowest to highest. The default, lowest
29 29 phase is 'public' - this is the normal phase of existing changesets. A
30 30 child changeset can not be in a lower phase than its parents.
31 31
32 32 These phases share a hierarchy of traits:
33 33
34 34 immutable shared
35 35 public: X X
36 36 draft: X
37 37 secret:
38 38
39 39 Local commits are draft by default.
40 40
41 41 Phase Movement and Exchange
42 42 ===========================
43 43
44 44 Phase data is exchanged by pushkey on pull and push. Some servers have
45 45 a publish option set, we call such a server a "publishing server".
46 46 Pushing a draft changeset to a publishing server changes the phase to
47 47 public.
48 48
49 49 A small list of fact/rules define the exchange of phase:
50 50
51 51 * old client never changes server states
52 52 * pull never changes server states
53 53 * publish and old server changesets are seen as public by client
54 54 * any secret changeset seen in another repository is lowered to at
55 55 least draft
56 56
57 57 Here is the final table summing up the 49 possible use cases of phase
58 58 exchange:
59 59
60 60 server
61 61 old publish non-publish
62 62 N X N D P N D P
63 63 old client
64 64 pull
65 65 N - X/X - X/D X/P - X/D X/P
66 66 X - X/X - X/D X/P - X/D X/P
67 67 push
68 68 X X/X X/X X/P X/P X/P X/D X/D X/P
69 69 new client
70 70 pull
71 71 N - P/X - P/D P/P - D/D P/P
72 72 D - P/X - P/D P/P - D/D P/P
73 73 P - P/X - P/D P/P - P/D P/P
74 74 push
75 75 D P/X P/X P/P P/P P/P D/D D/D P/P
76 76 P P/X P/X P/P P/P P/P P/P P/P P/P
77 77
78 78 Legend:
79 79
80 80 A/B = final state on client / state on server
81 81
82 82 * N = new/not present,
83 83 * P = public,
84 84 * D = draft,
85 85 * X = not tracked (i.e., the old client or server has no internal
86 86 way of recording the phase.)
87 87
88 88 passive = only pushes
89 89
90 90
91 91 A cell here can be read like this:
92 92
93 93 "When a new client pushes a draft changeset (D) to a publishing
94 94 server where it's not present (N), it's marked public on both
95 95 sides (P/P)."
96 96
97 97 Note: old client behave as a publishing server with draft only content
98 98 - other people see it as public
99 99 - content is pushed as draft
100 100
101 101 """
102 102
103 103 from __future__ import absolute_import
104 104
105 105 import errno
106 106 import struct
107 107
108 108 from .i18n import _
109 109 from .node import (
110 110 bin,
111 111 hex,
112 112 nullid,
113 113 nullrev,
114 114 short,
115 115 )
116 116 from . import (
117 117 error,
118 118 smartset,
119 119 txnutil,
120 120 util,
121 121 )
122 122
123 123 _fphasesentry = struct.Struct('>i20s')
124 124
125 125 allphases = public, draft, secret = range(3)
126 126 trackedphases = allphases[1:]
127 127 phasenames = ['public', 'draft', 'secret']
128 128
129 129 def _readroots(repo, phasedefaults=None):
130 130 """Read phase roots from disk
131 131
132 132 phasedefaults is a list of fn(repo, roots) callable, which are
133 133 executed if the phase roots file does not exist. When phases are
134 134 being initialized on an existing repository, this could be used to
135 135 set selected changesets phase to something else than public.
136 136
137 137 Return (roots, dirty) where dirty is true if roots differ from
138 138 what is being stored.
139 139 """
140 140 repo = repo.unfiltered()
141 141 dirty = False
142 142 roots = [set() for i in allphases]
143 143 try:
144 144 f, pending = txnutil.trypending(repo.root, repo.svfs, 'phaseroots')
145 145 try:
146 146 for line in f:
147 147 phase, nh = line.split()
148 148 roots[int(phase)].add(bin(nh))
149 149 finally:
150 150 f.close()
151 151 except IOError as inst:
152 152 if inst.errno != errno.ENOENT:
153 153 raise
154 154 if phasedefaults:
155 155 for f in phasedefaults:
156 156 roots = f(repo, roots)
157 157 dirty = True
158 158 return roots, dirty
159 159
160 160 def binaryencode(phasemapping):
161 161 """encode a 'phase -> nodes' mapping into a binary stream
162 162
163 163 Since phases are integer the mapping is actually a python list:
164 164 [[PUBLIC_HEADS], [DRAFTS_HEADS], [SECRET_HEADS]]
165 165 """
166 166 binarydata = []
167 167 for phase, nodes in enumerate(phasemapping):
168 168 for head in nodes:
169 169 binarydata.append(_fphasesentry.pack(phase, head))
170 170 return ''.join(binarydata)
171 171
172 172 def binarydecode(stream):
173 173 """decode a binary stream into a 'phase -> nodes' mapping
174 174
175 175 Since phases are integer the mapping is actually a python list."""
176 176 headsbyphase = [[] for i in allphases]
177 177 entrysize = _fphasesentry.size
178 178 while True:
179 179 entry = stream.read(entrysize)
180 180 if len(entry) < entrysize:
181 181 if entry:
182 182 raise error.Abort(_('bad phase-heads stream'))
183 183 break
184 184 phase, node = _fphasesentry.unpack(entry)
185 185 headsbyphase[phase].append(node)
186 186 return headsbyphase
187 187
188 188 def _trackphasechange(data, rev, old, new):
189 189 """add a phase move the <data> dictionnary
190 190
191 191 If data is None, nothing happens.
192 192 """
193 193 if data is None:
194 194 return
195 195 existing = data.get(rev)
196 196 if existing is not None:
197 197 old = existing[0]
198 198 data[rev] = (old, new)
199 199
200 200 class phasecache(object):
201 201 def __init__(self, repo, phasedefaults, _load=True):
202 202 if _load:
203 203 # Cheap trick to allow shallow-copy without copy module
204 204 self.phaseroots, self.dirty = _readroots(repo, phasedefaults)
205 205 self._phaserevs = None
206 206 self._phasesets = None
207 207 self.filterunknown(repo)
208 208 self.opener = repo.svfs
209 209
210 210 def getrevset(self, repo, phases):
211 211 """return a smartset for the given phases"""
212 212 self.loadphaserevs(repo) # ensure phase's sets are loaded
213 213
214 214 if self._phasesets and all(self._phasesets[p] is not None
215 215 for p in phases):
216 216 # fast path - use _phasesets
217 217 revs = self._phasesets[phases[0]]
218 218 if len(phases) > 1:
219 219 revs = revs.copy() # only copy when needed
220 220 for p in phases[1:]:
221 221 revs.update(self._phasesets[p])
222 222 if repo.changelog.filteredrevs:
223 223 revs = revs - repo.changelog.filteredrevs
224 224 return smartset.baseset(revs)
225 225 else:
226 226 # slow path - enumerate all revisions
227 227 phase = self.phase
228 228 revs = (r for r in repo if phase(repo, r) in phases)
229 229 return smartset.generatorset(revs, iterasc=True)
230 230
231 231 def copy(self):
232 232 # Shallow copy meant to ensure isolation in
233 233 # advance/retractboundary(), nothing more.
234 234 ph = self.__class__(None, None, _load=False)
235 235 ph.phaseroots = self.phaseroots[:]
236 236 ph.dirty = self.dirty
237 237 ph.opener = self.opener
238 238 ph._phaserevs = self._phaserevs
239 239 ph._phasesets = self._phasesets
240 240 return ph
241 241
242 242 def replace(self, phcache):
243 243 """replace all values in 'self' with content of phcache"""
244 244 for a in ('phaseroots', 'dirty', 'opener', '_phaserevs', '_phasesets'):
245 245 setattr(self, a, getattr(phcache, a))
246 246
247 247 def _getphaserevsnative(self, repo):
248 248 repo = repo.unfiltered()
249 249 nativeroots = []
250 250 for phase in trackedphases:
251 251 nativeroots.append(map(repo.changelog.rev, self.phaseroots[phase]))
252 252 return repo.changelog.computephases(nativeroots)
253 253
254 254 def _computephaserevspure(self, repo):
255 255 repo = repo.unfiltered()
256 256 revs = [public] * len(repo.changelog)
257 257 self._phaserevs = revs
258 258 self._populatephaseroots(repo)
259 259 for phase in trackedphases:
260 260 roots = list(map(repo.changelog.rev, self.phaseroots[phase]))
261 261 if roots:
262 262 for rev in roots:
263 263 revs[rev] = phase
264 264 for rev in repo.changelog.descendants(roots):
265 265 revs[rev] = phase
266 266
267 267 def loadphaserevs(self, repo):
268 268 """ensure phase information is loaded in the object"""
269 269 if self._phaserevs is None:
270 270 try:
271 271 res = self._getphaserevsnative(repo)
272 272 self._phaserevs, self._phasesets = res
273 273 except AttributeError:
274 274 self._computephaserevspure(repo)
275 275
276 276 def invalidate(self):
277 277 self._phaserevs = None
278 278 self._phasesets = None
279 279
280 280 def _populatephaseroots(self, repo):
281 281 """Fills the _phaserevs cache with phases for the roots.
282 282 """
283 283 cl = repo.changelog
284 284 phaserevs = self._phaserevs
285 285 for phase in trackedphases:
286 286 roots = map(cl.rev, self.phaseroots[phase])
287 287 for root in roots:
288 288 phaserevs[root] = phase
289 289
290 290 def phase(self, repo, rev):
291 291 # We need a repo argument here to be able to build _phaserevs
292 292 # if necessary. The repository instance is not stored in
293 293 # phasecache to avoid reference cycles. The changelog instance
294 294 # is not stored because it is a filecache() property and can
295 295 # be replaced without us being notified.
296 296 if rev == nullrev:
297 297 return public
298 298 if rev < nullrev:
299 299 raise ValueError(_('cannot lookup negative revision'))
300 300 if self._phaserevs is None or rev >= len(self._phaserevs):
301 301 self.invalidate()
302 302 self.loadphaserevs(repo)
303 303 return self._phaserevs[rev]
304 304
305 305 def write(self):
306 306 if not self.dirty:
307 307 return
308 308 f = self.opener('phaseroots', 'w', atomictemp=True, checkambig=True)
309 309 try:
310 310 self._write(f)
311 311 finally:
312 312 f.close()
313 313
314 314 def _write(self, fp):
315 315 for phase, roots in enumerate(self.phaseroots):
316 316 for h in roots:
317 317 fp.write('%i %s\n' % (phase, hex(h)))
318 318 self.dirty = False
319 319
320 320 def _updateroots(self, phase, newroots, tr):
321 321 self.phaseroots[phase] = newroots
322 322 self.invalidate()
323 323 self.dirty = True
324 324
325 325 tr.addfilegenerator('phase', ('phaseroots',), self._write)
326 326 tr.hookargs['phases_moved'] = '1'
327 327
328 328 def registernew(self, repo, tr, targetphase, nodes):
329 329 repo = repo.unfiltered()
330 330 self._retractboundary(repo, tr, targetphase, nodes)
331 331 if tr is not None and 'phases' in tr.changes:
332 332 phasetracking = tr.changes['phases']
333 333 torev = repo.changelog.rev
334 334 phase = self.phase
335 335 for n in nodes:
336 336 rev = torev(n)
337 337 revphase = phase(repo, rev)
338 338 _trackphasechange(phasetracking, rev, None, revphase)
339 339 repo.invalidatevolatilesets()
340 340
341 341 def advanceboundary(self, repo, tr, targetphase, nodes):
342 342 """Set all 'nodes' to phase 'targetphase'
343 343
344 344 Nodes with a phase lower than 'targetphase' are not affected.
345 345 """
346 346 # Be careful to preserve shallow-copied values: do not update
347 347 # phaseroots values, replace them.
348 348 if tr is None:
349 349 phasetracking = None
350 350 else:
351 351 phasetracking = tr.changes.get('phases')
352 352
353 353 repo = repo.unfiltered()
354 354
355 355 delroots = [] # set of root deleted by this path
356 356 for phase in xrange(targetphase + 1, len(allphases)):
357 357 # filter nodes that are not in a compatible phase already
358 358 nodes = [n for n in nodes
359 359 if self.phase(repo, repo[n].rev()) >= phase]
360 360 if not nodes:
361 361 break # no roots to move anymore
362 362
363 363 olds = self.phaseroots[phase]
364 364
365 365 affected = repo.revs('%ln::%ln', olds, nodes)
366 366 for r in affected:
367 367 _trackphasechange(phasetracking, r, self.phase(repo, r),
368 368 targetphase)
369 369
370 370 roots = set(ctx.node() for ctx in repo.set(
371 371 'roots((%ln::) - %ld)', olds, affected))
372 372 if olds != roots:
373 373 self._updateroots(phase, roots, tr)
374 374 # some roots may need to be declared for lower phases
375 375 delroots.extend(olds - roots)
376 376 # declare deleted root in the target phase
377 377 if targetphase != 0:
378 378 self._retractboundary(repo, tr, targetphase, delroots)
379 379 repo.invalidatevolatilesets()
380 380
381 381 def retractboundary(self, repo, tr, targetphase, nodes):
382 382 oldroots = self.phaseroots[:targetphase + 1]
383 383 if tr is None:
384 384 phasetracking = None
385 385 else:
386 386 phasetracking = tr.changes.get('phases')
387 387 repo = repo.unfiltered()
388 388 if (self._retractboundary(repo, tr, targetphase, nodes)
389 389 and phasetracking is not None):
390 390
391 391 # find the affected revisions
392 392 new = self.phaseroots[targetphase]
393 393 old = oldroots[targetphase]
394 394 affected = set(repo.revs('(%ln::) - (%ln::)', new, old))
395 395
396 396 # find the phase of the affected revision
397 397 for phase in xrange(targetphase, -1, -1):
398 398 if phase:
399 399 roots = oldroots[phase]
400 400 revs = set(repo.revs('%ln::%ld', roots, affected))
401 401 affected -= revs
402 402 else: # public phase
403 403 revs = affected
404 404 for r in revs:
405 405 _trackphasechange(phasetracking, r, phase, targetphase)
406 406 repo.invalidatevolatilesets()
407 407
408 408 def _retractboundary(self, repo, tr, targetphase, nodes):
409 409 # Be careful to preserve shallow-copied values: do not update
410 410 # phaseroots values, replace them.
411 411
412 412 repo = repo.unfiltered()
413 413 currentroots = self.phaseroots[targetphase]
414 414 finalroots = oldroots = set(currentroots)
415 415 newroots = [n for n in nodes
416 416 if self.phase(repo, repo[n].rev()) < targetphase]
417 417 if newroots:
418 418
419 419 if nullid in newroots:
420 420 raise error.Abort(_('cannot change null revision phase'))
421 421 currentroots = currentroots.copy()
422 422 currentroots.update(newroots)
423 423
424 424 # Only compute new roots for revs above the roots that are being
425 425 # retracted.
426 426 minnewroot = min(repo[n].rev() for n in newroots)
427 427 aboveroots = [n for n in currentroots
428 428 if repo[n].rev() >= minnewroot]
429 429 updatedroots = repo.set('roots(%ln::)', aboveroots)
430 430
431 431 finalroots = set(n for n in currentroots if repo[n].rev() <
432 432 minnewroot)
433 433 finalroots.update(ctx.node() for ctx in updatedroots)
434 434 if finalroots != oldroots:
435 435 self._updateroots(targetphase, finalroots, tr)
436 436 return True
437 437 return False
438 438
439 439 def filterunknown(self, repo):
440 440 """remove unknown nodes from the phase boundary
441 441
442 442 Nothing is lost as unknown nodes only hold data for their descendants.
443 443 """
444 444 filtered = False
445 445 nodemap = repo.changelog.nodemap # to filter unknown nodes
446 446 for phase, nodes in enumerate(self.phaseroots):
447 447 missing = sorted(node for node in nodes if node not in nodemap)
448 448 if missing:
449 449 for mnode in missing:
450 450 repo.ui.debug(
451 451 'removing unknown node %s from %i-phase boundary\n'
452 452 % (short(mnode), phase))
453 453 nodes.symmetric_difference_update(missing)
454 454 filtered = True
455 455 if filtered:
456 456 self.dirty = True
457 457 # filterunknown is called by repo.destroyed, we may have no changes in
458 458 # root but phaserevs contents is certainly invalid (or at least we
459 459 # have not proper way to check that). related to issue 3858.
460 460 #
461 461 # The other caller is __init__ that have no _phaserevs initialized
462 462 # anyway. If this change we should consider adding a dedicated
463 463 # "destroyed" function to phasecache or a proper cache key mechanism
464 464 # (see branchmap one)
465 465 self.invalidate()
466 466
467 467 def advanceboundary(repo, tr, targetphase, nodes):
468 468 """Add nodes to a phase changing other nodes phases if necessary.
469 469
470 470 This function move boundary *forward* this means that all nodes
471 471 are set in the target phase or kept in a *lower* phase.
472 472
473 473 Simplify boundary to contains phase roots only."""
474 474 phcache = repo._phasecache.copy()
475 475 phcache.advanceboundary(repo, tr, targetphase, nodes)
476 476 repo._phasecache.replace(phcache)
477 477
478 478 def retractboundary(repo, tr, targetphase, nodes):
479 479 """Set nodes back to a phase changing other nodes phases if
480 480 necessary.
481 481
482 482 This function move boundary *backward* this means that all nodes
483 483 are set in the target phase or kept in a *higher* phase.
484 484
485 485 Simplify boundary to contains phase roots only."""
486 486 phcache = repo._phasecache.copy()
487 487 phcache.retractboundary(repo, tr, targetphase, nodes)
488 488 repo._phasecache.replace(phcache)
489 489
490 490 def registernew(repo, tr, targetphase, nodes):
491 491 """register a new revision and its phase
492 492
493 493 Code adding revisions to the repository should use this function to
494 494 set new changeset in their target phase (or higher).
495 495 """
496 496 phcache = repo._phasecache.copy()
497 497 phcache.registernew(repo, tr, targetphase, nodes)
498 498 repo._phasecache.replace(phcache)
499 499
500 500 def listphases(repo):
501 501 """List phases root for serialization over pushkey"""
502 502 # Use ordered dictionary so behavior is deterministic.
503 503 keys = util.sortdict()
504 504 value = '%i' % draft
505 505 for root in repo._phasecache.phaseroots[draft]:
506 506 keys[hex(root)] = value
507 507
508 508 if repo.publishing():
509 509 # Add an extra data to let remote know we are a publishing
510 510 # repo. Publishing repo can't just pretend they are old repo.
511 511 # When pushing to a publishing repo, the client still need to
512 512 # push phase boundary
513 513 #
514 514 # Push do not only push changeset. It also push phase data.
515 515 # New phase data may apply to common changeset which won't be
516 516 # push (as they are common). Here is a very simple example:
517 517 #
518 518 # 1) repo A push changeset X as draft to repo B
519 519 # 2) repo B make changeset X public
520 520 # 3) repo B push to repo A. X is not pushed but the data that
521 521 # X as now public should
522 522 #
523 523 # The server can't handle it on it's own as it has no idea of
524 524 # client phase data.
525 525 keys['publishing'] = 'True'
526 526 return keys
527 527
528 528 def pushphase(repo, nhex, oldphasestr, newphasestr):
529 529 """List phases root for serialization over pushkey"""
530 530 repo = repo.unfiltered()
531 531 with repo.lock():
532 532 currentphase = repo[nhex].phase()
533 533 newphase = abs(int(newphasestr)) # let's avoid negative index surprise
534 534 oldphase = abs(int(oldphasestr)) # let's avoid negative index surprise
535 535 if currentphase == oldphase and newphase < oldphase:
536 536 with repo.transaction('pushkey-phase') as tr:
537 537 advanceboundary(repo, tr, newphase, [bin(nhex)])
538 538 return True
539 539 elif currentphase == newphase:
540 540 # raced, but got correct result
541 541 return True
542 542 else:
543 543 return False
544 544
545 545 def subsetphaseheads(repo, subset):
546 546 """Finds the phase heads for a subset of a history
547 547
548 548 Returns a list indexed by phase number where each item is a list of phase
549 549 head nodes.
550 550 """
551 551 cl = repo.changelog
552 552
553 553 headsbyphase = [[] for i in allphases]
554 554 # No need to keep track of secret phase; any heads in the subset that
555 555 # are not mentioned are implicitly secret.
556 556 for phase in allphases[:-1]:
557 557 revset = "heads(%%ln & %s())" % phasenames[phase]
558 558 headsbyphase[phase] = [cl.node(r) for r in repo.revs(revset, subset)]
559 559 return headsbyphase
560 560
561 def updatephases(repo, tr, headsbyphase):
561 def updatephases(repo, trgetter, headsbyphase):
562 562 """Updates the repo with the given phase heads"""
563 563 # Now advance phase boundaries of all but secret phase
564 #
565 # run the update (and fetch transaction) only if there are actually things
566 # to update. This avoid creating empty transaction during no-op operation.
567
564 568 for phase in allphases[:-1]:
565 advanceboundary(repo, tr, phase, headsbyphase[phase])
569 revset = '%%ln - %s()' % phasenames[phase]
570 heads = [c.node() for c in repo.set(revset, headsbyphase[phase])]
571 if heads:
572 advanceboundary(repo, trgetter(), phase, heads)
566 573
567 574 def analyzeremotephases(repo, subset, roots):
568 575 """Compute phases heads and root in a subset of node from root dict
569 576
570 577 * subset is heads of the subset
571 578 * roots is {<nodeid> => phase} mapping. key and value are string.
572 579
573 580 Accept unknown element input
574 581 """
575 582 repo = repo.unfiltered()
576 583 # build list from dictionary
577 584 draftroots = []
578 585 nodemap = repo.changelog.nodemap # to filter unknown nodes
579 586 for nhex, phase in roots.iteritems():
580 587 if nhex == 'publishing': # ignore data related to publish option
581 588 continue
582 589 node = bin(nhex)
583 590 phase = int(phase)
584 591 if phase == public:
585 592 if node != nullid:
586 593 repo.ui.warn(_('ignoring inconsistent public root'
587 594 ' from remote: %s\n') % nhex)
588 595 elif phase == draft:
589 596 if node in nodemap:
590 597 draftroots.append(node)
591 598 else:
592 599 repo.ui.warn(_('ignoring unexpected root from remote: %i %s\n')
593 600 % (phase, nhex))
594 601 # compute heads
595 602 publicheads = newheads(repo, subset, draftroots)
596 603 return publicheads, draftroots
597 604
598 605 def newheads(repo, heads, roots):
599 606 """compute new head of a subset minus another
600 607
601 608 * `heads`: define the first subset
602 609 * `roots`: define the second we subtract from the first"""
603 610 repo = repo.unfiltered()
604 611 revset = repo.set('heads((%ln + parents(%ln)) - (%ln::%ln))',
605 612 heads, roots, roots, heads)
606 613 return [c.node() for c in revset]
607 614
608 615
609 616 def newcommitphase(ui):
610 617 """helper to get the target phase of new commit
611 618
612 619 Handle all possible values for the phases.new-commit options.
613 620
614 621 """
615 622 v = ui.config('phases', 'new-commit', draft)
616 623 try:
617 624 return phasenames.index(v)
618 625 except ValueError:
619 626 try:
620 627 return int(v)
621 628 except ValueError:
622 629 msg = _("phases.new-commit: not a valid phase name ('%s')")
623 630 raise error.ConfigError(msg % v)
624 631
625 632 def hassecret(repo):
626 633 """utility function that check if a repo have any secret changeset."""
627 634 return bool(repo._phasecache.phaseroots[2])
General Comments 0
You need to be logged in to leave comments. Login now