bundle2: introduce a `getrepocaps` to retrieve the bundle2 caps of a repo...
Pierre-Yves David
r22342:262c5cc1 default
@@ -1,928 +1,936
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all
41 41 stream level parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 A name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: (16 bits integer)
68 68
69 69 The total number of bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32 bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handlers are
132 132 registered for a certain part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know if a part is mandatory or advisory. If the part type
136 136 contains any uppercase char it is considered mandatory. When no handler is
137 137 known for a mandatory part, the process is aborted and an exception is raised.
138 138 If the part is advisory and no handler is known, the part is ignored. When the
139 139 process is aborted, the full bundle is still read from the stream to keep the
140 140 channel usable. But none of the parts read after an abort are processed. In
141 141 the future, dropping the stream may become an option for channels we do not
142 142 care to preserve.
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 149 import pushkey
150 150
151 151 import changegroup, error
152 152 from i18n import _
153 153
154 154 _pack = struct.pack
155 155 _unpack = struct.unpack
156 156
157 157 _magicstring = 'HG2X'
158 158
159 159 _fstreamparamsize = '>H'
160 160 _fpartheadersize = '>H'
161 161 _fparttypesize = '>B'
162 162 _fpartid = '>I'
163 163 _fpayloadsize = '>I'
164 164 _fpartparamcount = '>BB'
165 165
166 166 preferedchunksize = 4096
167 167
168 168 def _makefpartparamsizes(nbparams):
169 169 """return a struct format to read part parameter sizes
170 170
171 171 The number of parameters is variable so we need to build that format
172 172 dynamically.
173 173 """
174 174 return '>'+('BB'*nbparams)
175 175
176 176 parthandlermapping = {}
177 177
178 178 def parthandler(parttype, params=()):
179 179 """decorator that register a function as a bundle2 part handler
180 180
181 181 eg::
182 182
183 183 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
184 184 def myparttypehandler(...):
185 185 '''process a part of type "my part".'''
186 186 ...
187 187 """
188 188 def _decorator(func):
189 189 lparttype = parttype.lower() # enforce lower case matching.
190 190 assert lparttype not in parthandlermapping
191 191 parthandlermapping[lparttype] = func
192 192 func.params = frozenset(params)
193 193 return func
194 194 return _decorator
195 195
196 196 class unbundlerecords(object):
197 197 """keep record of what happens during and unbundle
198 198
199 199 New records are added using `records.add('cat', obj)`, where 'cat' is a
200 200 category of record and obj is an arbitrary object.
201 201
202 202 `records['cat']` will return all entries of this category 'cat'.
203 203
204 204 Iterating on the object itself will yield `('category', obj)` tuples
205 205 for all entries.
206 206
207 207 All iterations happen in chronological order.
208 208 """
209 209
210 210 def __init__(self):
211 211 self._categories = {}
212 212 self._sequences = []
213 213 self._replies = {}
214 214
215 215 def add(self, category, entry, inreplyto=None):
216 216 """add a new record of a given category.
217 217
218 218 The entry can then be retrieved in the list returned by
219 219 self['category']."""
220 220 self._categories.setdefault(category, []).append(entry)
221 221 self._sequences.append((category, entry))
222 222 if inreplyto is not None:
223 223 self.getreplies(inreplyto).add(category, entry)
224 224
225 225 def getreplies(self, partid):
226 226 """get the subrecords that replies to a specific part"""
227 227 return self._replies.setdefault(partid, unbundlerecords())
228 228
229 229 def __getitem__(self, cat):
230 230 return tuple(self._categories.get(cat, ()))
231 231
232 232 def __iter__(self):
233 233 return iter(self._sequences)
234 234
235 235 def __len__(self):
236 236 return len(self._sequences)
237 237
238 238 def __nonzero__(self):
239 239 return bool(self._sequences)
240 240
241 241 class bundleoperation(object):
242 242 """an object that represents a single bundling process
243 243
244 244 Its purpose is to carry unbundle-related objects and states.
245 245
246 246 A new object should be created at the beginning of each bundle processing.
247 247 The object is to be returned by the processing function.
248 248
249 249 The object currently has very little content; it will ultimately contain:
250 250 * an access to the repo the bundle is applied to,
251 251 * a ui object,
252 252 * a way to retrieve a transaction to add changes to the repo,
253 253 * a way to record the result of processing each part,
254 254 * a way to construct a bundle response when applicable.
255 255 """
256 256
257 257 def __init__(self, repo, transactiongetter):
258 258 self.repo = repo
259 259 self.ui = repo.ui
260 260 self.records = unbundlerecords()
261 261 self.gettransaction = transactiongetter
262 262 self.reply = None
263 263
264 264 class TransactionUnavailable(RuntimeError):
265 265 pass
266 266
267 267 def _notransaction():
268 268 """default method to get a transaction while processing a bundle
269 269
270 270 Raise an exception to highlight the fact that no transaction was expected
271 271 to be created"""
272 272 raise TransactionUnavailable()
273 273
274 274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 275 """This function process a bundle, apply effect to/from a repo
276 276
277 277 It iterates over each part then searches for and uses the proper handling
278 278 code to process the part. Parts are processed in order.
279 279
280 280 This is a very early version of this function that will be strongly reworked
281 281 before final usage.
282 282
283 283 Unknown Mandatory parts will abort the process.
284 284 """
285 285 op = bundleoperation(repo, transactiongetter)
286 286 # todo:
287 287 # - replace this with an init function soon.
288 288 # - exception catching
289 289 unbundler.params
290 290 iterparts = unbundler.iterparts()
291 291 part = None
292 292 try:
293 293 for part in iterparts:
294 294 parttype = part.type
295 295 # part keys are matched lower case
296 296 key = parttype.lower()
297 297 try:
298 298 handler = parthandlermapping.get(key)
299 299 if handler is None:
300 300 raise error.BundleValueError(parttype=key)
301 301 op.ui.debug('found a handler for part %r\n' % parttype)
302 302 unknownparams = part.mandatorykeys - handler.params
303 303 if unknownparams:
304 304 unknownparams = list(unknownparams)
305 305 unknownparams.sort()
306 306 raise error.BundleValueError(parttype=key,
307 307 params=unknownparams)
308 308 except error.BundleValueError, exc:
309 309 if key != parttype: # mandatory parts
310 310 raise
311 311 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
312 312 # consuming the part
313 313 part.read()
314 314 continue
315 315
316 316
317 317 # handler is called outside the above try block so that we don't
318 318 # risk catching KeyErrors from anything other than the
319 319 # parthandlermapping lookup (any KeyError raised by handler()
320 320 # itself represents a defect of a different variety).
321 321 output = None
322 322 if op.reply is not None:
323 323 op.ui.pushbuffer(error=True)
324 324 output = ''
325 325 try:
326 326 handler(op, part)
327 327 finally:
328 328 if output is not None:
329 329 output = op.ui.popbuffer()
330 330 if output:
331 331 outpart = op.reply.newpart('b2x:output', data=output)
332 332 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
333 333 part.read()
334 334 except Exception, exc:
335 335 if part is not None:
336 336 # consume the bundle content
337 337 part.read()
338 338 for part in iterparts:
339 339 # consume the bundle content
340 340 part.read()
341 341 # Small hack to let caller code distinguish exceptions from bundle2
342 342 # processing from the ones from bundle1 processing. This is mostly
343 343 # needed to handle different return codes to unbundle according to the
344 344 # type of bundle. We should probably clean up or drop this return code
345 345 # craziness in a future version.
346 346 exc.duringunbundle2 = True
347 347 raise
348 348 return op
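
A minimal calling sketch, assuming an existing `repo` and an `unbundler` built with ``unbundle20``; the transaction getter is only required when parts actually write to the repo::

    def gettransaction():
        return repo.transaction('unbundle')

    op = processbundle(repo, unbundler, gettransaction)
    for category, entry in op.records:
        repo.ui.debug('processed %s record: %r\n' % (category, entry))
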
349 349
350 350 def decodecaps(blob):
351 351 """decode a bundle2 caps bytes blob into a dictionnary
352 352
353 353 The blob is a list of capabilities (one per line)
354 354 Capabilities may have values using a line of the form::
355 355
356 356 capability=value1,value2,value3
357 357
358 358 The values are always a list."""
359 359 caps = {}
360 360 for line in blob.splitlines():
361 361 if not line:
362 362 continue
363 363 if '=' not in line:
364 364 key, vals = line, ()
365 365 else:
366 366 key, vals = line.split('=', 1)
367 367 vals = vals.split(',')
368 368 key = urllib.unquote(key)
369 369 vals = [urllib.unquote(v) for v in vals]
370 370 caps[key] = vals
371 371 return caps
372 372
373 373 def encodecaps(caps):
374 374 """encode a bundle2 caps dictionary into a bytes blob"""
375 375 chunks = []
376 376 for ca in sorted(caps):
377 377 vals = caps[ca]
378 378 ca = urllib.quote(ca)
379 379 vals = [urllib.quote(v) for v in vals]
380 380 if vals:
381 381 ca = "%s=%s" % (ca, ','.join(vals))
382 382 chunks.append(ca)
383 383 return '\n'.join(chunks)
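
A round-trip sketch of the caps blob format (capability names and values invented)::

    caps = {'HG2X': (), 'b2x:listkeys': ('v1', 'v2')}
    blob = encodecaps(caps)
    # blob == 'HG2X\nb2x:listkeys=v1,v2'
    assert decodecaps(blob) == {'HG2X': [], 'b2x:listkeys': ['v1', 'v2']}
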
384 384
385 385 class bundle20(object):
386 386 """represent an outgoing bundle2 container
387 387
388 388 Use the `addparam` method to add stream level parameters and `newpart` to
389 389 populate it with parts. Then call `getchunks` to retrieve all the binary
390 390 chunks of data that compose the bundle2 container."""
391 391
392 392 def __init__(self, ui, capabilities=()):
393 393 self.ui = ui
394 394 self._params = []
395 395 self._parts = []
396 396 self.capabilities = dict(capabilities)
397 397
398 398 @property
399 399 def nbparts(self):
400 400 """total number of parts added to the bundler"""
401 401 return len(self._parts)
402 402
403 403 # methods used to define the bundle2 content
404 404 def addparam(self, name, value=None):
405 405 """add a stream level parameter"""
406 406 if not name:
407 407 raise ValueError('empty parameter name')
408 408 if name[0] not in string.letters:
409 409 raise ValueError('non letter first character: %r' % name)
410 410 self._params.append((name, value))
411 411
412 412 def addpart(self, part):
413 413 """add a new part to the bundle2 container
414 414
415 415 Parts contains the actual applicative payload."""
416 416 assert part.id is None
417 417 part.id = len(self._parts) # very cheap counter
418 418 self._parts.append(part)
419 419
420 420 def newpart(self, typeid, *args, **kwargs):
421 421 """create a new part and add it to the containers
422 422
423 423 The part is directly added to the container. For now, this means
424 424 that any failure to properly initialize the part after calling
425 425 ``newpart`` should result in a failure of the whole bundling process.
426 426
427 427 You can still fall back to manually creating and adding the part if you
428 428 need better control."""
429 429 part = bundlepart(typeid, *args, **kwargs)
430 430 self.addpart(part)
431 431 return part
432 432
433 433 # methods used to generate the bundle2 stream
434 434 def getchunks(self):
435 435 self.ui.debug('start emission of %s stream\n' % _magicstring)
436 436 yield _magicstring
437 437 param = self._paramchunk()
438 438 self.ui.debug('bundle parameter: %s\n' % param)
439 439 yield _pack(_fstreamparamsize, len(param))
440 440 if param:
441 441 yield param
442 442
443 443 self.ui.debug('start of parts\n')
444 444 for part in self._parts:
445 445 self.ui.debug('bundle part: "%s"\n' % part.type)
446 446 for chunk in part.getchunks():
447 447 yield chunk
448 448 self.ui.debug('end of bundle\n')
449 449 yield '\0\0'
450 450
451 451 def _paramchunk(self):
452 452 """return a encoded version of all stream parameters"""
453 453 blocks = []
454 454 for par, value in self._params:
455 455 par = urllib.quote(par)
456 456 if value is not None:
457 457 value = urllib.quote(value)
458 458 par = '%s=%s' % (par, value)
459 459 blocks.append(par)
460 460 return ' '.join(blocks)
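
An emission sketch, assuming an existing `ui` object (parameter and payload invented)::

    bundler = bundle20(ui)
    bundler.addparam('somekey', 'somevalue')
    bundler.newpart('b2x:output', data='hello')
    raw = ''.join(bundler.getchunks())  # complete container, ready to send
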
461 461
462 462 class unpackermixin(object):
463 463 """A mixin to extract bytes and struct data from a stream"""
464 464
465 465 def __init__(self, fp):
466 466 self._fp = fp
467 467
468 468 def _unpack(self, format):
469 469 """unpack this struct format from the stream"""
470 470 data = self._readexact(struct.calcsize(format))
471 471 return _unpack(format, data)
472 472
473 473 def _readexact(self, size):
474 474 """read exactly <size> bytes from the stream"""
475 475 return changegroup.readexactly(self._fp, size)
476 476
477 477
478 478 class unbundle20(unpackermixin):
479 479 """interpret a bundle2 stream
480 480
481 481 This class is fed with a binary stream and yields parts through its
482 482 `iterparts` method."""
483 483
484 484 def __init__(self, ui, fp, header=None):
485 485 """If header is specified, we do not read it out of the stream."""
486 486 self.ui = ui
487 487 super(unbundle20, self).__init__(fp)
488 488 if header is None:
489 489 header = self._readexact(4)
490 490 magic, version = header[0:2], header[2:4]
491 491 if magic != 'HG':
492 492 raise util.Abort(_('not a Mercurial bundle'))
493 493 if version != '2X':
494 494 raise util.Abort(_('unknown bundle version %s') % version)
495 495 self.ui.debug('start processing of %s stream\n' % header)
496 496
497 497 @util.propertycache
498 498 def params(self):
499 499 """dictionary of stream level parameters"""
500 500 self.ui.debug('reading bundle2 stream parameters\n')
501 501 params = {}
502 502 paramssize = self._unpack(_fstreamparamsize)[0]
503 503 if paramssize:
504 504 for p in self._readexact(paramssize).split(' '):
505 505 p = p.split('=', 1)
506 506 p = [urllib.unquote(i) for i in p]
507 507 if len(p) < 2:
508 508 p.append(None)
509 509 self._processparam(*p)
510 510 params[p[0]] = p[1]
511 511 return params
512 512
513 513 def _processparam(self, name, value):
514 514 """process a parameter, applying its effect if needed
515 515
516 516 Parameters starting with a lower case letter are advisory and will be
517 517 ignored when unknown. Those starting with an upper case letter are
518 518 mandatory and this function will raise a KeyError when one is unknown.
519 519
520 520 Note: no options are currently supported. Any input will either be
521 521 ignored or fail.
522 522 """
523 523 if not name:
524 524 raise ValueError('empty parameter name')
525 525 if name[0] not in string.letters:
526 526 raise ValueError('non letter first character: %r' % name)
527 527 # Some logic will later be added here to try to process the option
528 528 # against a dict of known parameters.
529 529 if name[0].islower():
530 530 self.ui.debug("ignoring unknown parameter %r\n" % name)
531 531 else:
532 532 raise error.BundleValueError(params=(name,))
533 533
534 534
535 535 def iterparts(self):
536 536 """yield all parts contained in the stream"""
537 537 # make sure params have been loaded
538 538 self.params
539 539 self.ui.debug('start extraction of bundle2 parts\n')
540 540 headerblock = self._readpartheader()
541 541 while headerblock is not None:
542 542 part = unbundlepart(self.ui, headerblock, self._fp)
543 543 yield part
544 544 headerblock = self._readpartheader()
545 545 self.ui.debug('end of bundle2 stream\n')
546 546
547 547 def _readpartheader(self):
548 548 """reads a part header size and return the bytes blob
549 549
550 550 returns None if empty"""
551 551 headersize = self._unpack(_fpartheadersize)[0]
552 552 self.ui.debug('part header size: %i\n' % headersize)
553 553 if headersize:
554 554 return self._readexact(headersize)
555 555 return None
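
A consumption sketch, assuming `ui` and a binary stream `fp` positioned at the magic string::

    unbundler = unbundle20(ui, fp)
    for part in unbundler.iterparts():
        ui.debug('got part %s (id %i)\n' % (part.type, part.id))
        part.read()  # each part must be fully consumed before the next one
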
556 556
557 557
558 558 class bundlepart(object):
559 559 """A bundle2 part contains application level payload
560 560
561 561 The part `type` is used to route the part to the application level
562 562 handler.
563 563
564 564 The part payload is contained in ``part.data``. It could be raw bytes or a
565 565 generator of byte chunks.
566 566
567 567 You can add parameters to the part using the ``addparam`` method.
568 568 Parameters can be either mandatory (default) or advisory. Remote side
569 569 should be able to safely ignore the advisory ones.
570 570
571 571 Both data and parameters cannot be modified after the generation has begun.
572 572 """
573 573
574 574 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
575 575 data=''):
576 576 self.id = None
577 577 self.type = parttype
578 578 self._data = data
579 579 self._mandatoryparams = list(mandatoryparams)
580 580 self._advisoryparams = list(advisoryparams)
581 581 # checking for duplicated entries
582 582 self._seenparams = set()
583 583 for pname, __ in self._mandatoryparams + self._advisoryparams:
584 584 if pname in self._seenparams:
585 585 raise RuntimeError('duplicated params: %s' % pname)
586 586 self._seenparams.add(pname)
587 587 # status of the part's generation:
588 588 # - None: not started,
589 589 # - False: currently generated,
590 590 # - True: generation done.
591 591 self._generated = None
592 592
593 593 # methods used to define the part content
594 594 def __setdata(self, data):
595 595 if self._generated is not None:
596 596 raise error.ReadOnlyPartError('part is being generated')
597 597 self._data = data
598 598 def __getdata(self):
599 599 return self._data
600 600 data = property(__getdata, __setdata)
601 601
602 602 @property
603 603 def mandatoryparams(self):
604 604 # make it an immutable tuple to force people through ``addparam``
605 605 return tuple(self._mandatoryparams)
606 606
607 607 @property
608 608 def advisoryparams(self):
609 609 # make it an immutable tuple to force people through ``addparam``
610 610 return tuple(self._advisoryparams)
611 611
612 612 def addparam(self, name, value='', mandatory=True):
613 613 if self._generated is not None:
614 614 raise error.ReadOnlyPartError('part is being generated')
615 615 if name in self._seenparams:
616 616 raise ValueError('duplicated params: %s' % name)
617 617 self._seenparams.add(name)
618 618 params = self._advisoryparams
619 619 if mandatory:
620 620 params = self._mandatoryparams
621 621 params.append((name, value))
622 622
623 623 # methods used to generate the bundle2 stream
624 624 def getchunks(self):
625 625 if self._generated is not None:
626 626 raise RuntimeError('part can only be consumed once')
627 627 self._generated = False
628 628 #### header
629 629 ## parttype
630 630 header = [_pack(_fparttypesize, len(self.type)),
631 631 self.type, _pack(_fpartid, self.id),
632 632 ]
633 633 ## parameters
634 634 # count
635 635 manpar = self.mandatoryparams
636 636 advpar = self.advisoryparams
637 637 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
638 638 # size
639 639 parsizes = []
640 640 for key, value in manpar:
641 641 parsizes.append(len(key))
642 642 parsizes.append(len(value))
643 643 for key, value in advpar:
644 644 parsizes.append(len(key))
645 645 parsizes.append(len(value))
646 646 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
647 647 header.append(paramsizes)
648 648 # key, value
649 649 for key, value in manpar:
650 650 header.append(key)
651 651 header.append(value)
652 652 for key, value in advpar:
653 653 header.append(key)
654 654 header.append(value)
655 655 ## finalize header
656 656 headerchunk = ''.join(header)
657 657 yield _pack(_fpartheadersize, len(headerchunk))
658 658 yield headerchunk
659 659 ## payload
660 660 for chunk in self._payloadchunks():
661 661 yield _pack(_fpayloadsize, len(chunk))
662 662 yield chunk
663 663 # end of payload
664 664 yield _pack(_fpayloadsize, 0)
665 665 self._generated = True
666 666
667 667 def _payloadchunks(self):
668 668 """yield chunks of a the part payload
669 669
670 670 Exists to handle the different methods to provide data to a part."""
671 671 # we only support fixed size data now.
672 672 # This will be improved in the future.
673 673 if util.safehasattr(self.data, 'next'):
674 674 buff = util.chunkbuffer(self.data)
675 675 chunk = buff.read(preferedchunksize)
676 676 while chunk:
677 677 yield chunk
678 678 chunk = buff.read(preferedchunksize)
679 679 elif len(self.data):
680 680 yield self.data
681 681
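As the code above shows, the payload may be provided either as raw bytes or as a generator of byte chunks; a sketch with an invented generator::

    def datagen():
        yield 'first chunk'
        yield 'second chunk'

    part = bundlepart('b2x:output', data=datagen())
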
682 682 class unbundlepart(unpackermixin):
683 683 """a bundle part read from a bundle"""
684 684
685 685 def __init__(self, ui, header, fp):
686 686 super(unbundlepart, self).__init__(fp)
687 687 self.ui = ui
688 688 # unbundle state attr
689 689 self._headerdata = header
690 690 self._headeroffset = 0
691 691 self._initialized = False
692 692 self.consumed = False
693 693 # part data
694 694 self.id = None
695 695 self.type = None
696 696 self.mandatoryparams = None
697 697 self.advisoryparams = None
698 698 self.params = None
699 699 self.mandatorykeys = ()
700 700 self._payloadstream = None
701 701 self._readheader()
702 702
703 703 def _fromheader(self, size):
704 704 """return the next <size> byte from the header"""
705 705 offset = self._headeroffset
706 706 data = self._headerdata[offset:(offset + size)]
707 707 self._headeroffset = offset + size
708 708 return data
709 709
710 710 def _unpackheader(self, format):
711 711 """read given format from header
712 712
713 713 This automatically computes the size of the format to read."""
714 714 data = self._fromheader(struct.calcsize(format))
715 715 return _unpack(format, data)
716 716
717 717 def _initparams(self, mandatoryparams, advisoryparams):
718 718 """internal function to setup all logic related parameters"""
719 719 # make it read only to prevent people touching it by mistake.
720 720 self.mandatoryparams = tuple(mandatoryparams)
721 721 self.advisoryparams = tuple(advisoryparams)
722 722 # user friendly UI
723 723 self.params = dict(self.mandatoryparams)
724 724 self.params.update(dict(self.advisoryparams))
725 725 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
726 726
727 727 def _readheader(self):
728 728 """read the header and setup the object"""
729 729 typesize = self._unpackheader(_fparttypesize)[0]
730 730 self.type = self._fromheader(typesize)
731 731 self.ui.debug('part type: "%s"\n' % self.type)
732 732 self.id = self._unpackheader(_fpartid)[0]
733 733 self.ui.debug('part id: "%s"\n' % self.id)
734 734 ## reading parameters
735 735 # param count
736 736 mancount, advcount = self._unpackheader(_fpartparamcount)
737 737 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
738 738 # param size
739 739 fparamsizes = _makefpartparamsizes(mancount + advcount)
740 740 paramsizes = self._unpackheader(fparamsizes)
741 741 # make it a list of couples again
742 742 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
743 743 # split mandatory from advisory
744 744 mansizes = paramsizes[:mancount]
745 745 advsizes = paramsizes[mancount:]
746 746 # retrieve param values
747 747 manparams = []
748 748 for key, value in mansizes:
749 749 manparams.append((self._fromheader(key), self._fromheader(value)))
750 750 advparams = []
751 751 for key, value in advsizes:
752 752 advparams.append((self._fromheader(key), self._fromheader(value)))
753 753 self._initparams(manparams, advparams)
754 754 ## part payload
755 755 def payloadchunks():
756 756 payloadsize = self._unpack(_fpayloadsize)[0]
757 757 self.ui.debug('payload chunk size: %i\n' % payloadsize)
758 758 while payloadsize:
759 759 yield self._readexact(payloadsize)
760 760 payloadsize = self._unpack(_fpayloadsize)[0]
761 761 self.ui.debug('payload chunk size: %i\n' % payloadsize)
762 762 self._payloadstream = util.chunkbuffer(payloadchunks())
763 763 # the header has been fully read, record it
764 764 self._initialized = True
765 765
766 766 def read(self, size=None):
767 767 """read payload data"""
768 768 if not self._initialized:
769 769 self._readheader()
770 770 if size is None:
771 771 data = self._payloadstream.read()
772 772 else:
773 773 data = self._payloadstream.read(size)
774 774 if size is None or len(data) < size:
775 775 self.consumed = True
776 776 return data
777 777
778 778 capabilities = {'HG2X': (),
779 779 'b2x:listkeys': (),
780 780 'b2x:pushkey': (),
781 781 'b2x:changegroup': (),
782 782 }
783 783
784 def getrepocaps(repo):
785 """return the bundle2 capabilities for a given repo
786
787 Exists to allow extensions (like evolution) to mutate the
788 capabilities.
789 """
790 return capabilities
791
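
This hook point is what lets extensions advertise extra capabilities. A hypothetical sketch (capability name invented) using the standard ``extensions.wrapfunction`` helper; note the copy, since ``getrepocaps`` currently returns the shared module-level dict::

    from mercurial import extensions, bundle2

    def _getrepocaps(orig, repo):
        caps = dict(orig(repo))        # copy: orig returns a shared dict
        caps['b2x:hypothetical'] = ()  # invented capability name
        return caps

    def extsetup(ui):
        extensions.wrapfunction(bundle2, 'getrepocaps', _getrepocaps)
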
784 792 def bundle2caps(remote):
785 793 """return the bundlecapabilities of a peer as dict"""
786 794 raw = remote.capable('bundle2-exp')
787 795 if not raw and raw != '':
788 796 return {}
789 797 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
790 798 return decodecaps(capsblob)
791 799
792 800 @parthandler('b2x:changegroup')
793 801 def handlechangegroup(op, inpart):
794 802 """apply a changegroup part on the repo
795 803
796 804 This is a very early implementation that will see massive rework before
797 805 being inflicted on any end-user.
798 806 """
799 807 # Make sure we trigger a transaction creation
800 808 #
801 809 # The addchangegroup function will get a transaction object by itself, but
802 810 # we need to make sure we trigger the creation of a transaction object used
803 811 # for the whole processing scope.
804 812 op.gettransaction()
805 813 cg = changegroup.unbundle10(inpart, 'UN')
806 814 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
807 815 op.records.add('changegroup', {'return': ret})
808 816 if op.reply is not None:
809 817 # This is definitely not the final form of this
810 818 # return. But one needs to start somewhere.
811 819 part = op.reply.newpart('b2x:reply:changegroup')
812 820 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
813 821 part.addparam('return', '%i' % ret, mandatory=False)
814 822 assert not inpart.read()
815 823
816 824 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
817 825 def handlechangegroup(op, inpart):
818 826 ret = int(inpart.params['return'])
819 827 replyto = int(inpart.params['in-reply-to'])
820 828 op.records.add('changegroup', {'return': ret}, replyto)
821 829
822 830 @parthandler('b2x:check:heads')
823 831 def handlechangegroup(op, inpart):
824 832 """check that head of the repo did not change
825 833
826 834 This is used to detect a push race when using unbundle.
827 835 This replaces the "heads" argument of unbundle."""
828 836 h = inpart.read(20)
829 837 heads = []
830 838 while len(h) == 20:
831 839 heads.append(h)
832 840 h = inpart.read(20)
833 841 assert not h
834 842 if heads != op.repo.heads():
835 843 raise error.PushRaced('repository changed while pushing - '
836 844 'please try again')
837 845
838 846 @parthandler('b2x:output')
839 847 def handleoutput(op, inpart):
840 848 """forward output captured on the server to the client"""
841 849 for line in inpart.read().splitlines():
842 850 op.ui.write(('remote: %s\n' % line))
843 851
844 852 @parthandler('b2x:replycaps')
845 853 def handlereplycaps(op, inpart):
846 854 """Notify that a reply bundle should be created
847 855
848 856 The payload contains the capabilities information for the reply"""
849 857 caps = decodecaps(inpart.read())
850 858 if op.reply is None:
851 859 op.reply = bundle20(op.ui, caps)
852 860
853 861 @parthandler('b2x:error:abort', ('message', 'hint'))
854 862 def handlereplycaps(op, inpart):
855 863 """Used to transmit abort error over the wire"""
856 864 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
857 865
858 866 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
859 867 def handlereplycaps(op, inpart):
860 868 """Used to transmit unknown content error over the wire"""
861 869 kwargs = {}
862 870 parttype = inpart.params.get('parttype')
863 871 if parttype is not None:
864 872 kwargs['parttype'] = parttype
865 873 params = inpart.params.get('params')
866 874 if params is not None:
867 875 kwargs['params'] = params.split('\0')
868 876
869 877 raise error.BundleValueError(**kwargs)
870 878
871 879 @parthandler('b2x:error:pushraced', ('message',))
872 880 def handlereplycaps(op, inpart):
873 881 """Used to transmit push race error over the wire"""
874 882 raise error.ResponseError(_('push failed:'), inpart.params['message'])
875 883
876 884 @parthandler('b2x:listkeys', ('namespace',))
877 885 def handlelistkeys(op, inpart):
878 886 """retrieve pushkey namespace content stored in a bundle2"""
879 887 namespace = inpart.params['namespace']
880 888 r = pushkey.decodekeys(inpart.read())
881 889 op.records.add('listkeys', (namespace, r))
882 890
883 891 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
884 892 def handlepushkey(op, inpart):
885 893 """process a pushkey request"""
886 894 dec = pushkey.decode
887 895 namespace = dec(inpart.params['namespace'])
888 896 key = dec(inpart.params['key'])
889 897 old = dec(inpart.params['old'])
890 898 new = dec(inpart.params['new'])
891 899 ret = op.repo.pushkey(namespace, key, old, new)
892 900 record = {'namespace': namespace,
893 901 'key': key,
894 902 'old': old,
895 903 'new': new}
896 904 op.records.add('pushkey', record)
897 905 if op.reply is not None:
898 906 rpart = op.reply.newpart('b2x:reply:pushkey')
899 907 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
900 908 rpart.addparam('return', '%i' % ret, mandatory=False)
901 909
902 910 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
903 911 def handlepushkeyreply(op, inpart):
904 912 """retrieve the result of a pushkey request"""
905 913 ret = int(inpart.params['return'])
906 914 partid = int(inpart.params['in-reply-to'])
907 915 op.records.add('pushkey', {'return': ret}, partid)
908 916
909 917 @parthandler('b2x:obsmarkers')
910 918 def handleobsmarker(op, inpart):
911 919 """add a stream of obsmarkers to the repo"""
912 920 tr = op.gettransaction()
913 921 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
914 922 if new:
915 923 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
916 924 op.records.add('obsmarkers', {'new': new})
917 925 if op.reply is not None:
918 926 rpart = op.reply.newpart('b2x:reply:obsmarkers')
919 927 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
920 928 rpart.addparam('new', '%i' % new, mandatory=False)
921 929
922 930
923 931 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
924 932 def handlepushkeyreply(op, inpart):
925 933 """retrieve the result of a pushkey request"""
926 934 ret = int(inpart.params['new'])
927 935 partid = int(inpart.params['in-reply-to'])
928 936 op.records.add('obsmarkers', {'new': ret}, partid)
@@ -1,1031 +1,1031
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
14 14 def readbundle(ui, fh, fname, vfs=None):
15 15 header = changegroup.readexactly(fh, 4)
16 16
17 17 alg = None
18 18 if not fname:
19 19 fname = "stream"
20 20 if not header.startswith('HG') and header.startswith('\0'):
21 21 fh = changegroup.headerlessfixup(fh, header)
22 22 header = "HG10"
23 23 alg = 'UN'
24 24 elif vfs:
25 25 fname = vfs.join(fname)
26 26
27 27 magic, version = header[0:2], header[2:4]
28 28
29 29 if magic != 'HG':
30 30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 31 if version == '10':
32 32 if alg is None:
33 33 alg = changegroup.readexactly(fh, 2)
34 34 return changegroup.unbundle10(fh, alg)
35 35 elif version == '2X':
36 36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 37 else:
38 38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
41 41 class pushoperation(object):
42 42 """A object that represent a single push operation
43 43
44 44 It purpose is to carry push related state and very common operation.
45 45
46 46 A new should be created at the beginning of each push and discarded
47 47 afterward.
48 48 """
49 49
50 50 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
51 51 # repo we push from
52 52 self.repo = repo
53 53 self.ui = repo.ui
54 54 # repo we push to
55 55 self.remote = remote
56 56 # force option provided
57 57 self.force = force
58 58 # revs to be pushed (None is "all")
59 59 self.revs = revs
60 60 # allow push of new branch
61 61 self.newbranch = newbranch
62 62 # did a local lock get acquired?
63 63 self.locallocked = None
64 64 # steps already performed
65 65 # (used to check what steps have been already performed through bundle2)
66 66 self.stepsdone = set()
67 67 # Integer version of the push result
68 68 # - None means nothing to push
69 69 # - 0 means HTTP error
70 70 # - 1 means we pushed and remote head count is unchanged *or*
71 71 # we have outgoing changesets but refused to push
72 72 # - other values as described by addchangegroup()
73 73 self.ret = None
74 74 # discovery.outgoing object (contains common and outgoing data)
75 75 self.outgoing = None
76 76 # all remote heads before the push
77 77 self.remoteheads = None
78 78 # testable as a boolean indicating if any nodes are missing locally.
79 79 self.incoming = None
80 80 # phases changes that must be pushed along side the changesets
81 81 self.outdatedphases = None
82 82 # phases changes that must be pushed if changeset push fails
83 83 self.fallbackoutdatedphases = None
84 84 # outgoing obsmarkers
85 85 self.outobsmarkers = set()
86 86 # outgoing bookmarks
87 87 self.outbookmarks = []
88 88
89 89 @util.propertycache
90 90 def futureheads(self):
91 91 """future remote heads if the changeset push succeeds"""
92 92 return self.outgoing.missingheads
93 93
94 94 @util.propertycache
95 95 def fallbackheads(self):
96 96 """future remote heads if the changeset push fails"""
97 97 if self.revs is None:
98 98 # no target to push, all common heads are relevant
99 99 return self.outgoing.commonheads
100 100 unfi = self.repo.unfiltered()
101 101 # I want cheads = heads(::missingheads and ::commonheads)
102 102 # (missingheads is revs with secret changeset filtered out)
103 103 #
104 104 # This can be expressed as:
105 105 # cheads = ( (missingheads and ::commonheads)
106 106 # + (commonheads and ::missingheads))"
107 107 # )
108 108 #
109 109 # while trying to push we already computed the following:
110 110 # common = (::commonheads)
111 111 # missing = ((commonheads::missingheads) - commonheads)
112 112 #
113 113 # We can pick:
114 114 # * missingheads part of common (::commonheads)
115 115 common = set(self.outgoing.common)
116 116 nm = self.repo.changelog.nodemap
117 117 cheads = [node for node in self.revs if nm[node] in common]
118 118 # and
119 119 # * commonheads parents on missing
120 120 revset = unfi.set('%ln and parents(roots(%ln))',
121 121 self.outgoing.commonheads,
122 122 self.outgoing.missing)
123 123 cheads.extend(c.node() for c in revset)
124 124 return cheads
125 125
126 126 @property
127 127 def commonheads(self):
128 128 """set of all common heads after changeset bundle push"""
129 129 if self.ret:
130 130 return self.futureheads
131 131 else:
132 132 return self.fallbackheads
133 133
134 134 def push(repo, remote, force=False, revs=None, newbranch=False):
135 135 '''Push outgoing changesets (limited by revs) from a local
136 136 repository to remote. Return an integer:
137 137 - None means nothing to push
138 138 - 0 means HTTP error
139 139 - 1 means we pushed and remote head count is unchanged *or*
140 140 we have outgoing changesets but refused to push
141 141 - other values as described by addchangegroup()
142 142 '''
143 143 pushop = pushoperation(repo, remote, force, revs, newbranch)
144 144 if pushop.remote.local():
145 145 missing = (set(pushop.repo.requirements)
146 146 - pushop.remote.local().supported)
147 147 if missing:
148 148 msg = _("required features are not"
149 149 " supported in the destination:"
150 150 " %s") % (', '.join(sorted(missing)))
151 151 raise util.Abort(msg)
152 152
153 153 # there are two ways to push to remote repo:
154 154 #
155 155 # addchangegroup assumes local user can lock remote
156 156 # repo (local filesystem, old ssh servers).
157 157 #
158 158 # unbundle assumes local user cannot lock remote repo (new ssh
159 159 # servers, http servers).
160 160
161 161 if not pushop.remote.canpush():
162 162 raise util.Abort(_("destination does not support push"))
163 163 # get local lock as we might write phase data
164 164 locallock = None
165 165 try:
166 166 locallock = pushop.repo.lock()
167 167 pushop.locallocked = True
168 168 except IOError, err:
169 169 pushop.locallocked = False
170 170 if err.errno != errno.EACCES:
171 171 raise
172 172 # source repo cannot be locked.
173 173 # We do not abort the push, but just disable the local phase
174 174 # synchronisation.
175 175 msg = 'cannot lock source repository: %s\n' % err
176 176 pushop.ui.debug(msg)
177 177 try:
178 178 pushop.repo.checkpush(pushop)
179 179 lock = None
180 180 unbundle = pushop.remote.capable('unbundle')
181 181 if not unbundle:
182 182 lock = pushop.remote.lock()
183 183 try:
184 184 _pushdiscovery(pushop)
185 185 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
186 186 False)
187 187 and pushop.remote.capable('bundle2-exp')):
188 188 _pushbundle2(pushop)
189 189 _pushchangeset(pushop)
190 190 _pushsyncphase(pushop)
191 191 _pushobsolete(pushop)
192 192 _pushbookmark(pushop)
193 193 finally:
194 194 if lock is not None:
195 195 lock.release()
196 196 finally:
197 197 if locallock is not None:
198 198 locallock.release()
199 199
200 200 return pushop.ret
201 201
202 202 # list of steps to perform discovery before push
203 203 pushdiscoveryorder = []
204 204
205 205 # Mapping between step name and function
206 206 #
207 207 # This exists to help extensions wrap steps if necessary
208 208 pushdiscoverymapping = {}
209 209
210 210 def pushdiscovery(stepname):
211 211 """decorator for function performing discovery before push
212 212
213 213 The function is added to the step -> function mapping and appended to the
214 214 list of steps. Beware that decorated functions will be added in order (this
215 215 may matter).
216 216
217 217 You can only use this decorator for a new step; if you want to wrap a step
218 218 from an extension, change the pushdiscoverymapping dictionary directly."""
219 219 def dec(func):
220 220 assert stepname not in pushdiscoverymapping
221 221 pushdiscoverymapping[stepname] = func
222 222 pushdiscoveryorder.append(stepname)
223 223 return func
224 224 return dec
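
A registration sketch (step name invented)::

    @pushdiscovery('examplestep')
    def _pushdiscoveryexample(pushop):
        pushop.ui.debug('example discovery step ran\n')
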
225 225
226 226 def _pushdiscovery(pushop):
227 227 """Run all discovery steps"""
228 228 for stepname in pushdiscoveryorder:
229 229 step = pushdiscoverymapping[stepname]
230 230 step(pushop)
231 231
232 232 @pushdiscovery('changeset')
233 233 def _pushdiscoverychangeset(pushop):
234 234 """discover the changeset that need to be pushed"""
235 235 unfi = pushop.repo.unfiltered()
236 236 fci = discovery.findcommonincoming
237 237 commoninc = fci(unfi, pushop.remote, force=pushop.force)
238 238 common, inc, remoteheads = commoninc
239 239 fco = discovery.findcommonoutgoing
240 240 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
241 241 commoninc=commoninc, force=pushop.force)
242 242 pushop.outgoing = outgoing
243 243 pushop.remoteheads = remoteheads
244 244 pushop.incoming = inc
245 245
246 246 @pushdiscovery('phase')
247 247 def _pushdiscoveryphase(pushop):
248 248 """discover the phase that needs to be pushed
249 249
250 250 (computed for both success and failure case for changesets push)"""
251 251 outgoing = pushop.outgoing
252 252 unfi = pushop.repo.unfiltered()
253 253 remotephases = pushop.remote.listkeys('phases')
254 254 publishing = remotephases.get('publishing', False)
255 255 ana = phases.analyzeremotephases(pushop.repo,
256 256 pushop.fallbackheads,
257 257 remotephases)
258 258 pheads, droots = ana
259 259 extracond = ''
260 260 if not publishing:
261 261 extracond = ' and public()'
262 262 revset = 'heads((%%ln::%%ln) %s)' % extracond
263 263 # Get the list of all revs draft on remote but public here.
264 264 # XXX Beware that the revset breaks if droots is not strictly
265 265 # XXX roots; we may want to ensure it is, but that is costly
266 266 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
267 267 if not outgoing.missing:
268 268 future = fallback
269 269 else:
270 270 # add the changesets we are going to push as draft
271 271 #
272 272 # should not be necessary for publishing server, but because of an
273 273 # issue fixed in xxxxx we have to do it anyway.
274 274 fdroots = list(unfi.set('roots(%ln + %ln::)',
275 275 outgoing.missing, droots))
276 276 fdroots = [f.node() for f in fdroots]
277 277 future = list(unfi.set(revset, fdroots, pushop.futureheads))
278 278 pushop.outdatedphases = future
279 279 pushop.fallbackoutdatedphases = fallback
280 280
281 281 @pushdiscovery('obsmarker')
282 282 def _pushdiscoveryobsmarkers(pushop):
283 283 if (obsolete._enabled
284 284 and pushop.repo.obsstore
285 285 and 'obsolete' in pushop.remote.listkeys('namespaces')):
286 286 pushop.outobsmarkers = pushop.repo.obsstore
287 287
288 288 @pushdiscovery('bookmarks')
289 289 def _pushdiscoverybookmarks(pushop):
290 290 ui = pushop.ui
291 291 repo = pushop.repo.unfiltered()
292 292 remote = pushop.remote
293 293 ui.debug("checking for updated bookmarks\n")
294 294 ancestors = ()
295 295 if pushop.revs:
296 296 revnums = map(repo.changelog.rev, pushop.revs)
297 297 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
298 298 remotebookmark = remote.listkeys('bookmarks')
299 299
300 300 comp = bookmarks.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
301 301 addsrc, adddst, advsrc, advdst, diverge, differ, invalid = comp
302 302 for b, scid, dcid in advsrc:
303 303 if not ancestors or repo[scid].rev() in ancestors:
304 304 pushop.outbookmarks.append((b, dcid, scid))
305 305
306 306 def _pushcheckoutgoing(pushop):
307 307 outgoing = pushop.outgoing
308 308 unfi = pushop.repo.unfiltered()
309 309 if not outgoing.missing:
310 310 # nothing to push
311 311 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
312 312 return False
313 313 # something to push
314 314 if not pushop.force:
315 315 # if repo.obsstore == False --> no obsolete
316 316 # then, save the iteration
317 317 if unfi.obsstore:
318 318 # these messages are here for 80 char limit reasons
319 319 mso = _("push includes obsolete changeset: %s!")
320 320 mst = "push includes %s changeset: %s!"
321 321 # plain versions for i18n tool to detect them
322 322 _("push includes unstable changeset: %s!")
323 323 _("push includes bumped changeset: %s!")
324 324 _("push includes divergent changeset: %s!")
325 325 # If there is at least one obsolete or unstable
326 326 # changeset in missing, at least one of the
327 327 # missing heads will be obsolete or unstable.
328 328 # So checking heads only is ok
329 329 for node in outgoing.missingheads:
330 330 ctx = unfi[node]
331 331 if ctx.obsolete():
332 332 raise util.Abort(mso % ctx)
333 333 elif ctx.troubled():
334 334 raise util.Abort(_(mst)
335 335 % (ctx.troubles()[0],
336 336 ctx))
337 337 newbm = pushop.ui.configlist('bookmarks', 'pushing')
338 338 discovery.checkheads(unfi, pushop.remote, outgoing,
339 339 pushop.remoteheads,
340 340 pushop.newbranch,
341 341 bool(pushop.incoming),
342 342 newbm)
343 343 return True
344 344
345 345 # List of names of steps to perform for an outgoing bundle2, order matters.
346 346 b2partsgenorder = []
347 347
348 348 # Mapping between step name and function
349 349 #
350 350 # This exists to help extensions wrap steps if necessary
351 351 b2partsgenmapping = {}
352 352
353 353 def b2partsgenerator(stepname):
354 354 """decorator for function generating bundle2 part
355 355
356 356 The function is added to the step -> function mapping and appended to the
357 357 list of steps. Beware that decorated functions will be added in order
358 358 (this may matter).
359 359
360 360 You can only use this decorator for new steps; if you want to wrap a step
361 361 from an extension, change the b2partsgenmapping dictionary directly."""
362 362 def dec(func):
363 363 assert stepname not in b2partsgenmapping
364 364 b2partsgenmapping[stepname] = func
365 365 b2partsgenorder.append(stepname)
366 366 return func
367 367 return dec
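
Part generators may return a callable that is later fed the reply bundle operation; a sketch (step name and part invented)::

    @b2partsgenerator('exampleping')
    def _pushb2example(pushop, bundler):
        part = bundler.newpart('b2x:output', data='ping')
        def handlereply(op):
            # runs once the server reply bundle has been processed
            pass
        return handlereply
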
368 368
369 369 @b2partsgenerator('changeset')
370 370 def _pushb2ctx(pushop, bundler):
371 371 """handle changegroup push through bundle2
372 372
373 373 addchangegroup result is stored in the ``pushop.ret`` attribute.
374 374 """
375 375 if 'changesets' in pushop.stepsdone:
376 376 return
377 377 pushop.stepsdone.add('changesets')
378 378 # Send known heads to the server for race detection.
379 379 if not _pushcheckoutgoing(pushop):
380 380 return
381 381 pushop.repo.prepushoutgoinghooks(pushop.repo,
382 382 pushop.remote,
383 383 pushop.outgoing)
384 384 if not pushop.force:
385 385 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
386 386 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
387 387 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
388 388 def handlereply(op):
389 389 """extract addchangroup returns from server reply"""
390 390 cgreplies = op.records.getreplies(cgpart.id)
391 391 assert len(cgreplies['changegroup']) == 1
392 392 pushop.ret = cgreplies['changegroup'][0]['return']
393 393 return handlereply
394 394
395 395 @b2partsgenerator('phase')
396 396 def _pushb2phases(pushop, bundler):
397 397 """handle phase push through bundle2"""
398 398 if 'phases' in pushop.stepsdone:
399 399 return
400 400 b2caps = bundle2.bundle2caps(pushop.remote)
401 401 if not 'b2x:pushkey' in b2caps:
402 402 return
403 403 pushop.stepsdone.add('phases')
404 404 part2node = []
405 405 enc = pushkey.encode
406 406 for newremotehead in pushop.outdatedphases:
407 407 part = bundler.newpart('b2x:pushkey')
408 408 part.addparam('namespace', enc('phases'))
409 409 part.addparam('key', enc(newremotehead.hex()))
410 410 part.addparam('old', enc(str(phases.draft)))
411 411 part.addparam('new', enc(str(phases.public)))
412 412 part2node.append((part.id, newremotehead))
413 413 def handlereply(op):
414 414 for partid, node in part2node:
415 415 partrep = op.records.getreplies(partid)
416 416 results = partrep['pushkey']
417 417 assert len(results) <= 1
418 418 msg = None
419 419 if not results:
420 420 msg = _('server ignored update of %s to public!\n') % node
421 421 elif not int(results[0]['return']):
422 422 msg = _('updating %s to public failed!\n') % node
423 423 if msg is not None:
424 424 pushop.ui.warn(msg)
425 425 return handlereply
426 426
427 427 @b2partsgenerator('bookmarks')
428 428 def _pushb2bookmarks(pushop, bundler):
429 429 """handle phase push through bundle2"""
430 430 if 'bookmarks' in pushop.stepsdone:
431 431 return
432 432 b2caps = bundle2.bundle2caps(pushop.remote)
433 433 if 'b2x:pushkey' not in b2caps:
434 434 return
435 435 pushop.stepsdone.add('bookmarks')
436 436 part2book = []
437 437 enc = pushkey.encode
438 438 for book, old, new in pushop.outbookmarks:
439 439 part = bundler.newpart('b2x:pushkey')
440 440 part.addparam('namespace', enc('bookmarks'))
441 441 part.addparam('key', enc(book))
442 442 part.addparam('old', enc(old))
443 443 part.addparam('new', enc(new))
444 444 part2book.append((part.id, book))
445 445 def handlereply(op):
446 446 for partid, book in part2book:
447 447 partrep = op.records.getreplies(partid)
448 448 results = partrep['pushkey']
449 449 assert len(results) <= 1
450 450 if not results:
451 451 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
452 452 else:
453 453 ret = int(results[0]['return'])
454 454 if ret:
455 455 pushop.ui.status(_("updating bookmark %s\n") % book)
456 456 else:
457 457 pushop.ui.warn(_('updating bookmark %s failed!\n') % book)
458 458 return handlereply
459 459
460 460
461 461 def _pushbundle2(pushop):
462 462 """push data to the remote using bundle2
463 463
464 464 The supported types of data currently include changegroup, phase, and
465 465 bookmark parts; this will keep evolving in the future."""
466 466 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
467 467 # create reply capability
468 capsblob = bundle2.encodecaps(bundle2.capabilities)
468 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo))
469 469 bundler.newpart('b2x:replycaps', data=capsblob)
470 470 replyhandlers = []
471 471 for partgenname in b2partsgenorder:
472 472 partgen = b2partsgenmapping[partgenname]
473 473 ret = partgen(pushop, bundler)
474 474 if callable(ret):
475 475 replyhandlers.append(ret)
476 476 # do not push if nothing to push
477 477 if bundler.nbparts <= 1:
478 478 return
479 479 stream = util.chunkbuffer(bundler.getchunks())
480 480 try:
481 481 reply = pushop.remote.unbundle(stream, ['force'], 'push')
482 482 except error.BundleValueError, exc:
483 483 raise util.Abort('missing support for %s' % exc)
484 484 try:
485 485 op = bundle2.processbundle(pushop.repo, reply)
486 486 except error.BundleValueError, exc:
487 487 raise util.Abort('missing support for %s' % exc)
488 488 for rephand in replyhandlers:
489 489 rephand(op)
490 490
491 491 def _pushchangeset(pushop):
492 492 """Make the actual push of changeset bundle to remote repo"""
493 493 if 'changesets' in pushop.stepsdone:
494 494 return
495 495 pushop.stepsdone.add('changesets')
496 496 if not _pushcheckoutgoing(pushop):
497 497 return
498 498 pushop.repo.prepushoutgoinghooks(pushop.repo,
499 499 pushop.remote,
500 500 pushop.outgoing)
501 501 outgoing = pushop.outgoing
502 502 unbundle = pushop.remote.capable('unbundle')
503 503 # TODO: get bundlecaps from remote
504 504 bundlecaps = None
505 505 # create a changegroup from local
506 506 if pushop.revs is None and not (outgoing.excluded
507 507 or pushop.repo.changelog.filteredrevs):
508 508 # push everything,
509 509 # use the fast path, no race possible on push
510 510 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
511 511 cg = changegroup.getsubset(pushop.repo,
512 512 outgoing,
513 513 bundler,
514 514 'push',
515 515 fastpath=True)
516 516 else:
517 517 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
518 518 bundlecaps)
519 519
520 520 # apply changegroup to remote
521 521 if unbundle:
522 522 # local repo finds heads on server, finds out what
523 523 # revs it must push. once revs transferred, if server
524 524 # finds it has different heads (someone else won
525 525 # commit/push race), server aborts.
526 526 if pushop.force:
527 527 remoteheads = ['force']
528 528 else:
529 529 remoteheads = pushop.remoteheads
530 530 # ssh: return remote's addchangegroup()
531 531 # http: return remote's addchangegroup() or 0 for error
532 532 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
533 533 pushop.repo.url())
534 534 else:
535 535 # we return an integer indicating remote head count
536 536 # change
537 537 pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
538 538
539 539 def _pushsyncphase(pushop):
540 540 """synchronise phase information locally and remotely"""
541 541 cheads = pushop.commonheads
542 542 # even when we don't push, exchanging phase data is useful
543 543 remotephases = pushop.remote.listkeys('phases')
544 544 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
545 545 and remotephases # server supports phases
546 546 and pushop.ret is None # nothing was pushed
547 547 and remotephases.get('publishing', False)):
548 548 # When:
549 549 # - this is a subrepo push
550 550 # - and remote support phase
551 551 # - and no changeset was pushed
552 552 # - and remote is publishing
553 553 # We may be in issue 3871 case!
554 554 # We drop the possible phase synchronisation done by
555 555 # courtesy to publish changesets possibly locally draft
556 556 # on the remote.
557 557 remotephases = {'publishing': 'True'}
558 558 if not remotephases: # old server or public only reply from non-publishing
559 559 _localphasemove(pushop, cheads)
560 560 # don't push any phase data as there is nothing to push
561 561 else:
562 562 ana = phases.analyzeremotephases(pushop.repo, cheads,
563 563 remotephases)
564 564 pheads, droots = ana
565 565 ### Apply remote phase on local
566 566 if remotephases.get('publishing', False):
567 567 _localphasemove(pushop, cheads)
568 568 else: # publish = False
569 569 _localphasemove(pushop, pheads)
570 570 _localphasemove(pushop, cheads, phases.draft)
571 571 ### Apply local phase on remote
572 572
573 573 if pushop.ret:
574 574 if 'phases' in pushop.stepsdone:
575 575 # phases already pushed through bundle2
576 576 return
577 577 outdated = pushop.outdatedphases
578 578 else:
579 579 outdated = pushop.fallbackoutdatedphases
580 580
581 581 pushop.stepsdone.add('phases')
582 582
583 583 # filter heads already turned public by the push
584 584 outdated = [c for c in outdated if c.node() not in pheads]
585 585 b2caps = bundle2.bundle2caps(pushop.remote)
586 586 if 'b2x:pushkey' in b2caps:
587 587 # server supports bundle2, let's do a batched push through it
588 588 #
589 589 # This will eventually be unified with the changesets bundle2 push
590 590 bundler = bundle2.bundle20(pushop.ui, b2caps)
591 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
591 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo))
592 592 bundler.newpart('b2x:replycaps', data=capsblob)
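# Editor's note: the b2x:replycaps part advertises our own bundle2
# capabilities so that the server can build a reply bundle we are
# guaranteed to be able to process.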
593 593 part2node = []
594 594 enc = pushkey.encode
595 595 for newremotehead in outdated:
596 596 part = bundler.newpart('b2x:pushkey')
597 597 part.addparam('namespace', enc('phases'))
598 598 part.addparam('key', enc(newremotehead.hex()))
599 599 part.addparam('old', enc(str(phases.draft)))
600 600 part.addparam('new', enc(str(phases.public)))
601 601 part2node.append((part.id, newremotehead))
602 602 stream = util.chunkbuffer(bundler.getchunks())
603 603 try:
604 604 reply = pushop.remote.unbundle(stream, ['force'], 'push')
605 605 op = bundle2.processbundle(pushop.repo, reply)
606 606 except error.BundleValueError, exc:
607 607 raise util.Abort('missing support for %s' % exc)
608 608 for partid, node in part2node:
609 609 partrep = op.records.getreplies(partid)
610 610 results = partrep['pushkey']
611 611 assert len(results) <= 1
612 612 msg = None
613 613 if not results:
614 614 msg = _('server ignored update of %s to public!\n') % node
615 615 elif not int(results[0]['return']):
616 616 msg = _('updating %s to public failed!\n') % node
617 617 if msg is not None:
618 618 pushop.ui.warn(msg)
619 619
620 620 else:
621 621 # fall back to independent pushkey commands
622 622 for newremotehead in outdated:
623 623 r = pushop.remote.pushkey('phases',
624 624 newremotehead.hex(),
625 625 str(phases.draft),
626 626 str(phases.public))
627 627 if not r:
628 628 pushop.ui.warn(_('updating %s to public failed!\n')
629 629 % newremotehead)
630 630
631 631 def _localphasemove(pushop, nodes, phase=phases.public):
632 632 """move <nodes> to <phase> in the local source repo"""
633 633 if pushop.locallocked:
634 634 tr = pushop.repo.transaction('push-phase-sync')
635 635 try:
636 636 phases.advanceboundary(pushop.repo, tr, phase, nodes)
637 637 tr.close()
638 638 finally:
639 639 tr.release()
640 640 else:
641 641 # repo is not locked, do not change any phases!
642 642 # Inform the user that phases should have been moved when
643 643 # applicable.
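# Editorial note: phases are ordered public (0) < draft (1) < secret (2),
# so "phase < pushop.repo[n].phase()" below selects exactly the nodes
# whose phase the skipped move would actually have lowered.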
644 644 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
645 645 phasestr = phases.phasenames[phase]
646 646 if actualmoves:
647 647 pushop.ui.status(_('cannot lock source repo, skipping '
648 648 'local %s phase update\n') % phasestr)
649 649
650 650 def _pushobsolete(pushop):
651 651 """utility function to push obsolete markers to a remote"""
652 652 if 'obsmarkers' in pushop.stepsdone:
653 653 return
654 654 pushop.ui.debug('try to push obsolete markers to remote\n')
655 655 repo = pushop.repo
656 656 remote = pushop.remote
657 657 pushop.stepsdone.add('obsmarkers')
658 658 if pushop.outobsmarkers:
659 659 rslts = []
660 660 remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
661 661 for key in sorted(remotedata, reverse=True):
662 662 # reverse sort to ensure we end with dump0
663 663 data = remotedata[key]
664 664 rslts.append(remote.pushkey('obsolete', key, '', data))
665 665 if [r for r in rslts if not r]:
666 666 msg = _('failed to push some obsolete markers!\n')
667 667 repo.ui.warn(msg)
668 668
669 669 def _pushbookmark(pushop):
670 670 """Update bookmark position on remote"""
671 671 if pushop.ret == 0 or 'bookmarks' in pushop.stepsdone:
672 672 return
673 673 pushop.stepsdone.add('bookmarks')
674 674 ui = pushop.ui
675 675 remote = pushop.remote
676 676 for b, old, new in pushop.outbookmarks:
677 677 if remote.pushkey('bookmarks', b, old, new):
678 678 ui.status(_("updating bookmark %s\n") % b)
679 679 else:
680 680 ui.warn(_('updating bookmark %s failed!\n') % b)
681 681
682 682 class pulloperation(object):
683 683 """An object that represents a single pull operation.
684 684
685 685 Its purpose is to carry pull-related state and very common operations.
686 686
687 687 A new one should be created at the beginning of each pull and discarded
688 688 afterward.
689 689 """
690 690
691 691 def __init__(self, repo, remote, heads=None, force=False):
692 692 # repo we pull into
693 693 self.repo = repo
694 694 # repo we pull from
695 695 self.remote = remote
696 696 # revision we try to pull (None is "all")
697 697 self.heads = heads
698 698 # do we force pull?
699 699 self.force = force
700 700 # the name of the pull transaction
701 701 self._trname = 'pull\n' + util.hidepassword(remote.url())
702 702 # hold the transaction once created
703 703 self._tr = None
704 704 # set of common changesets between local and remote before pull
705 705 self.common = None
706 706 # set of pulled heads
707 707 self.rheads = None
708 708 # list of missing changesets to fetch remotely
709 709 self.fetch = None
710 710 # result of changegroup pulling (used as return code by pull)
711 711 self.cgresult = None
712 712 # list of steps remaining to do (related to future bundle2 usage)
713 713 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
714 714
715 715 @util.propertycache
716 716 def pulledsubset(self):
717 717 """heads of the set of changesets targeted by the pull"""
718 718 # compute target subset
719 719 if self.heads is None:
720 720 # We pulled everything possible
721 721 # sync on everything common
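# Illustrative sketch (editor's note): with common = {A, B} and
# rheads = [B, C], the loop below yields [A, B, C] -- every common
# head plus any remote head not already known locally.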
722 722 c = set(self.common)
723 723 ret = list(self.common)
724 724 for n in self.rheads:
725 725 if n not in c:
726 726 ret.append(n)
727 727 return ret
728 728 else:
729 729 # We pulled a specific subset
730 730 # sync on this subset
731 731 return self.heads
732 732
733 733 def gettransaction(self):
734 734 """get appropriate pull transaction, creating it if needed"""
735 735 if self._tr is None:
736 736 self._tr = self.repo.transaction(self._trname)
737 737 return self._tr
738 738
739 739 def closetransaction(self):
740 740 """close transaction if created"""
741 741 if self._tr is not None:
742 742 self._tr.close()
743 743
744 744 def releasetransaction(self):
745 745 """release transaction if created"""
746 746 if self._tr is not None:
747 747 self._tr.release()
748 748
749 749 def pull(repo, remote, heads=None, force=False):
750 750 pullop = pulloperation(repo, remote, heads, force)
751 751 if pullop.remote.local():
752 752 missing = set(pullop.remote.requirements) - pullop.repo.supported
753 753 if missing:
754 754 msg = _("required features are not"
755 755 " supported in the destination:"
756 756 " %s") % (', '.join(sorted(missing)))
757 757 raise util.Abort(msg)
758 758
759 759 lock = pullop.repo.lock()
760 760 try:
761 761 _pulldiscovery(pullop)
762 762 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
763 763 and pullop.remote.capable('bundle2-exp')):
764 764 _pullbundle2(pullop)
765 765 if 'changegroup' in pullop.todosteps:
766 766 _pullchangeset(pullop)
767 767 if 'phases' in pullop.todosteps:
768 768 _pullphase(pullop)
769 769 if 'obsmarkers' in pullop.todosteps:
770 770 _pullobsolete(pullop)
771 771 pullop.closetransaction()
772 772 finally:
773 773 pullop.releasetransaction()
774 774 lock.release()
775 775
776 776 return pullop.cgresult
777 777
778 778 def _pulldiscovery(pullop):
779 779 """discovery phase for the pull
780 780
781 781 Currently handles changeset discovery only; it will eventually handle
782 782 all discovery at some point."""
783 783 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
784 784 pullop.remote,
785 785 heads=pullop.heads,
786 786 force=pullop.force)
787 787 pullop.common, pullop.fetch, pullop.rheads = tmp
788 788
789 789 def _pullbundle2(pullop):
790 790 """pull data using bundle2
791 791
792 792 For now, the only supported data are changegroups."""
793 793 remotecaps = bundle2.bundle2caps(pullop.remote)
794 794 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
795 795 # pulling changegroup
796 796 pullop.todosteps.remove('changegroup')
797 797
798 798 kwargs['common'] = pullop.common
799 799 kwargs['heads'] = pullop.heads or pullop.rheads
800 800 if 'b2x:listkeys' in remotecaps:
801 801 kwargs['listkeys'] = ['phase']
802 802 if not pullop.fetch:
803 803 pullop.repo.ui.status(_("no changes found\n"))
804 804 pullop.cgresult = 0
805 805 else:
806 806 if pullop.heads is None and list(pullop.common) == [nullid]:
807 807 pullop.repo.ui.status(_("requesting all changes\n"))
808 808 _pullbundle2extraprepare(pullop, kwargs)
809 809 if kwargs.keys() == ['format']:
810 810 return # nothing to pull
811 811 bundle = pullop.remote.getbundle('pull', **kwargs)
812 812 try:
813 813 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
814 814 except error.BundleValueError, exc:
815 815 raise util.Abort('missing support for %s' % exc)
816 816
817 817 if pullop.fetch:
818 818 assert len(op.records['changegroup']) == 1
819 819 pullop.cgresult = op.records['changegroup'][0]['return']
820 820
821 821 # processing phases change
822 822 for namespace, value in op.records['listkeys']:
823 823 if namespace == 'phases':
824 824 _pullapplyphases(pullop, value)
825 825
826 826 def _pullbundle2extraprepare(pullop, kwargs):
827 827 """hook function so that extensions can extend the getbundle call"""
828 828 pass
829 829
830 830 def _pullchangeset(pullop):
831 831 """pull changeset from unbundle into the local repo"""
832 832 # We delay opening the transaction as long as possible so we
833 833 # neither open a transaction for nothing nor break a future
834 834 # useful rollback call
835 835 pullop.todosteps.remove('changegroup')
836 836 if not pullop.fetch:
837 837 pullop.repo.ui.status(_("no changes found\n"))
838 838 pullop.cgresult = 0
839 839 return
840 840 pullop.gettransaction()
841 841 if pullop.heads is None and list(pullop.common) == [nullid]:
842 842 pullop.repo.ui.status(_("requesting all changes\n"))
843 843 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
844 844 # issue1320, avoid a race if remote changed after discovery
845 845 pullop.heads = pullop.rheads
846 846
847 847 if pullop.remote.capable('getbundle'):
848 848 # TODO: get bundlecaps from remote
849 849 cg = pullop.remote.getbundle('pull', common=pullop.common,
850 850 heads=pullop.heads or pullop.rheads)
851 851 elif pullop.heads is None:
852 852 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
853 853 elif not pullop.remote.capable('changegroupsubset'):
854 854 raise util.Abort(_("partial pull cannot be done because "
855 855 "other repository doesn't support "
856 856 "changegroupsubset."))
857 857 else:
858 858 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
859 859 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
860 860 pullop.remote.url())
861 861
862 862 def _pullphase(pullop):
863 863 # Get remote phases data from remote
864 864 remotephases = pullop.remote.listkeys('phases')
865 865 _pullapplyphases(pullop, remotephases)
866 866
867 867 def _pullapplyphases(pullop, remotephases):
868 868 """apply phase movement from observed remote state"""
869 869 pullop.todosteps.remove('phases')
870 870 publishing = bool(remotephases.get('publishing', False))
871 871 if remotephases and not publishing:
872 872 # remote is new and unpublishing
873 873 pheads, _dr = phases.analyzeremotephases(pullop.repo,
874 874 pullop.pulledsubset,
875 875 remotephases)
876 876 dheads = pullop.pulledsubset
877 877 else:
878 878 # Remote is old or publishing; all common changesets
879 879 # should be seen as public
880 880 pheads = pullop.pulledsubset
881 881 dheads = []
882 882 unfi = pullop.repo.unfiltered()
883 883 phase = unfi._phasecache.phase
884 884 rev = unfi.changelog.nodemap.get
885 885 public = phases.public
886 886 draft = phases.draft
887 887
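# Editorial note: with public = 0 and draft = 1, "phase(...) > public"
# keeps only nodes that are not yet public locally, so gettransaction()
# below runs only when there is actually a phase boundary to advance.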
888 888 # exclude changesets already public locally and update the others
889 889 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
890 890 if pheads:
891 891 tr = pullop.gettransaction()
892 892 phases.advanceboundary(pullop.repo, tr, public, pheads)
893 893
894 894 # exclude changesets already draft locally and update the others
895 895 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
896 896 if dheads:
897 897 tr = pullop.gettransaction()
898 898 phases.advanceboundary(pullop.repo, tr, draft, dheads)
899 899
900 900 def _pullobsolete(pullop):
901 901 """utility function to pull obsolete markers from a remote
902 902
903 903 `gettransaction` is a function that returns the pull transaction, creating
904 904 one if necessary. We return the transaction to inform the calling code that
905 905 a new transaction has been created (when applicable).
906 906
907 907 Exists mostly to allow overriding for experimentation purposes"""
908 908 pullop.todosteps.remove('obsmarkers')
909 909 tr = None
910 910 if obsolete._enabled:
911 911 pullop.repo.ui.debug('fetching remote obsolete markers\n')
912 912 remoteobs = pullop.remote.listkeys('obsolete')
913 913 if 'dump0' in remoteobs:
914 914 tr = pullop.gettransaction()
915 915 for key in sorted(remoteobs, reverse=True):
916 916 if key.startswith('dump'):
917 917 data = base85.b85decode(remoteobs[key])
918 918 pullop.repo.obsstore.mergemarkers(tr, data)
919 919 pullop.repo.invalidatevolatilesets()
920 920 return tr
921 921
922 922 def caps20to10(repo):
923 923 """return a set with appropriate options to use bundle20 during getbundle"""
924 924 caps = set(['HG2X'])
925 capsblob = bundle2.encodecaps(bundle2.capabilities)
925 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
926 926 caps.add('bundle2=' + urllib.quote(capsblob))
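# Illustration (editor's note): the resulting set looks roughly like
# set(['HG2X', 'bundle2=HG2X%0Ab2x%3A...']), i.e. the encoded capability
# blob urlquoted so it travels as a single bundlecaps token.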
927 927 return caps
928 928
929 929 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
930 930 **kwargs):
931 931 """return a full bundle (with potentially multiple kind of parts)
932 932
933 933 Could be a bundle HG10 or a bundle HG2X depending on the bundlecaps
934 934 passed. For now, the bundle can contain only a changegroup, but this will
935 935 change when more part types become available for bundle2.
936 936
937 937 This is different from changegroup.getbundle, which only returns an HG10
938 938 changegroup bundle. They may eventually get reunited in the future when we
939 939 have a clearer idea of the API we want for querying different data.
940 940
941 941 The implementation is at a very early stage and will get massive rework
942 942 when the bundle API is refined.
943 943 """
944 944 cg = None
945 945 if kwargs.get('cg', True):
946 946 # build changegroup bundle here.
947 947 cg = changegroup.getbundle(repo, source, heads=heads,
948 948 common=common, bundlecaps=bundlecaps)
949 949 elif 'HG2X' not in bundlecaps:
950 950 raise ValueError(_('request for bundle10 must include changegroup'))
951 951 if bundlecaps is None or 'HG2X' not in bundlecaps:
952 952 if kwargs:
953 953 raise ValueError(_('unsupported getbundle arguments: %s')
954 954 % ', '.join(sorted(kwargs.keys())))
955 955 return cg
956 956 # very crude first implementation,
957 957 # the bundle API will change and the generation will be done lazily.
958 958 b2caps = {}
959 959 for bcaps in bundlecaps:
960 960 if bcaps.startswith('bundle2='):
961 961 blob = urllib.unquote(bcaps[len('bundle2='):])
962 962 b2caps.update(bundle2.decodecaps(blob))
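# Editor's note: each 'bundle2=' entry carries the urlquoted blob built
# by caps20to10()/encodecaps(); unquoting and decoding it recovers the
# peer's bundle2 capability mapping used to parametrize the bundler.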
963 963 bundler = bundle2.bundle20(repo.ui, b2caps)
964 964 if cg:
965 965 bundler.newpart('b2x:changegroup', data=cg.getchunks())
966 966 listkeys = kwargs.get('listkeys', ())
967 967 for namespace in listkeys:
968 968 part = bundler.newpart('b2x:listkeys')
969 969 part.addparam('namespace', namespace)
970 970 keys = repo.listkeys(namespace).items()
971 971 part.data = pushkey.encodekeys(keys)
972 972 _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
973 973 bundlecaps=bundlecaps, **kwargs)
974 974 return util.chunkbuffer(bundler.getchunks())
975 975
976 976 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
977 977 bundlecaps=None, **kwargs):
978 978 """hook function to let extensions add parts to the requested bundle"""
979 979 pass
980 980
981 981 def check_heads(repo, their_heads, context):
982 982 """check if the heads of a repo have been modified
983 983
984 984 Used by peer for unbundling.
985 985 """
986 986 heads = repo.heads()
987 987 heads_hash = util.sha1(''.join(sorted(heads))).digest()
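# Editorial note: callers may send the literal ['force'], the full head
# list, or ['hashed', sha1-of-sorted-heads] computed exactly as above,
# which keeps the wire payload small for repositories with many heads.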
988 988 if not (their_heads == ['force'] or their_heads == heads or
989 989 their_heads == ['hashed', heads_hash]):
990 990 # someone else committed/pushed/unbundled while we
991 991 # were transferring data
992 992 raise error.PushRaced('repository changed while %s - '
993 993 'please try again' % context)
994 994
995 995 def unbundle(repo, cg, heads, source, url):
996 996 """Apply a bundle to a repo.
997 997
998 998 This function makes sure the repo is locked during the application and has
999 999 a mechanism to check that no push race occurred between the creation of the
1000 1000 bundle and its application.
1001 1001
1002 1002 If the push was raced, a PushRaced exception is raised."""
1003 1003 r = 0
1004 1004 # need a transaction when processing a bundle2 stream
1005 1005 tr = None
1006 1006 lock = repo.lock()
1007 1007 try:
1008 1008 check_heads(repo, heads, 'uploading changes')
1009 1009 # push can proceed
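# Editor's note: only bundle2 unbundlers expose a `params` attribute,
# so this attribute check routes bundle2 streams to processbundle()
# and legacy changegroups to addchangegroup().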
1010 1010 if util.safehasattr(cg, 'params'):
1011 1011 try:
1012 1012 tr = repo.transaction('unbundle')
1013 1013 tr.hookargs['bundle2-exp'] = '1'
1014 1014 r = bundle2.processbundle(repo, cg, lambda: tr).reply
1015 1015 cl = repo.unfiltered().changelog
1016 1016 p = cl.writepending() and repo.root or ""
1017 1017 repo.hook('b2x-pretransactionclose', throw=True, source=source,
1018 1018 url=url, pending=p, **tr.hookargs)
1019 1019 tr.close()
1020 1020 repo.hook('b2x-transactionclose', source=source, url=url,
1021 1021 **tr.hookargs)
1022 1022 except Exception, exc:
1023 1023 exc.duringunbundle2 = True
1024 1024 raise
1025 1025 else:
1026 1026 r = changegroup.addchangegroup(repo, cg, source, url)
1027 1027 finally:
1028 1028 if tr is not None:
1029 1029 tr.release()
1030 1030 lock.release()
1031 1031 return r
@@ -1,1775 +1,1775
1 1 # localrepo.py - read/write repository class for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from node import hex, nullid, short
8 8 from i18n import _
9 9 import urllib
10 10 import peer, changegroup, subrepo, pushkey, obsolete, repoview
11 11 import changelog, dirstate, filelog, manifest, context, bookmarks, phases
12 12 import lock as lockmod
13 13 import transaction, store, encoding, exchange, bundle2
14 14 import scmutil, util, extensions, hook, error, revset
15 15 import match as matchmod
16 16 import merge as mergemod
17 17 import tags as tagsmod
18 18 from lock import release
19 19 import weakref, errno, os, time, inspect
20 20 import branchmap, pathutil
21 21 propertycache = util.propertycache
22 22 filecache = scmutil.filecache
23 23
24 24 class repofilecache(filecache):
25 25 """All filecache usage on repo is done for logic that should be unfiltered
26 26 """
27 27
28 28 def __get__(self, repo, type=None):
29 29 return super(repofilecache, self).__get__(repo.unfiltered(), type)
30 30 def __set__(self, repo, value):
31 31 return super(repofilecache, self).__set__(repo.unfiltered(), value)
32 32 def __delete__(self, repo):
33 33 return super(repofilecache, self).__delete__(repo.unfiltered())
34 34
35 35 class storecache(repofilecache):
36 36 """filecache for files in the store"""
37 37 def join(self, obj, fname):
38 38 return obj.sjoin(fname)
39 39
40 40 class unfilteredpropertycache(propertycache):
41 41 """propertycache that applies to the unfiltered repo only"""
42 42
43 43 def __get__(self, repo, type=None):
44 44 unfi = repo.unfiltered()
45 45 if unfi is repo:
46 46 return super(unfilteredpropertycache, self).__get__(unfi)
47 47 return getattr(unfi, self.name)
48 48
49 49 class filteredpropertycache(propertycache):
50 50 """propertycache that must take filtering into account"""
51 51
52 52 def cachevalue(self, obj, value):
53 53 object.__setattr__(obj, self.name, value)
54 54
55 55
56 56 def hasunfilteredcache(repo, name):
57 57 """check if a repo has an unfilteredpropertycache value for <name>"""
58 58 return name in vars(repo.unfiltered())
59 59
60 60 def unfilteredmethod(orig):
61 61 """decorate a method that always needs to be run on the unfiltered version"""
62 62 def wrapper(repo, *args, **kwargs):
63 63 return orig(repo.unfiltered(), *args, **kwargs)
64 64 return wrapper
65 65
66 66 moderncaps = set(('lookup', 'branchmap', 'pushkey', 'known', 'getbundle',
67 67 'unbundle'))
68 68 legacycaps = moderncaps.union(set(['changegroupsubset']))
69 69
70 70 class localpeer(peer.peerrepository):
71 71 '''peer for a local repo; reflects only the most recent API'''
72 72
73 73 def __init__(self, repo, caps=moderncaps):
74 74 peer.peerrepository.__init__(self)
75 75 self._repo = repo.filtered('served')
76 76 self.ui = repo.ui
77 77 self._caps = repo._restrictcapabilities(caps)
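# Editorial note: funneling caps through _restrictcapabilities() means
# that even a local peer only advertises bundle2-exp when the
# experimental.bundle2-exp config knob is enabled (see further down).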
78 78 self.requirements = repo.requirements
79 79 self.supportedformats = repo.supportedformats
80 80
81 81 def close(self):
82 82 self._repo.close()
83 83
84 84 def _capabilities(self):
85 85 return self._caps
86 86
87 87 def local(self):
88 88 return self._repo
89 89
90 90 def canpush(self):
91 91 return True
92 92
93 93 def url(self):
94 94 return self._repo.url()
95 95
96 96 def lookup(self, key):
97 97 return self._repo.lookup(key)
98 98
99 99 def branchmap(self):
100 100 return self._repo.branchmap()
101 101
102 102 def heads(self):
103 103 return self._repo.heads()
104 104
105 105 def known(self, nodes):
106 106 return self._repo.known(nodes)
107 107
108 108 def getbundle(self, source, heads=None, common=None, bundlecaps=None,
109 109 format='HG10', **kwargs):
110 110 cg = exchange.getbundle(self._repo, source, heads=heads,
111 111 common=common, bundlecaps=bundlecaps, **kwargs)
112 112 if bundlecaps is not None and 'HG2X' in bundlecaps:
113 113 # When requesting a bundle2, getbundle returns a stream to make the
114 114 # wire level function happier. We need to build a proper object
115 115 # from it in local peer.
116 116 cg = bundle2.unbundle20(self.ui, cg)
117 117 return cg
118 118
119 119 # TODO We might want to move the next two calls into legacypeer and add
120 120 # unbundle instead.
121 121
122 122 def unbundle(self, cg, heads, url):
123 123 """apply a bundle on a repo
124 124
125 125 This function handles the repo locking itself."""
126 126 try:
127 127 cg = exchange.readbundle(self.ui, cg, None)
128 128 ret = exchange.unbundle(self._repo, cg, heads, 'push', url)
129 129 if util.safehasattr(ret, 'getchunks'):
130 130 # This is a bundle20 object, turn it into an unbundler.
131 131 # This little dance should be dropped eventually when the API
132 132 # is finally improved.
133 133 stream = util.chunkbuffer(ret.getchunks())
134 134 ret = bundle2.unbundle20(self.ui, stream)
135 135 return ret
136 136 except error.PushRaced, exc:
137 137 raise error.ResponseError(_('push failed:'), str(exc))
138 138
139 139 def lock(self):
140 140 return self._repo.lock()
141 141
142 142 def addchangegroup(self, cg, source, url):
143 143 return changegroup.addchangegroup(self._repo, cg, source, url)
144 144
145 145 def pushkey(self, namespace, key, old, new):
146 146 return self._repo.pushkey(namespace, key, old, new)
147 147
148 148 def listkeys(self, namespace):
149 149 return self._repo.listkeys(namespace)
150 150
151 151 def debugwireargs(self, one, two, three=None, four=None, five=None):
152 152 '''used to test argument passing over the wire'''
153 153 return "%s %s %s %s %s" % (one, two, three, four, five)
154 154
155 155 class locallegacypeer(localpeer):
156 156 '''peer extension which implements legacy methods too; used for tests with
157 157 restricted capabilities'''
158 158
159 159 def __init__(self, repo):
160 160 localpeer.__init__(self, repo, caps=legacycaps)
161 161
162 162 def branches(self, nodes):
163 163 return self._repo.branches(nodes)
164 164
165 165 def between(self, pairs):
166 166 return self._repo.between(pairs)
167 167
168 168 def changegroup(self, basenodes, source):
169 169 return changegroup.changegroup(self._repo, basenodes, source)
170 170
171 171 def changegroupsubset(self, bases, heads, source):
172 172 return changegroup.changegroupsubset(self._repo, bases, heads, source)
173 173
174 174 class localrepository(object):
175 175
176 176 supportedformats = set(('revlogv1', 'generaldelta'))
177 177 _basesupported = supportedformats | set(('store', 'fncache', 'shared',
178 178 'dotencode'))
179 179 openerreqs = set(('revlogv1', 'generaldelta'))
180 180 requirements = ['revlogv1']
181 181 filtername = None
182 182
183 183 # a list of (ui, featureset) functions.
184 184 # only functions defined in modules of enabled extensions are invoked
185 185 featuresetupfuncs = set()
186 186
187 187 def _baserequirements(self, create):
188 188 return self.requirements[:]
189 189
190 190 def __init__(self, baseui, path=None, create=False):
191 191 self.wvfs = scmutil.vfs(path, expandpath=True, realpath=True)
192 192 self.wopener = self.wvfs
193 193 self.root = self.wvfs.base
194 194 self.path = self.wvfs.join(".hg")
195 195 self.origroot = path
196 196 self.auditor = pathutil.pathauditor(self.root, self._checknested)
197 197 self.vfs = scmutil.vfs(self.path)
198 198 self.opener = self.vfs
199 199 self.baseui = baseui
200 200 self.ui = baseui.copy()
201 201 self.ui.copy = baseui.copy # prevent copying repo configuration
202 202 # A list of callbacks to shape the phases if no data were found.
203 203 # Callbacks are in the form: func(repo, roots) --> processed roots.
204 204 # This list is to be filled by extensions during repo setup
205 205 self._phasedefaults = []
206 206 try:
207 207 self.ui.readconfig(self.join("hgrc"), self.root)
208 208 extensions.loadall(self.ui)
209 209 except IOError:
210 210 pass
211 211
212 212 if self.featuresetupfuncs:
213 213 self.supported = set(self._basesupported) # use private copy
214 214 extmods = set(m.__name__ for n, m
215 215 in extensions.extensions(self.ui))
216 216 for setupfunc in self.featuresetupfuncs:
217 217 if setupfunc.__module__ in extmods:
218 218 setupfunc(self.ui, self.supported)
219 219 else:
220 220 self.supported = self._basesupported
221 221
222 222 if not self.vfs.isdir():
223 223 if create:
224 224 if not self.wvfs.exists():
225 225 self.wvfs.makedirs()
226 226 self.vfs.makedir(notindexed=True)
227 227 requirements = self._baserequirements(create)
228 228 if self.ui.configbool('format', 'usestore', True):
229 229 self.vfs.mkdir("store")
230 230 requirements.append("store")
231 231 if self.ui.configbool('format', 'usefncache', True):
232 232 requirements.append("fncache")
233 233 if self.ui.configbool('format', 'dotencode', True):
234 234 requirements.append('dotencode')
235 235 # create an invalid changelog
236 236 self.vfs.append(
237 237 "00changelog.i",
238 238 '\0\0\0\2' # represents revlogv2
239 239 ' dummy changelog to prevent using the old repo layout'
240 240 )
241 241 if self.ui.configbool('format', 'generaldelta', False):
242 242 requirements.append("generaldelta")
243 243 requirements = set(requirements)
244 244 else:
245 245 raise error.RepoError(_("repository %s not found") % path)
246 246 elif create:
247 247 raise error.RepoError(_("repository %s already exists") % path)
248 248 else:
249 249 try:
250 250 requirements = scmutil.readrequires(self.vfs, self.supported)
251 251 except IOError, inst:
252 252 if inst.errno != errno.ENOENT:
253 253 raise
254 254 requirements = set()
255 255
256 256 self.sharedpath = self.path
257 257 try:
258 258 vfs = scmutil.vfs(self.vfs.read("sharedpath").rstrip('\n'),
259 259 realpath=True)
260 260 s = vfs.base
261 261 if not vfs.exists():
262 262 raise error.RepoError(
263 263 _('.hg/sharedpath points to nonexistent directory %s') % s)
264 264 self.sharedpath = s
265 265 except IOError, inst:
266 266 if inst.errno != errno.ENOENT:
267 267 raise
268 268
269 269 self.store = store.store(requirements, self.sharedpath, scmutil.vfs)
270 270 self.spath = self.store.path
271 271 self.svfs = self.store.vfs
272 272 self.sopener = self.svfs
273 273 self.sjoin = self.store.join
274 274 self.vfs.createmode = self.store.createmode
275 275 self._applyrequirements(requirements)
276 276 if create:
277 277 self._writerequirements()
278 278
279 279
280 280 self._branchcaches = {}
281 281 self.filterpats = {}
282 282 self._datafilters = {}
283 283 self._transref = self._lockref = self._wlockref = None
284 284
285 285 # A cache for various files under .hg/ that tracks file changes,
286 286 # (used by the filecache decorator)
287 287 #
288 288 # Maps a property name to its util.filecacheentry
289 289 self._filecache = {}
290 290
291 291 # hold sets of revisions to be filtered
292 292 # should be cleared when something might have changed the filter value:
293 293 # - new changesets,
294 294 # - phase change,
295 295 # - new obsolescence marker,
296 296 # - working directory parent change,
297 297 # - bookmark changes
298 298 self.filteredrevcache = {}
299 299
300 300 def close(self):
301 301 pass
302 302
303 303 def _restrictcapabilities(self, caps):
304 304 # bundle2 is not ready for prime time, drop it unless explicitly
305 305 # required by the tests (or some brave tester)
306 306 if self.ui.configbool('experimental', 'bundle2-exp', False):
307 307 caps = set(caps)
308 capsblob = bundle2.encodecaps(bundle2.capabilities)
308 capsblob = bundle2.encodecaps(bundle2.getrepocaps(self))
309 309 caps.add('bundle2-exp=' + urllib.quote(capsblob))
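# Illustration (editor's note): the advertised token is 'bundle2-exp='
# followed by the urlquoted, newline-separated capability blob,
# mirroring what caps20to10() produces on the getbundle side.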
310 310 return caps
311 311
312 312 def _applyrequirements(self, requirements):
313 313 self.requirements = requirements
314 314 self.sopener.options = dict((r, 1) for r in requirements
315 315 if r in self.openerreqs)
316 316 chunkcachesize = self.ui.configint('format', 'chunkcachesize')
317 317 if chunkcachesize is not None:
318 318 self.sopener.options['chunkcachesize'] = chunkcachesize
319 319
320 320 def _writerequirements(self):
321 321 reqfile = self.opener("requires", "w")
322 322 for r in sorted(self.requirements):
323 323 reqfile.write("%s\n" % r)
324 324 reqfile.close()
325 325
326 326 def _checknested(self, path):
327 327 """Determine if path is a legal nested repository."""
328 328 if not path.startswith(self.root):
329 329 return False
330 330 subpath = path[len(self.root) + 1:]
331 331 normsubpath = util.pconvert(subpath)
332 332
333 333 # XXX: Checking against the current working copy is wrong in
334 334 # the sense that it can reject things like
335 335 #
336 336 # $ hg cat -r 10 sub/x.txt
337 337 #
338 338 # if sub/ is no longer a subrepository in the working copy
339 339 # parent revision.
340 340 #
341 341 # However, it can of course also allow things that would have
342 342 # been rejected before, such as the above cat command if sub/
343 343 # is a subrepository now, but was a normal directory before.
344 344 # The old path auditor would have rejected by mistake since it
345 345 # panics when it sees sub/.hg/.
346 346 #
347 347 # All in all, checking against the working copy seems sensible
348 348 # since we want to prevent access to nested repositories on
349 349 # the filesystem *now*.
350 350 ctx = self[None]
351 351 parts = util.splitpath(subpath)
352 352 while parts:
353 353 prefix = '/'.join(parts)
354 354 if prefix in ctx.substate:
355 355 if prefix == normsubpath:
356 356 return True
357 357 else:
358 358 sub = ctx.sub(prefix)
359 359 return sub.checknested(subpath[len(prefix) + 1:])
360 360 else:
361 361 parts.pop()
362 362 return False
363 363
364 364 def peer(self):
365 365 return localpeer(self) # not cached to avoid reference cycle
366 366
367 367 def unfiltered(self):
368 368 """Return the unfiltered version of the repository
369 369
370 370 Intended to be overridden by filtered repos."""
371 371 return self
372 372
373 373 def filtered(self, name):
374 374 """Return a filtered version of a repository"""
375 375 # build a new class with the mixin and the current class
376 376 # (possibly subclass of the repo)
377 377 class proxycls(repoview.repoview, self.unfiltered().__class__):
378 378 pass
379 379 return proxycls(self, name)
380 380
381 381 @repofilecache('bookmarks')
382 382 def _bookmarks(self):
383 383 return bookmarks.bmstore(self)
384 384
385 385 @repofilecache('bookmarks.current')
386 386 def _bookmarkcurrent(self):
387 387 return bookmarks.readcurrent(self)
388 388
389 389 def bookmarkheads(self, bookmark):
390 390 name = bookmark.split('@', 1)[0]
391 391 heads = []
392 392 for mark, n in self._bookmarks.iteritems():
393 393 if mark.split('@', 1)[0] == name:
394 394 heads.append(n)
395 395 return heads
396 396
397 397 @storecache('phaseroots')
398 398 def _phasecache(self):
399 399 return phases.phasecache(self, self._phasedefaults)
400 400
401 401 @storecache('obsstore')
402 402 def obsstore(self):
403 403 store = obsolete.obsstore(self.sopener)
404 404 if store and not obsolete._enabled:
405 405 # message is rare enough to not be translated
406 406 msg = 'obsolete feature not enabled but %i markers found!\n'
407 407 self.ui.warn(msg % len(list(store)))
408 408 return store
409 409
410 410 @storecache('00changelog.i')
411 411 def changelog(self):
412 412 c = changelog.changelog(self.sopener)
413 413 if 'HG_PENDING' in os.environ:
414 414 p = os.environ['HG_PENDING']
415 415 if p.startswith(self.root):
416 416 c.readpending('00changelog.i.a')
417 417 return c
418 418
419 419 @storecache('00manifest.i')
420 420 def manifest(self):
421 421 return manifest.manifest(self.sopener)
422 422
423 423 @repofilecache('dirstate')
424 424 def dirstate(self):
425 425 warned = [0]
426 426 def validate(node):
427 427 try:
428 428 self.changelog.rev(node)
429 429 return node
430 430 except error.LookupError:
431 431 if not warned[0]:
432 432 warned[0] = True
433 433 self.ui.warn(_("warning: ignoring unknown"
434 434 " working parent %s!\n") % short(node))
435 435 return nullid
436 436
437 437 return dirstate.dirstate(self.opener, self.ui, self.root, validate)
438 438
439 439 def __getitem__(self, changeid):
440 440 if changeid is None:
441 441 return context.workingctx(self)
442 442 return context.changectx(self, changeid)
443 443
444 444 def __contains__(self, changeid):
445 445 try:
446 446 return bool(self.lookup(changeid))
447 447 except error.RepoLookupError:
448 448 return False
449 449
450 450 def __nonzero__(self):
451 451 return True
452 452
453 453 def __len__(self):
454 454 return len(self.changelog)
455 455
456 456 def __iter__(self):
457 457 return iter(self.changelog)
458 458
459 459 def revs(self, expr, *args):
460 460 '''Return a list of revisions matching the given revset'''
461 461 expr = revset.formatspec(expr, *args)
462 462 m = revset.match(None, expr)
463 463 return m(self, revset.spanset(self))
464 464
465 465 def set(self, expr, *args):
466 466 '''
467 467 Yield a context for each matching revision, after doing arg
468 468 replacement via revset.formatspec
469 469 '''
470 470 for r in self.revs(expr, *args):
471 471 yield self[r]
472 472
473 473 def url(self):
474 474 return 'file:' + self.root
475 475
476 476 def hook(self, name, throw=False, **args):
477 477 """Call a hook, passing this repo instance.
478 478
479 479 This is a convenience method to aid invoking hooks. Extensions likely
480 480 won't call this unless they have registered a custom hook or are
481 481 replacing code that is expected to call a hook.
482 482 """
483 483 return hook.hook(self.ui, self, name, throw, **args)
484 484
485 485 @unfilteredmethod
486 486 def _tag(self, names, node, message, local, user, date, extra={},
487 487 editor=False):
488 488 if isinstance(names, str):
489 489 names = (names,)
490 490
491 491 branches = self.branchmap()
492 492 for name in names:
493 493 self.hook('pretag', throw=True, node=hex(node), tag=name,
494 494 local=local)
495 495 if name in branches:
496 496 self.ui.warn(_("warning: tag %s conflicts with existing"
497 497 " branch name\n") % name)
498 498
499 499 def writetags(fp, names, munge, prevtags):
500 500 fp.seek(0, 2)
501 501 if prevtags and prevtags[-1] != '\n':
502 502 fp.write('\n')
503 503 for name in names:
504 504 m = munge and munge(name) or name
505 505 if (self._tagscache.tagtypes and
506 506 name in self._tagscache.tagtypes):
507 507 old = self.tags().get(name, nullid)
508 508 fp.write('%s %s\n' % (hex(old), m))
509 509 fp.write('%s %s\n' % (hex(node), m))
510 510 fp.close()
511 511
512 512 prevtags = ''
513 513 if local:
514 514 try:
515 515 fp = self.opener('localtags', 'r+')
516 516 except IOError:
517 517 fp = self.opener('localtags', 'a')
518 518 else:
519 519 prevtags = fp.read()
520 520
521 521 # local tags are stored in the current charset
522 522 writetags(fp, names, None, prevtags)
523 523 for name in names:
524 524 self.hook('tag', node=hex(node), tag=name, local=local)
525 525 return
526 526
527 527 try:
528 528 fp = self.wfile('.hgtags', 'rb+')
529 529 except IOError, e:
530 530 if e.errno != errno.ENOENT:
531 531 raise
532 532 fp = self.wfile('.hgtags', 'ab')
533 533 else:
534 534 prevtags = fp.read()
535 535
536 536 # committed tags are stored in UTF-8
537 537 writetags(fp, names, encoding.fromlocal, prevtags)
538 538
539 539 fp.close()
540 540
541 541 self.invalidatecaches()
542 542
543 543 if '.hgtags' not in self.dirstate:
544 544 self[None].add(['.hgtags'])
545 545
546 546 m = matchmod.exact(self.root, '', ['.hgtags'])
547 547 tagnode = self.commit(message, user, date, extra=extra, match=m,
548 548 editor=editor)
549 549
550 550 for name in names:
551 551 self.hook('tag', node=hex(node), tag=name, local=local)
552 552
553 553 return tagnode
554 554
555 555 def tag(self, names, node, message, local, user, date, editor=False):
556 556 '''tag a revision with one or more symbolic names.
557 557
558 558 names is a list of strings or, when adding a single tag, names may be a
559 559 string.
560 560
561 561 if local is True, the tags are stored in a per-repository file.
562 562 otherwise, they are stored in the .hgtags file, and a new
563 563 changeset is committed with the change.
564 564
565 565 keyword arguments:
566 566
567 567 local: whether to store tags in non-version-controlled file
568 568 (default False)
569 569
570 570 message: commit message to use if committing
571 571
572 572 user: name of user to use if committing
573 573
574 574 date: date tuple to use if committing'''
575 575
576 576 if not local:
577 577 for x in self.status()[:5]:
578 578 if '.hgtags' in x:
579 579 raise util.Abort(_('working copy of .hgtags is changed '
580 580 '(please commit .hgtags manually)'))
581 581
582 582 self.tags() # instantiate the cache
583 583 self._tag(names, node, message, local, user, date, editor=editor)
584 584
585 585 @filteredpropertycache
586 586 def _tagscache(self):
587 587 '''Returns a tagscache object that contains various tags related
588 588 caches.'''
589 589
590 590 # This simplifies its cache management by having one decorated
591 591 # function (this one) and the rest simply fetch things from it.
592 592 class tagscache(object):
593 593 def __init__(self):
594 594 # These two define the set of tags for this repository. tags
595 595 # maps tag name to node; tagtypes maps tag name to 'global' or
596 596 # 'local'. (Global tags are defined by .hgtags across all
597 597 # heads, and local tags are defined in .hg/localtags.)
598 598 # They constitute the in-memory cache of tags.
599 599 self.tags = self.tagtypes = None
600 600
601 601 self.nodetagscache = self.tagslist = None
602 602
603 603 cache = tagscache()
604 604 cache.tags, cache.tagtypes = self._findtags()
605 605
606 606 return cache
607 607
608 608 def tags(self):
609 609 '''return a mapping of tag to node'''
610 610 t = {}
611 611 if self.changelog.filteredrevs:
612 612 tags, tt = self._findtags()
613 613 else:
614 614 tags = self._tagscache.tags
615 615 for k, v in tags.iteritems():
616 616 try:
617 617 # ignore tags to unknown nodes
618 618 self.changelog.rev(v)
619 619 t[k] = v
620 620 except (error.LookupError, ValueError):
621 621 pass
622 622 return t
623 623
624 624 def _findtags(self):
625 625 '''Do the hard work of finding tags. Return a pair of dicts
626 626 (tags, tagtypes) where tags maps tag name to node, and tagtypes
627 627 maps tag name to a string like \'global\' or \'local\'.
628 628 Subclasses or extensions are free to add their own tags, but
629 629 should be aware that the returned dicts will be retained for the
630 630 duration of the localrepo object.'''
631 631
632 632 # XXX what tagtype should subclasses/extensions use? Currently
633 633 # mq and bookmarks add tags, but do not set the tagtype at all.
634 634 # Should each extension invent its own tag type? Should there
635 635 # be one tagtype for all such "virtual" tags? Or is the status
636 636 # quo fine?
637 637
638 638 alltags = {} # map tag name to (node, hist)
639 639 tagtypes = {}
640 640
641 641 tagsmod.findglobaltags(self.ui, self, alltags, tagtypes)
642 642 tagsmod.readlocaltags(self.ui, self, alltags, tagtypes)
643 643
644 644 # Build the return dicts. Have to re-encode tag names because
645 645 # the tags module always uses UTF-8 (in order not to lose info
646 646 # writing to the cache), but the rest of Mercurial wants them in
647 647 # local encoding.
648 648 tags = {}
649 649 for (name, (node, hist)) in alltags.iteritems():
650 650 if node != nullid:
651 651 tags[encoding.tolocal(name)] = node
652 652 tags['tip'] = self.changelog.tip()
653 653 tagtypes = dict([(encoding.tolocal(name), value)
654 654 for (name, value) in tagtypes.iteritems()])
655 655 return (tags, tagtypes)
656 656
657 657 def tagtype(self, tagname):
658 658 '''
659 659 return the type of the given tag. result can be:
660 660
661 661 'local' : a local tag
662 662 'global' : a global tag
663 663 None : tag does not exist
664 664 '''
665 665
666 666 return self._tagscache.tagtypes.get(tagname)
667 667
668 668 def tagslist(self):
669 669 '''return a list of tags ordered by revision'''
670 670 if not self._tagscache.tagslist:
671 671 l = []
672 672 for t, n in self.tags().iteritems():
673 673 l.append((self.changelog.rev(n), t, n))
674 674 self._tagscache.tagslist = [(t, n) for r, t, n in sorted(l)]
675 675
676 676 return self._tagscache.tagslist
677 677
678 678 def nodetags(self, node):
679 679 '''return the tags associated with a node'''
680 680 if not self._tagscache.nodetagscache:
681 681 nodetagscache = {}
682 682 for t, n in self._tagscache.tags.iteritems():
683 683 nodetagscache.setdefault(n, []).append(t)
684 684 for tags in nodetagscache.itervalues():
685 685 tags.sort()
686 686 self._tagscache.nodetagscache = nodetagscache
687 687 return self._tagscache.nodetagscache.get(node, [])
688 688
689 689 def nodebookmarks(self, node):
690 690 marks = []
691 691 for bookmark, n in self._bookmarks.iteritems():
692 692 if n == node:
693 693 marks.append(bookmark)
694 694 return sorted(marks)
695 695
696 696 def branchmap(self):
697 697 '''returns a dictionary {branch: [branchheads]} with branchheads
698 698 ordered by increasing revision number'''
699 699 branchmap.updatecache(self)
700 700 return self._branchcaches[self.filtername]
701 701
702 702 def branchtip(self, branch):
703 703 '''return the tip node for a given branch'''
704 704 try:
705 705 return self.branchmap().branchtip(branch)
706 706 except KeyError:
707 707 raise error.RepoLookupError(_("unknown branch '%s'") % branch)
708 708
709 709 def lookup(self, key):
710 710 return self[key].node()
711 711
712 712 def lookupbranch(self, key, remote=None):
713 713 repo = remote or self
714 714 if key in repo.branchmap():
715 715 return key
716 716
717 717 repo = (remote and remote.local()) and remote or self
718 718 return repo[key].branch()
719 719
720 720 def known(self, nodes):
721 721 nm = self.changelog.nodemap
722 722 pc = self._phasecache
723 723 result = []
724 724 for n in nodes:
725 725 r = nm.get(n)
726 726 resp = not (r is None or pc.phase(self, r) >= phases.secret)
727 727 result.append(resp)
728 728 return result
729 729
730 730 def local(self):
731 731 return self
732 732
733 733 def cancopy(self):
734 734 # so statichttprepo's override of local() works
735 735 if not self.local():
736 736 return False
737 737 if not self.ui.configbool('phases', 'publish', True):
738 738 return True
739 739 # if publishing we can't copy if there is filtered content
740 740 return not self.filtered('visible').changelog.filteredrevs
741 741
742 742 def join(self, f):
743 743 return os.path.join(self.path, f)
744 744
745 745 def wjoin(self, f):
746 746 return os.path.join(self.root, f)
747 747
748 748 def file(self, f):
749 749 if f[0] == '/':
750 750 f = f[1:]
751 751 return filelog.filelog(self.sopener, f)
752 752
753 753 def changectx(self, changeid):
754 754 return self[changeid]
755 755
756 756 def parents(self, changeid=None):
757 757 '''get list of changectxs for parents of changeid'''
758 758 return self[changeid].parents()
759 759
760 760 def setparents(self, p1, p2=nullid):
761 761 copies = self.dirstate.setparents(p1, p2)
762 762 pctx = self[p1]
763 763 if copies:
764 764 # Adjust copy records, the dirstate cannot do it, it
765 765 # requires access to parents manifests. Preserve them
766 766 # only for entries added to first parent.
767 767 for f in copies:
768 768 if f not in pctx and copies[f] in pctx:
769 769 self.dirstate.copy(copies[f], f)
770 770 if p2 == nullid:
771 771 for f, s in sorted(self.dirstate.copies().items()):
772 772 if f not in pctx and s not in pctx:
773 773 self.dirstate.copy(None, f)
774 774
775 775 def filectx(self, path, changeid=None, fileid=None):
776 776 """changeid can be a changeset revision, node, or tag.
777 777 fileid can be a file revision or node."""
778 778 return context.filectx(self, path, changeid, fileid)
779 779
780 780 def getcwd(self):
781 781 return self.dirstate.getcwd()
782 782
783 783 def pathto(self, f, cwd=None):
784 784 return self.dirstate.pathto(f, cwd)
785 785
786 786 def wfile(self, f, mode='r'):
787 787 return self.wopener(f, mode)
788 788
789 789 def _link(self, f):
790 790 return self.wvfs.islink(f)
791 791
792 792 def _loadfilter(self, filter):
793 793 if filter not in self.filterpats:
794 794 l = []
795 795 for pat, cmd in self.ui.configitems(filter):
796 796 if cmd == '!':
797 797 continue
798 798 mf = matchmod.match(self.root, '', [pat])
799 799 fn = None
800 800 params = cmd
801 801 for name, filterfn in self._datafilters.iteritems():
802 802 if cmd.startswith(name):
803 803 fn = filterfn
804 804 params = cmd[len(name):].lstrip()
805 805 break
806 806 if not fn:
807 807 fn = lambda s, c, **kwargs: util.filter(s, c)
808 808 # Wrap old filters not supporting keyword arguments
809 809 if not inspect.getargspec(fn)[2]:
810 810 oldfn = fn
811 811 fn = lambda s, c, **kwargs: oldfn(s, c)
812 812 l.append((mf, fn, params))
813 813 self.filterpats[filter] = l
814 814 return self.filterpats[filter]
815 815
816 816 def _filter(self, filterpats, filename, data):
817 817 for mf, fn, cmd in filterpats:
818 818 if mf(filename):
819 819 self.ui.debug("filtering %s through %s\n" % (filename, cmd))
820 820 data = fn(data, cmd, ui=self.ui, repo=self, filename=filename)
821 821 break
822 822
823 823 return data
824 824
825 825 @unfilteredpropertycache
826 826 def _encodefilterpats(self):
827 827 return self._loadfilter('encode')
828 828
829 829 @unfilteredpropertycache
830 830 def _decodefilterpats(self):
831 831 return self._loadfilter('decode')
832 832
833 833 def adddatafilter(self, name, filter):
834 834 self._datafilters[name] = filter
835 835
836 836 def wread(self, filename):
837 837 if self._link(filename):
838 838 data = self.wvfs.readlink(filename)
839 839 else:
840 840 data = self.wopener.read(filename)
841 841 return self._filter(self._encodefilterpats, filename, data)
842 842
843 843 def wwrite(self, filename, data, flags):
844 844 data = self._filter(self._decodefilterpats, filename, data)
845 845 if 'l' in flags:
846 846 self.wopener.symlink(data, filename)
847 847 else:
848 848 self.wopener.write(filename, data)
849 849 if 'x' in flags:
850 850 self.wvfs.setflags(filename, False, True)
851 851
852 852 def wwritedata(self, filename, data):
853 853 return self._filter(self._decodefilterpats, filename, data)
854 854
855 855 def transaction(self, desc, report=None):
856 856 tr = self._transref and self._transref() or None
857 857 if tr and tr.running():
858 858 return tr.nest()
859 859
860 860 # abort here if the journal already exists
861 861 if self.svfs.exists("journal"):
862 862 raise error.RepoError(
863 863 _("abandoned transaction found"),
864 864 hint=_("run 'hg recover' to clean up transaction"))
865 865
866 866 def onclose():
867 867 self.store.write(self._transref())
868 868
869 869 self._writejournal(desc)
870 870 renames = [(vfs, x, undoname(x)) for vfs, x in self._journalfiles()]
871 871 rp = report and report or self.ui.warn
872 872 tr = transaction.transaction(rp, self.sopener,
873 873 "journal",
874 874 aftertrans(renames),
875 875 self.store.createmode,
876 876 onclose)
877 877 self._transref = weakref.ref(tr)
878 878 return tr
879 879
880 880 def _journalfiles(self):
881 881 return ((self.svfs, 'journal'),
882 882 (self.vfs, 'journal.dirstate'),
883 883 (self.vfs, 'journal.branch'),
884 884 (self.vfs, 'journal.desc'),
885 885 (self.vfs, 'journal.bookmarks'),
886 886 (self.svfs, 'journal.phaseroots'))
887 887
888 888 def undofiles(self):
889 889 return [(vfs, undoname(x)) for vfs, x in self._journalfiles()]
890 890
891 891 def _writejournal(self, desc):
892 892 self.opener.write("journal.dirstate",
893 893 self.opener.tryread("dirstate"))
894 894 self.opener.write("journal.branch",
895 895 encoding.fromlocal(self.dirstate.branch()))
896 896 self.opener.write("journal.desc",
897 897 "%d\n%s\n" % (len(self), desc))
898 898 self.opener.write("journal.bookmarks",
899 899 self.opener.tryread("bookmarks"))
900 900 self.sopener.write("journal.phaseroots",
901 901 self.sopener.tryread("phaseroots"))
902 902
903 903 def recover(self):
904 904 lock = self.lock()
905 905 try:
906 906 if self.svfs.exists("journal"):
907 907 self.ui.status(_("rolling back interrupted transaction\n"))
908 908 transaction.rollback(self.sopener, "journal",
909 909 self.ui.warn)
910 910 self.invalidate()
911 911 return True
912 912 else:
913 913 self.ui.warn(_("no interrupted transaction available\n"))
914 914 return False
915 915 finally:
916 916 lock.release()
917 917
918 918 def rollback(self, dryrun=False, force=False):
919 919 wlock = lock = None
920 920 try:
921 921 wlock = self.wlock()
922 922 lock = self.lock()
923 923 if self.svfs.exists("undo"):
924 924 return self._rollback(dryrun, force)
925 925 else:
926 926 self.ui.warn(_("no rollback information available\n"))
927 927 return 1
928 928 finally:
929 929 release(lock, wlock)
930 930
931 931 @unfilteredmethod # Until we get smarter cache management
932 932 def _rollback(self, dryrun, force):
933 933 ui = self.ui
934 934 try:
935 935 args = self.opener.read('undo.desc').splitlines()
936 936 (oldlen, desc, detail) = (int(args[0]), args[1], None)
937 937 if len(args) >= 3:
938 938 detail = args[2]
939 939 oldtip = oldlen - 1
940 940
941 941 if detail and ui.verbose:
942 942 msg = (_('repository tip rolled back to revision %s'
943 943 ' (undo %s: %s)\n')
944 944 % (oldtip, desc, detail))
945 945 else:
946 946 msg = (_('repository tip rolled back to revision %s'
947 947 ' (undo %s)\n')
948 948 % (oldtip, desc))
949 949 except IOError:
950 950 msg = _('rolling back unknown transaction\n')
951 951 desc = None
952 952
953 953 if not force and self['.'] != self['tip'] and desc == 'commit':
954 954 raise util.Abort(
955 955 _('rollback of last commit while not checked out '
956 956 'may lose data'), hint=_('use -f to force'))
957 957
958 958 ui.status(msg)
959 959 if dryrun:
960 960 return 0
961 961
962 962 parents = self.dirstate.parents()
963 963 self.destroying()
964 964 transaction.rollback(self.sopener, 'undo', ui.warn)
965 965 if self.vfs.exists('undo.bookmarks'):
966 966 self.vfs.rename('undo.bookmarks', 'bookmarks')
967 967 if self.svfs.exists('undo.phaseroots'):
968 968 self.svfs.rename('undo.phaseroots', 'phaseroots')
969 969 self.invalidate()
970 970
971 971 parentgone = (parents[0] not in self.changelog.nodemap or
972 972 parents[1] not in self.changelog.nodemap)
973 973 if parentgone:
974 974 self.vfs.rename('undo.dirstate', 'dirstate')
975 975 try:
976 976 branch = self.opener.read('undo.branch')
977 977 self.dirstate.setbranch(encoding.tolocal(branch))
978 978 except IOError:
979 979 ui.warn(_('named branch could not be reset: '
980 980 'current branch is still \'%s\'\n')
981 981 % self.dirstate.branch())
982 982
983 983 self.dirstate.invalidate()
984 984 parents = tuple([p.rev() for p in self.parents()])
985 985 if len(parents) > 1:
986 986 ui.status(_('working directory now based on '
987 987 'revisions %d and %d\n') % parents)
988 988 else:
989 989 ui.status(_('working directory now based on '
990 990 'revision %d\n') % parents)
991 991 # TODO: if we know which new heads may result from this rollback, pass
992 992 # them to destroy(), which will prevent the branchhead cache from being
993 993 # invalidated.
994 994 self.destroyed()
995 995 return 0
996 996
997 997 def invalidatecaches(self):
998 998
999 999 if '_tagscache' in vars(self):
1000 1000 # can't use delattr on proxy
1001 1001 del self.__dict__['_tagscache']
1002 1002
1003 1003 self.unfiltered()._branchcaches.clear()
1004 1004 self.invalidatevolatilesets()
1005 1005
1006 1006 def invalidatevolatilesets(self):
1007 1007 self.filteredrevcache.clear()
1008 1008 obsolete.clearobscaches(self)
1009 1009
1010 1010 def invalidatedirstate(self):
1011 1011 '''Invalidates the dirstate, causing the next call to dirstate
1012 1012 to check if it was modified since the last time it was read,
1013 1013 rereading it if it has.
1014 1014
1015 1015 This is different from dirstate.invalidate() in that it doesn't always
1016 1016 reread the dirstate. Use dirstate.invalidate() if you want to
1017 1017 explicitly read the dirstate again (i.e. restoring it to a previous
1018 1018 known good state).'''
1019 1019 if hasunfilteredcache(self, 'dirstate'):
1020 1020 for k in self.dirstate._filecache:
1021 1021 try:
1022 1022 delattr(self.dirstate, k)
1023 1023 except AttributeError:
1024 1024 pass
1025 1025 delattr(self.unfiltered(), 'dirstate')
1026 1026
1027 1027 def invalidate(self):
1028 1028 unfiltered = self.unfiltered() # all file caches are stored unfiltered
1029 1029 for k in self._filecache:
1030 1030 # dirstate is invalidated separately in invalidatedirstate()
1031 1031 if k == 'dirstate':
1032 1032 continue
1033 1033
1034 1034 try:
1035 1035 delattr(unfiltered, k)
1036 1036 except AttributeError:
1037 1037 pass
1038 1038 self.invalidatecaches()
1039 1039 self.store.invalidatecaches()
1040 1040
1041 1041 def invalidateall(self):
1042 1042 '''Fully invalidates both store and non-store parts, causing the
1043 1043 subsequent operation to reread any outside changes.'''
1044 1044 # extension should hook this to invalidate its caches
1045 1045 self.invalidate()
1046 1046 self.invalidatedirstate()
1047 1047
1048 1048 def _lock(self, vfs, lockname, wait, releasefn, acquirefn, desc):
1049 1049 try:
1050 1050 l = lockmod.lock(vfs, lockname, 0, releasefn, desc=desc)
1051 1051 except error.LockHeld, inst:
1052 1052 if not wait:
1053 1053 raise
1054 1054 self.ui.warn(_("waiting for lock on %s held by %r\n") %
1055 1055 (desc, inst.locker))
1056 1056 # default to 600 seconds timeout
1057 1057 l = lockmod.lock(vfs, lockname,
1058 1058 int(self.ui.config("ui", "timeout", "600")),
1059 1059 releasefn, desc=desc)
1060 1060 self.ui.warn(_("got lock after %s seconds\n") % l.delay)
1061 1061 if acquirefn:
1062 1062 acquirefn()
1063 1063 return l
1064 1064
1065 1065 def _afterlock(self, callback):
1066 1066 """add a callback to the current repository lock.
1067 1067
1068 1068 The callback will be executed on lock release."""
1069 1069 l = self._lockref and self._lockref()
1070 1070 if l:
1071 1071 l.postrelease.append(callback)
1072 1072 else:
1073 1073 callback()
1074 1074
1075 1075 def lock(self, wait=True):
1076 1076 '''Lock the repository store (.hg/store) and return a weak reference
1077 1077 to the lock. Use this before modifying the store (e.g. committing or
1078 1078 stripping). If you are opening a transaction, get a lock as well.'''
1079 1079 l = self._lockref and self._lockref()
1080 1080 if l is not None and l.held:
1081 1081 l.lock()
1082 1082 return l
1083 1083
1084 1084 def unlock():
1085 1085 for k, ce in self._filecache.items():
1086 1086 if k == 'dirstate' or k not in self.__dict__:
1087 1087 continue
1088 1088 ce.refresh()
1089 1089
1090 1090 l = self._lock(self.svfs, "lock", wait, unlock,
1091 1091 self.invalidate, _('repository %s') % self.origroot)
1092 1092 self._lockref = weakref.ref(l)
1093 1093 return l
1094 1094
1095 1095 def wlock(self, wait=True):
1096 1096 '''Lock the non-store parts of the repository (everything under
1097 1097 .hg except .hg/store) and return a weak reference to the lock.
1098 1098 Use this before modifying files in .hg.'''
1099 1099 l = self._wlockref and self._wlockref()
1100 1100 if l is not None and l.held:
1101 1101 l.lock()
1102 1102 return l
1103 1103
1104 1104 def unlock():
1105 1105 self.dirstate.write()
1106 1106 self._filecache['dirstate'].refresh()
1107 1107
1108 1108 l = self._lock(self.vfs, "wlock", wait, unlock,
1109 1109 self.invalidatedirstate, _('working directory of %s') %
1110 1110 self.origroot)
1111 1111 self._wlockref = weakref.ref(l)
1112 1112 return l
1113 1113
1114 1114 def _filecommit(self, fctx, manifest1, manifest2, linkrev, tr, changelist):
1115 1115 """
1116 1116 commit an individual file as part of a larger transaction
1117 1117 """
1118 1118
1119 1119 fname = fctx.path()
1120 1120 text = fctx.data()
1121 1121 flog = self.file(fname)
1122 1122 fparent1 = manifest1.get(fname, nullid)
1123 1123 fparent2 = fparent2o = manifest2.get(fname, nullid)
1124 1124
1125 1125 meta = {}
1126 1126 copy = fctx.renamed()
1127 1127 if copy and copy[0] != fname:
1128 1128 # Mark the new revision of this file as a copy of another
1129 1129 # file. This copy data will effectively act as a parent
1130 1130 # of this new revision. If this is a merge, the first
1131 1131 # parent will be the nullid (meaning "look up the copy data")
1132 1132 # and the second one will be the other parent. For example:
1133 1133 #
1134 1134 # 0 --- 1 --- 3 rev1 changes file foo
1135 1135 # \ / rev2 renames foo to bar and changes it
1136 1136 # \- 2 -/ rev3 should have bar with all changes and
1137 1137 # should record that bar descends from
1138 1138 # bar in rev2 and foo in rev1
1139 1139 #
1140 1140 # this allows this merge to succeed:
1141 1141 #
1142 1142 # 0 --- 1 --- 3 rev4 reverts the content change from rev2
1143 1143 # \ / merging rev3 and rev4 should use bar@rev2
1144 1144 # \- 2 --- 4 as the merge base
1145 1145 #
1146 1146
1147 1147 cfname = copy[0]
1148 1148 crev = manifest1.get(cfname)
1149 1149 newfparent = fparent2
1150 1150
1151 1151 if manifest2: # branch merge
1152 1152 if fparent2 == nullid or crev is None: # copied on remote side
1153 1153 if cfname in manifest2:
1154 1154 crev = manifest2[cfname]
1155 1155 newfparent = fparent1
1156 1156
1157 1157 # find source in nearest ancestor if we've lost track
1158 1158 if not crev:
1159 1159 self.ui.debug(" %s: searching for copy revision for %s\n" %
1160 1160 (fname, cfname))
1161 1161 for ancestor in self[None].ancestors():
1162 1162 if cfname in ancestor:
1163 1163 crev = ancestor[cfname].filenode()
1164 1164 break
1165 1165
1166 1166 if crev:
1167 1167 self.ui.debug(" %s: copy %s:%s\n" % (fname, cfname, hex(crev)))
1168 1168 meta["copy"] = cfname
1169 1169 meta["copyrev"] = hex(crev)
1170 1170 fparent1, fparent2 = nullid, newfparent
1171 1171 else:
1172 1172 self.ui.warn(_("warning: can't find ancestor for '%s' "
1173 1173 "copied from '%s'!\n") % (fname, cfname))
1174 1174
1175 1175 elif fparent1 == nullid:
1176 1176 fparent1, fparent2 = fparent2, nullid
1177 1177 elif fparent2 != nullid:
1178 1178 # is one parent an ancestor of the other?
1179 1179 fparentancestors = flog.commonancestorsheads(fparent1, fparent2)
1180 1180 if fparent1 in fparentancestors:
1181 1181 fparent1, fparent2 = fparent2, nullid
1182 1182 elif fparent2 in fparentancestors:
1183 1183 fparent2 = nullid
1184 1184
1185 1185 # is the file changed?
1186 1186 if fparent2 != nullid or flog.cmp(fparent1, text) or meta:
1187 1187 changelist.append(fname)
1188 1188 return flog.add(text, meta, tr, linkrev, fparent1, fparent2)
1189 1189
1190 1190 # are just the flags changed during merge?
1191 1191 if fparent1 != fparent2o and manifest1.flags(fname) != fctx.flags():
1192 1192 changelist.append(fname)
1193 1193
1194 1194 return fparent1
1195 1195
1196 1196 @unfilteredmethod
1197 1197 def commit(self, text="", user=None, date=None, match=None, force=False,
1198 1198 editor=False, extra={}):
1199 1199 """Add a new revision to current repository.
1200 1200
1201 1201 Revision information is gathered from the working directory,
1202 1202 match can be used to filter the committed files. If editor is
1203 1203 supplied, it is called to get a commit message.
1204 1204 """
1205 1205
1206 1206 def fail(f, msg):
1207 1207 raise util.Abort('%s: %s' % (f, msg))
1208 1208
1209 1209 if not match:
1210 1210 match = matchmod.always(self.root, '')
1211 1211
1212 1212 if not force:
1213 1213 vdirs = []
1214 1214 match.explicitdir = vdirs.append
1215 1215 match.bad = fail
1216 1216
1217 1217 wlock = self.wlock()
1218 1218 try:
1219 1219 wctx = self[None]
1220 1220 merge = len(wctx.parents()) > 1
1221 1221
1222 1222 if (not force and merge and match and
1223 1223 (match.files() or match.anypats())):
1224 1224 raise util.Abort(_('cannot partially commit a merge '
1225 1225 '(do not specify files or patterns)'))
1226 1226
1227 1227 changes = self.status(match=match, clean=force)
1228 1228 if force:
1229 1229 changes[0].extend(changes[6]) # mq may commit unchanged files
1230 1230
1231 1231 # check subrepos
1232 1232 subs = []
1233 1233 commitsubs = set()
1234 1234 newstate = wctx.substate.copy()
1235 1235 # only manage subrepos and .hgsubstate if .hgsub is present
1236 1236 if '.hgsub' in wctx:
1237 1237 # we'll decide whether to track this ourselves, thanks
1238 1238 for c in changes[:3]:
1239 1239 if '.hgsubstate' in c:
1240 1240 c.remove('.hgsubstate')
1241 1241
1242 1242 # compare current state to last committed state
1243 1243 # build new substate based on last committed state
1244 1244 oldstate = wctx.p1().substate
1245 1245 for s in sorted(newstate.keys()):
1246 1246 if not match(s):
1247 1247 # ignore working copy, use old state if present
1248 1248 if s in oldstate:
1249 1249 newstate[s] = oldstate[s]
1250 1250 continue
1251 1251 if not force:
1252 1252 raise util.Abort(
1253 1253 _("commit with new subrepo %s excluded") % s)
1254 1254 if wctx.sub(s).dirty(True):
1255 1255 if not self.ui.configbool('ui', 'commitsubrepos'):
1256 1256 raise util.Abort(
1257 1257 _("uncommitted changes in subrepo %s") % s,
1258 1258 hint=_("use --subrepos for recursive commit"))
1259 1259 subs.append(s)
1260 1260 commitsubs.add(s)
1261 1261 else:
1262 1262 bs = wctx.sub(s).basestate()
1263 1263 newstate[s] = (newstate[s][0], bs, newstate[s][2])
1264 1264 if oldstate.get(s, (None, None, None))[1] != bs:
1265 1265 subs.append(s)
1266 1266
1267 1267 # check for removed subrepos
1268 1268 for p in wctx.parents():
1269 1269 r = [s for s in p.substate if s not in newstate]
1270 1270 subs += [s for s in r if match(s)]
1271 1271 if subs:
1272 1272 if (not match('.hgsub') and
1273 1273 '.hgsub' in (wctx.modified() + wctx.added())):
1274 1274 raise util.Abort(
1275 1275 _("can't commit subrepos without .hgsub"))
1276 1276 changes[0].insert(0, '.hgsubstate')
1277 1277
1278 1278 elif '.hgsub' in changes[2]:
1279 1279 # clean up .hgsubstate when .hgsub is removed
1280 1280 if ('.hgsubstate' in wctx and
1281 1281 '.hgsubstate' not in changes[0] + changes[1] + changes[2]):
1282 1282 changes[2].insert(0, '.hgsubstate')
1283 1283
1284 1284 # make sure all explicit patterns are matched
1285 1285 if not force and match.files():
1286 1286 matched = set(changes[0] + changes[1] + changes[2])
1287 1287
1288 1288 for f in match.files():
1289 1289 f = self.dirstate.normalize(f)
1290 1290 if f == '.' or f in matched or f in wctx.substate:
1291 1291 continue
1292 1292 if f in changes[3]: # missing
1293 1293 fail(f, _('file not found!'))
1294 1294 if f in vdirs: # visited directory
1295 1295 d = f + '/'
1296 1296 for mf in matched:
1297 1297 if mf.startswith(d):
1298 1298 break
1299 1299 else:
1300 1300 fail(f, _("no match under directory!"))
1301 1301 elif f not in self.dirstate:
1302 1302 fail(f, _("file not tracked!"))
1303 1303
1304 1304 cctx = context.workingctx(self, text, user, date, extra, changes)
1305 1305
1306 1306 if (not force and not extra.get("close") and not merge
1307 1307 and not cctx.files()
1308 1308 and wctx.branch() == wctx.p1().branch()):
1309 1309 return None
1310 1310
1311 1311 if merge and cctx.deleted():
1312 1312 raise util.Abort(_("cannot commit merge with missing files"))
1313 1313
1314 1314 ms = mergemod.mergestate(self)
1315 1315 for f in changes[0]:
1316 1316 if f in ms and ms[f] == 'u':
1317 1317 raise util.Abort(_("unresolved merge conflicts "
1318 1318 "(see hg help resolve)"))
1319 1319
1320 1320 if editor:
1321 1321 cctx._text = editor(self, cctx, subs)
1322 1322 edited = (text != cctx._text)
1323 1323
1324 1324 # Save commit message in case this transaction gets rolled back
1325 1325 # (e.g. by a pretxncommit hook). Leave the content alone on
1326 1326 # the assumption that the user will use the same editor again.
1327 1327 msgfn = self.savecommitmessage(cctx._text)
1328 1328
1329 1329 # commit subs and write new state
1330 1330 if subs:
1331 1331 for s in sorted(commitsubs):
1332 1332 sub = wctx.sub(s)
1333 1333 self.ui.status(_('committing subrepository %s\n') %
1334 1334 subrepo.subrelpath(sub))
1335 1335 sr = sub.commit(cctx._text, user, date)
1336 1336 newstate[s] = (newstate[s][0], sr)
1337 1337 subrepo.writestate(self, newstate)
1338 1338
1339 1339 p1, p2 = self.dirstate.parents()
1340 1340 hookp1, hookp2 = hex(p1), (p2 != nullid and hex(p2) or '')
1341 1341 try:
1342 1342 self.hook("precommit", throw=True, parent1=hookp1,
1343 1343 parent2=hookp2)
1344 1344 ret = self.commitctx(cctx, True)
1345 1345 except: # re-raises
1346 1346 if edited:
1347 1347 self.ui.write(
1348 1348 _('note: commit message saved in %s\n') % msgfn)
1349 1349 raise
1350 1350
1351 1351 # update bookmarks, dirstate and mergestate
1352 1352 bookmarks.update(self, [p1, p2], ret)
1353 1353 cctx.markcommitted(ret)
1354 1354 ms.reset()
1355 1355 finally:
1356 1356 wlock.release()
1357 1357
1358 1358 def commithook(node=hex(ret), parent1=hookp1, parent2=hookp2):
1359 1359 self.hook("commit", node=node, parent1=parent1, parent2=parent2)
1360 1360 self._afterlock(commithook)
1361 1361 return ret
1362 1362
1363 1363 @unfilteredmethod
1364 1364 def commitctx(self, ctx, error=False):
1365 1365 """Add a new revision to current repository.
1366 1366 Revision information is passed via the context argument.
1367 1367 """
1368 1368
1369 1369 tr = lock = None
1370 1370 removed = list(ctx.removed())
1371 1371 p1, p2 = ctx.p1(), ctx.p2()
1372 1372 user = ctx.user()
1373 1373
1374 1374 lock = self.lock()
1375 1375 try:
1376 1376 tr = self.transaction("commit")
1377 1377 trp = weakref.proxy(tr)
1378 1378
1379 1379 if ctx.files():
1380 1380 m1 = p1.manifest().copy()
1381 1381 m2 = p2.manifest()
1382 1382
1383 1383 # check in files
1384 1384 new = {}
1385 1385 changed = []
1386 1386 linkrev = len(self)
1387 1387 for f in sorted(ctx.modified() + ctx.added()):
1388 1388 self.ui.note(f + "\n")
1389 1389 try:
1390 1390 fctx = ctx[f]
1391 1391 if fctx is None:
1392 1392 removed.append(f)
1393 1393 else:
1394 1394 new[f] = self._filecommit(fctx, m1, m2, linkrev,
1395 1395 trp, changed)
1396 1396 m1.set(f, fctx.flags())
1397 1397 except OSError, inst:
1398 1398 self.ui.warn(_("trouble committing %s!\n") % f)
1399 1399 raise
1400 1400 except IOError, inst:
1401 1401 errcode = getattr(inst, 'errno', errno.ENOENT)
1402 1402 if error or errcode and errcode != errno.ENOENT:
1403 1403 self.ui.warn(_("trouble committing %s!\n") % f)
1404 1404 raise
1405 1405
1406 1406 # update manifest
1407 1407 m1.update(new)
1408 1408 removed = [f for f in sorted(removed) if f in m1 or f in m2]
1409 1409 drop = [f for f in removed if f in m1]
1410 1410 for f in drop:
1411 1411 del m1[f]
1412 1412 mn = self.manifest.add(m1, trp, linkrev, p1.manifestnode(),
1413 1413 p2.manifestnode(), (new, drop))
1414 1414 files = changed + removed
1415 1415 else:
1416 1416 mn = p1.manifestnode()
1417 1417 files = []
1418 1418
1419 1419 # update changelog
1420 1420 self.changelog.delayupdate()
1421 1421 n = self.changelog.add(mn, files, ctx.description(),
1422 1422 trp, p1.node(), p2.node(),
1423 1423 user, ctx.date(), ctx.extra().copy())
1424 1424 p = lambda: self.changelog.writepending() and self.root or ""
1425 1425 xp1, xp2 = p1.hex(), p2 and p2.hex() or ''
1426 1426 self.hook('pretxncommit', throw=True, node=hex(n), parent1=xp1,
1427 1427 parent2=xp2, pending=p)
1428 1428 self.changelog.finalize(trp)
1429 1429 # set the new commit in its proper phase
1430 1430 targetphase = subrepo.newcommitphase(self.ui, ctx)
1431 1431 if targetphase:
1432 1432 # retracting the boundary does not alter parent changesets.
1433 1433 # if a parent has a higher phase, the resulting phase will
1434 1434 # be compliant anyway
1435 1435 #
1436 1436 # if minimal phase was 0 we don't need to retract anything
1437 1437 phases.retractboundary(self, tr, targetphase, [n])
1438 1438 tr.close()
1439 1439 branchmap.updatecache(self.filtered('served'))
1440 1440 return n
1441 1441 finally:
1442 1442 if tr:
1443 1443 tr.release()
1444 1444 lock.release()
1445 1445
1446 1446 @unfilteredmethod
1447 1447 def destroying(self):
1448 1448 '''Inform the repository that nodes are about to be destroyed.
1449 1449 Intended for use by strip and rollback, so there's a common
1450 1450 place for anything that has to be done before destroying history.
1451 1451
1452 1452 This is mostly useful for saving state that is in memory and waiting
1453 1453 to be flushed when the current lock is released. Because a call to
1454 1454 destroyed is imminent, the repo will be invalidated causing those
1455 1455 changes to stay in memory (waiting for the next unlock), or vanish
1456 1456 completely.
1457 1457 '''
1458 1458 # When using the same lock to commit and strip, the phasecache is left
1459 1459 # dirty after committing. Then when we strip, the repo is invalidated,
1460 1460 # causing those changes to disappear.
1461 1461 if '_phasecache' in vars(self):
1462 1462 self._phasecache.write()
1463 1463
1464 1464 @unfilteredmethod
1465 1465 def destroyed(self):
1466 1466 '''Inform the repository that nodes have been destroyed.
1467 1467 Intended for use by strip and rollback, so there's a common
1468 1468 place for anything that has to be done after destroying history.
1469 1469 '''
1470 1470 # When one tries to:
1471 1471 # 1) destroy nodes thus calling this method (e.g. strip)
1472 1472 # 2) use phasecache somewhere (e.g. commit)
1473 1473 #
1474 1474 # then 2) will fail because the phasecache contains nodes that were
1475 1475 # removed. We can either remove phasecache from the filecache,
1476 1476 # causing it to reload next time it is accessed, or simply filter
1477 1477 # the removed nodes now and write the updated cache.
1478 1478 self._phasecache.filterunknown(self)
1479 1479 self._phasecache.write()
1480 1480
1481 1481 # update the 'served' branch cache to help read-only server processes
1482 1482 # Thanks to branchcache collaboration this is done from the nearest
1483 1483 # filtered subset and it is expected to be fast.
1484 1484 branchmap.updatecache(self.filtered('served'))
1485 1485
1486 1486 # Ensure the persistent tag cache is updated. Doing it now
1487 1487 # means that the tag cache only has to worry about destroyed
1488 1488 # heads immediately after a strip/rollback. That in turn
1489 1489 # guarantees that "cachetip == currenttip" (comparing both rev
1490 1490 # and node) always means no nodes have been added or destroyed.
1491 1491
1492 1492 # XXX this is suboptimal when qrefresh'ing: we strip the current
1493 1493 # head, refresh the tag cache, then immediately add a new head.
1494 1494 # But I think doing it this way is necessary for the "instant
1495 1495 # tag cache retrieval" case to work.
1496 1496 self.invalidate()
1497 1497
1498 1498 def walk(self, match, node=None):
1499 1499 '''
1500 1500 walk recursively through the directory tree or a given
1501 1501 changeset, finding all files matched by the match
1502 1502 function
1503 1503 '''
1504 1504 return self[node].walk(match)
1505 1505
1506 1506 def status(self, node1='.', node2=None, match=None,
1507 1507 ignored=False, clean=False, unknown=False,
1508 1508 listsubrepos=False):
1509 1509 '''a convenience method that calls node1.status(node2)'''
1510 1510 return self[node1].status(node2, match, ignored, clean, unknown,
1511 1511 listsubrepos)
1512 1512
1513 1513 def heads(self, start=None):
1514 1514 heads = self.changelog.heads(start)
1515 1515 # sort the output in rev descending order
1516 1516 return sorted(heads, key=self.changelog.rev, reverse=True)
1517 1517
1518 1518 def branchheads(self, branch=None, start=None, closed=False):
1519 1519 '''return a (possibly filtered) list of heads for the given branch
1520 1520
1521 1521 Heads are returned in topological order, from newest to oldest.
1522 1522 If branch is None, use the dirstate branch.
1523 1523 If start is not None, return only heads reachable from start.
1524 1524 If closed is True, return heads that are marked as closed as well.
1525 1525 '''
1526 1526 if branch is None:
1527 1527 branch = self[None].branch()
1528 1528 branches = self.branchmap()
1529 1529 if branch not in branches:
1530 1530 return []
1531 1531 # the cache returns heads ordered lowest to highest
1532 1532 bheads = list(reversed(branches.branchheads(branch, closed=closed)))
1533 1533 if start is not None:
1534 1534 # filter out the heads that cannot be reached from startrev
1535 1535 fbheads = set(self.changelog.nodesbetween([start], bheads)[2])
1536 1536 bheads = [h for h in bheads if h in fbheads]
1537 1537 return bheads
1538 1538
1539 1539 def branches(self, nodes):
1540 1540 if not nodes:
1541 1541 nodes = [self.changelog.tip()]
1542 1542 b = []
1543 1543 for n in nodes:
1544 1544 t = n
1545 1545 while True:
1546 1546 p = self.changelog.parents(n)
1547 1547 if p[1] != nullid or p[0] == nullid:
1548 1548 b.append((t, n, p[0], p[1]))
1549 1549 break
1550 1550 n = p[0]
1551 1551 return b
1552 1552
1553 1553 def between(self, pairs):
1554 1554 r = []
1555 1555
1556 1556 for top, bottom in pairs:
1557 1557 n, l, i = top, [], 0
1558 1558 f = 1
1559 1559
1560 1560 while n != bottom and n != nullid:
1561 1561 p = self.changelog.parents(n)[0]
1562 1562 if i == f:
1563 1563 l.append(n)
1564 1564 f = f * 2
1565 1565 n = p
1566 1566 i += 1
1567 1567
1568 1568 r.append(l)
1569 1569
1570 1570 return r
1571 1571
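The loop above samples the first-parent chain at exponentially growing
distances: the nodes 1, 2, 4, 8, ... steps below each top. A standalone
sketch of the same sampling (parentof is a hypothetical first-parent
accessor, not part of this API)::

    def sample(top, bottom, parentof):
        # collect nodes 1, 2, 4, 8, ... steps below top, stopping at bottom
        l, n, i, f = [], top, 0, 1
        while n != bottom:
            p = parentof(n)
            if i == f:
                l.append(n)
                f *= 2
            n = p
            i += 1
        return l
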
1572 1572 def pull(self, remote, heads=None, force=False):
1573 1573 return exchange.pull(self, remote, heads, force)
1574 1574
1575 1575 def checkpush(self, pushop):
1576 1576 """Extensions can override this function if additional checks have
1577 1577 to be performed before pushing, or call it if they override push
1578 1578 command.
1579 1579 """
1580 1580 pass
1581 1581
1582 1582 @unfilteredpropertycache
1583 1583 def prepushoutgoinghooks(self):
1584 1584 """Return util.hooks consists of "(repo, remote, outgoing)"
1585 1585 functions, which are called before pushing changesets.
1586 1586 """
1587 1587 return util.hooks()
1588 1588
1589 1589 def push(self, remote, force=False, revs=None, newbranch=False):
1590 1590 return exchange.push(self, remote, force, revs, newbranch)
1591 1591
1592 1592 def stream_in(self, remote, requirements):
1593 1593 lock = self.lock()
1594 1594 try:
1595 1595 # Save remote branchmap. We will use it later
1596 1596 # to speed up branchcache creation
1597 1597 rbranchmap = None
1598 1598 if remote.capable("branchmap"):
1599 1599 rbranchmap = remote.branchmap()
1600 1600
1601 1601 fp = remote.stream_out()
1602 1602 l = fp.readline()
1603 1603 try:
1604 1604 resp = int(l)
1605 1605 except ValueError:
1606 1606 raise error.ResponseError(
1607 1607 _('unexpected response from remote server:'), l)
1608 1608 if resp == 1:
1609 1609 raise util.Abort(_('operation forbidden by server'))
1610 1610 elif resp == 2:
1611 1611 raise util.Abort(_('locking the remote repository failed'))
1612 1612 elif resp != 0:
1613 1613 raise util.Abort(_('the server sent an unknown error code'))
1614 1614 self.ui.status(_('streaming all changes\n'))
1615 1615 l = fp.readline()
1616 1616 try:
1617 1617 total_files, total_bytes = map(int, l.split(' ', 1))
1618 1618 except (ValueError, TypeError):
1619 1619 raise error.ResponseError(
1620 1620 _('unexpected response from remote server:'), l)
1621 1621 self.ui.status(_('%d files to transfer, %s of data\n') %
1622 1622 (total_files, util.bytecount(total_bytes)))
1623 1623 handled_bytes = 0
1624 1624 self.ui.progress(_('clone'), 0, total=total_bytes)
1625 1625 start = time.time()
1626 1626
1627 1627 tr = self.transaction(_('clone'))
1628 1628 try:
1629 1629 for i in xrange(total_files):
1630 1630 # XXX doesn't support '\n' or '\r' in filenames
1631 1631 l = fp.readline()
1632 1632 try:
1633 1633 name, size = l.split('\0', 1)
1634 1634 size = int(size)
1635 1635 except (ValueError, TypeError):
1636 1636 raise error.ResponseError(
1637 1637 _('unexpected response from remote server:'), l)
1638 1638 if self.ui.debugflag:
1639 1639 self.ui.debug('adding %s (%s)\n' %
1640 1640 (name, util.bytecount(size)))
1641 1641 # for backwards compat, name was partially encoded
1642 1642 ofp = self.sopener(store.decodedir(name), 'w')
1643 1643 for chunk in util.filechunkiter(fp, limit=size):
1644 1644 handled_bytes += len(chunk)
1645 1645 self.ui.progress(_('clone'), handled_bytes,
1646 1646 total=total_bytes)
1647 1647 ofp.write(chunk)
1648 1648 ofp.close()
1649 1649 tr.close()
1650 1650 finally:
1651 1651 tr.release()
1652 1652
1653 1653 # Writing straight to files circumvented the in-memory caches
1654 1654 self.invalidate()
1655 1655
1656 1656 elapsed = time.time() - start
1657 1657 if elapsed <= 0:
1658 1658 elapsed = 0.001
1659 1659 self.ui.progress(_('clone'), None)
1660 1660 self.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
1661 1661 (util.bytecount(total_bytes), elapsed,
1662 1662 util.bytecount(total_bytes / elapsed)))
1663 1663
1664 1664 # new requirements = old non-format requirements +
1665 1665 # new format-related
1666 1666 # requirements from the streamed-in repository
1667 1667 requirements.update(set(self.requirements) - self.supportedformats)
1668 1668 self._applyrequirements(requirements)
1669 1669 self._writerequirements()
1670 1670
1671 1671 if rbranchmap:
1672 1672 rbheads = []
1673 1673 for bheads in rbranchmap.itervalues():
1674 1674 rbheads.extend(bheads)
1675 1675
1676 1676 if rbheads:
1677 1677 rtiprev = max((int(self.changelog.rev(node))
1678 1678 for node in rbheads))
1679 1679 cache = branchmap.branchcache(rbranchmap,
1680 1680 self[rtiprev].node(),
1681 1681 rtiprev)
1682 1682 # Try to stick it as low as possible
1683 1683 # (filters above 'served' are unlikely to be fetched from a clone)
1684 1684 for candidate in ('base', 'immutable', 'served'):
1685 1685 rview = self.filtered(candidate)
1686 1686 if cache.validfor(rview):
1687 1687 self._branchcaches[candidate] = cache
1688 1688 cache.write(rview)
1689 1689 break
1690 1690 self.invalidate()
1691 1691 return len(self.heads()) + 1
1692 1692 finally:
1693 1693 lock.release()
1694 1694
1695 1695 def clone(self, remote, heads=[], stream=False):
1696 1696 '''clone remote repository.
1697 1697
1698 1698 keyword arguments:
1699 1699 heads: list of revs to clone (forces use of pull)
1700 1700 stream: use streaming clone if possible'''
1701 1701
1702 1702 # now, all clients that can request uncompressed clones can
1703 1703 # read repo formats supported by all servers that can serve
1704 1704 # them.
1705 1705
1706 1706 # if revlog format changes, client will have to check version
1707 1707 # and format flags on "stream" capability, and use
1708 1708 # uncompressed only if compatible.
1709 1709
1710 1710 if not stream:
1711 1711 # if the server explicitly prefers to stream (for fast LANs)
1712 1712 stream = remote.capable('stream-preferred')
1713 1713
1714 1714 if stream and not heads:
1715 1715 # 'stream' means remote revlog format is revlogv1 only
1716 1716 if remote.capable('stream'):
1717 1717 return self.stream_in(remote, set(('revlogv1',)))
1718 1718 # otherwise, 'streamreqs' contains the remote revlog format
1719 1719 streamreqs = remote.capable('streamreqs')
1720 1720 if streamreqs:
1721 1721 streamreqs = set(streamreqs.split(','))
1722 1722 # if we support it, stream in and adjust our requirements
1723 1723 if not streamreqs - self.supportedformats:
1724 1724 return self.stream_in(remote, streamreqs)
1725 1725 return self.pull(remote, heads)
1726 1726
1727 1727 def pushkey(self, namespace, key, old, new):
1728 1728 self.hook('prepushkey', throw=True, namespace=namespace, key=key,
1729 1729 old=old, new=new)
1730 1730 self.ui.debug('pushing key for "%s:%s"\n' % (namespace, key))
1731 1731 ret = pushkey.push(self, namespace, key, old, new)
1732 1732 self.hook('pushkey', namespace=namespace, key=key, old=old, new=new,
1733 1733 ret=ret)
1734 1734 return ret
1735 1735
1736 1736 def listkeys(self, namespace):
1737 1737 self.hook('prelistkeys', throw=True, namespace=namespace)
1738 1738 self.ui.debug('listing keys for "%s"\n' % namespace)
1739 1739 values = pushkey.list(self, namespace)
1740 1740 self.hook('listkeys', namespace=namespace, values=values)
1741 1741 return values
1742 1742
1743 1743 def debugwireargs(self, one, two, three=None, four=None, five=None):
1744 1744 '''used to test argument passing over the wire'''
1745 1745 return "%s %s %s %s %s" % (one, two, three, four, five)
1746 1746
1747 1747 def savecommitmessage(self, text):
1748 1748 fp = self.opener('last-message.txt', 'wb')
1749 1749 try:
1750 1750 fp.write(text)
1751 1751 finally:
1752 1752 fp.close()
1753 1753 return self.pathto(fp.name[len(self.root) + 1:])
1754 1754
1755 1755 # used to avoid circular references so destructors work
1756 1756 def aftertrans(files):
1757 1757 renamefiles = [tuple(t) for t in files]
1758 1758 def a():
1759 1759 for vfs, src, dest in renamefiles:
1760 1760 try:
1761 1761 vfs.rename(src, dest)
1762 1762 except OSError: # journal file does not yet exist
1763 1763 pass
1764 1764 return a
1765 1765
1766 1766 def undoname(fn):
1767 1767 base, name = os.path.split(fn)
1768 1768 assert name.startswith('journal')
1769 1769 return os.path.join(base, name.replace('journal', 'undo', 1))
1770 1770
1771 1771 def instance(ui, path, create):
1772 1772 return localrepository(ui, util.urllocalpath(path), create)
1773 1773
1774 1774 def islocal(path):
1775 1775 return True
@@ -1,868 +1,868
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ASCII representation of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
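The chunked format above is easy to consume; a minimal sketch of a reader
(readchunkedfile is a hypothetical helper, not part of this API)::

    def readchunkedfile(src, dst):
        # consume (<chunk-size>\n<chunk>)+ until the "0\n" terminator
        while True:
            size = int(src.readline())  # chunk size: ASCII int on its own line
            if size == 0:               # "0\n" ends the file
                break
            dst.write(src.read(size))
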
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
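For example, the `heads` and `known` calls defined below are batchable, so a
client can coalesce them into a single round trip (a minimal sketch; `remote`
is any wirepeer instance and `nodes` a list of binary nodes)::

    b = remote.batch()   # a remotebatch over this peer
    h = b.heads()        # queued; returns a future
    k = b.known(nodes)   # queued as well
    b.submit()           # one wire round trip fills both futures
    heads, flags = h.value, k.value
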
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
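These two helpers are exact inverses over the default separator, e.g.::

    >>> from mercurial.node import bin
    >>> nodes = [bin('11' * 20), bin('22' * 20)]
    >>> decodelist(encodelist(nodes)) == nodes
    True
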
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
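The escaping prefixes each batch separator character with ':' (and doubles
':' itself), so any argument value round-trips, e.g.::

    >>> escapearg('key=a,b;c')
    'key:=a:,b:;c'
    >>> unescapearg('key:=a:,b:;c')
    'key=a,b;c'
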
193 193 # mapping of options accepted by getbundle and their types
194 194 #
195 195 # Meant to be extended by extensions. It is the extension's responsibility to ensure
196 196 # such options are properly processed in exchange.getbundle.
197 197 #
198 198 # supported types are:
199 199 #
200 200 # :nodes: list of binary nodes
201 201 # :csv: list of comma-separated values
202 202 # :plain: string with no transformation needed.
203 203 gboptsmap = {'heads': 'nodes',
204 204 'common': 'nodes',
205 205 'bundlecaps': 'csv',
206 206 'listkeys': 'csv',
207 207 'cg': 'boolean'}
208 208
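An extension adding a new getbundle argument only has to declare its wire
type in this mapping (a sketch; the 'obsmarkers' option name is purely
illustrative)::

    from mercurial import wireproto
    # declare a new comma-separated-values argument for getbundle
    wireproto.gboptsmap['obsmarkers'] = 'csv'
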
209 209 # client side
210 210
211 211 class wirepeer(peer.peerrepository):
212 212
213 213 def batch(self):
214 214 return remotebatch(self)
215 215 def _submitbatch(self, req):
216 216 cmds = []
217 217 for op, argsdict in req:
218 218 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
219 219 cmds.append('%s %s' % (op, args))
220 220 rsp = self._call("batch", cmds=';'.join(cmds))
221 221 return rsp.split(';')
222 222 def _submitone(self, op, args):
223 223 return self._call(op, **args)
224 224
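Concretely, a queued batch is flattened into one 'batch' command: each call
becomes "op key=value,key=value", the calls are joined with ';', and the
reply is split on ';' again (schematic, values abridged)::

    cmds  = "heads ;known nodes=889a..."
    reply = "<heads result>;<known result>"
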
225 225 @batchable
226 226 def lookup(self, key):
227 227 self.requirecap('lookup', _('look up remote revision'))
228 228 f = future()
229 229 yield {'key': encoding.fromlocal(key)}, f
230 230 d = f.value
231 231 success, data = d[:-1].split(" ", 1)
232 232 if int(success):
233 233 yield bin(data)
234 234 self._abort(error.RepoError(data))
235 235
236 236 @batchable
237 237 def heads(self):
238 238 f = future()
239 239 yield {}, f
240 240 d = f.value
241 241 try:
242 242 yield decodelist(d[:-1])
243 243 except ValueError:
244 244 self._abort(error.ResponseError(_("unexpected response:"), d))
245 245
246 246 @batchable
247 247 def known(self, nodes):
248 248 f = future()
249 249 yield {'nodes': encodelist(nodes)}, f
250 250 d = f.value
251 251 try:
252 252 yield [bool(int(b)) for b in d]
253 253 except ValueError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 @batchable
257 257 def branchmap(self):
258 258 f = future()
259 259 yield {}, f
260 260 d = f.value
261 261 try:
262 262 branchmap = {}
263 263 for branchpart in d.splitlines():
264 264 branchname, branchheads = branchpart.split(' ', 1)
265 265 branchname = encoding.tolocal(urllib.unquote(branchname))
266 266 branchheads = decodelist(branchheads)
267 267 branchmap[branchname] = branchheads
268 268 yield branchmap
269 269 except TypeError:
270 270 self._abort(error.ResponseError(_("unexpected response:"), d))
271 271
272 272 def branches(self, nodes):
273 273 n = encodelist(nodes)
274 274 d = self._call("branches", nodes=n)
275 275 try:
276 276 br = [tuple(decodelist(b)) for b in d.splitlines()]
277 277 return br
278 278 except ValueError:
279 279 self._abort(error.ResponseError(_("unexpected response:"), d))
280 280
281 281 def between(self, pairs):
282 282 batch = 8 # avoid giant requests
283 283 r = []
284 284 for i in xrange(0, len(pairs), batch):
285 285 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
286 286 d = self._call("between", pairs=n)
287 287 try:
288 288 r.extend(l and decodelist(l) or [] for l in d.splitlines())
289 289 except ValueError:
290 290 self._abort(error.ResponseError(_("unexpected response:"), d))
291 291 return r
292 292
293 293 @batchable
294 294 def pushkey(self, namespace, key, old, new):
295 295 if not self.capable('pushkey'):
296 296 yield False, None
297 297 f = future()
298 298 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
299 299 yield {'namespace': encoding.fromlocal(namespace),
300 300 'key': encoding.fromlocal(key),
301 301 'old': encoding.fromlocal(old),
302 302 'new': encoding.fromlocal(new)}, f
303 303 d = f.value
304 304 d, output = d.split('\n', 1)
305 305 try:
306 306 d = bool(int(d))
307 307 except ValueError:
308 308 raise error.ResponseError(
309 309 _('push failed (unexpected response):'), d)
310 310 for l in output.splitlines(True):
311 311 self.ui.status(_('remote: '), l)
312 312 yield d
313 313
314 314 @batchable
315 315 def listkeys(self, namespace):
316 316 if not self.capable('pushkey'):
317 317 yield {}, None
318 318 f = future()
319 319 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
320 320 yield {'namespace': encoding.fromlocal(namespace)}, f
321 321 d = f.value
322 322 yield pushkeymod.decodekeys(d)
323 323
324 324 def stream_out(self):
325 325 return self._callstream('stream_out')
326 326
327 327 def changegroup(self, nodes, kind):
328 328 n = encodelist(nodes)
329 329 f = self._callcompressable("changegroup", roots=n)
330 330 return changegroupmod.unbundle10(f, 'UN')
331 331
332 332 def changegroupsubset(self, bases, heads, kind):
333 333 self.requirecap('changegroupsubset', _('look up remote changes'))
334 334 bases = encodelist(bases)
335 335 heads = encodelist(heads)
336 336 f = self._callcompressable("changegroupsubset",
337 337 bases=bases, heads=heads)
338 338 return changegroupmod.unbundle10(f, 'UN')
339 339
340 340 def getbundle(self, source, **kwargs):
341 341 self.requirecap('getbundle', _('look up remote changes'))
342 342 opts = {}
343 343 for key, value in kwargs.iteritems():
344 344 if value is None:
345 345 continue
346 346 keytype = gboptsmap.get(key)
347 347 if keytype is None:
348 348 assert False, 'unexpected'
349 349 elif keytype == 'nodes':
350 350 value = encodelist(value)
351 351 elif keytype == 'csv':
352 352 value = ','.join(value)
353 353 elif keytype == 'boolean':
354 354 value = bool(value)
355 355 elif keytype != 'plain':
356 356 raise KeyError('unknown getbundle option type %s'
357 357 % keytype)
358 358 opts[key] = value
359 359 f = self._callcompressable("getbundle", **opts)
360 360 bundlecaps = kwargs.get('bundlecaps')
361 361 if bundlecaps is not None and 'HG2X' in bundlecaps:
362 362 return bundle2.unbundle20(self.ui, f)
363 363 else:
364 364 return changegroupmod.unbundle10(f, 'UN')
365 365
366 366 def unbundle(self, cg, heads, source):
367 367 '''Send cg (a readable file-like object representing the
368 368 changegroup to push, typically a chunkbuffer object) to the
369 369 remote server as a bundle.
370 370
371 371 When pushing a bundle10 stream, return an integer indicating the
372 372 result of the push (see localrepository.addchangegroup()).
373 373
374 374 When pushing a bundle20 stream, return a bundle20 stream.'''
375 375
376 376 if heads != ['force'] and self.capable('unbundlehash'):
377 377 heads = encodelist(['hashed',
378 378 util.sha1(''.join(sorted(heads))).digest()])
379 379 else:
380 380 heads = encodelist(heads)
381 381
382 382 if util.safehasattr(cg, 'deltaheader'):
383 383 # this is a bundle10, do the old-style call sequence
384 384 ret, output = self._callpush("unbundle", cg, heads=heads)
385 385 if ret == "":
386 386 raise error.ResponseError(
387 387 _('push failed:'), output)
388 388 try:
389 389 ret = int(ret)
390 390 except ValueError:
391 391 raise error.ResponseError(
392 392 _('push failed (unexpected response):'), ret)
393 393
394 394 for l in output.splitlines(True):
395 395 self.ui.status(_('remote: '), l)
396 396 else:
397 397 # bundle2 push. Send a stream, fetch a stream.
398 398 stream = self._calltwowaystream('unbundle', cg, heads=heads)
399 399 ret = bundle2.unbundle20(self.ui, stream)
400 400 return ret
401 401
402 402 def debugwireargs(self, one, two, three=None, four=None, five=None):
403 403 # don't pass optional arguments left at their default value
404 404 opts = {}
405 405 if three is not None:
406 406 opts['three'] = three
407 407 if four is not None:
408 408 opts['four'] = four
409 409 return self._call('debugwireargs', one=one, two=two, **opts)
410 410
411 411 def _call(self, cmd, **args):
412 412 """execute <cmd> on the server
413 413
414 414 The command is expected to return a simple string.
415 415
416 416 returns the server reply as a string."""
417 417 raise NotImplementedError()
418 418
419 419 def _callstream(self, cmd, **args):
420 420 """execute <cmd> on the server
421 421
422 422 The command is expected to return a stream.
423 423
424 424 returns the server reply as a file like object."""
425 425 raise NotImplementedError()
426 426
427 427 def _callcompressable(self, cmd, **args):
428 428 """execute <cmd> on the server
429 429
430 430 The command is expected to return a stream.
431 431
432 432 The stream may have been compressed in some implementations. This
433 433 function takes care of the decompression. This is the only difference
434 434 with _callstream.
435 435
436 436 returns the server reply as a file like object.
437 437 """
438 438 raise NotImplementedError()
439 439
440 440 def _callpush(self, cmd, fp, **args):
441 441 """execute a <cmd> on server
442 442
443 443 The command is expected to be related to a push. Push has a special
444 444 return method.
445 445
446 446 returns the server reply as a (ret, output) tuple. ret is either
447 447 empty (error) or a stringified int.
448 448 """
449 449 raise NotImplementedError()
450 450
451 451 def _calltwowaystream(self, cmd, fp, **args):
452 452 """execute <cmd> on server
453 453
454 454 The command will send a stream to the server and get a stream in reply.
455 455 """
456 456 raise NotImplementedError()
457 457
458 458 def _abort(self, exception):
459 459 """clearly abort the wire protocol connection and raise the exception
460 460 """
461 461 raise NotImplementedError()
462 462
463 463 # server side
464 464
465 465 # wire protocol command can either return a string or one of these classes.
466 466 class streamres(object):
467 467 """wireproto reply: binary stream
468 468
469 469 The call was successful and the result is a stream.
470 470 Iterate on the `self.gen` attribute to retrieve chunks.
471 471 """
472 472 def __init__(self, gen):
473 473 self.gen = gen
474 474
475 475 class pushres(object):
476 476 """wireproto reply: success with simple integer return
477 477
478 478 The call was successful and returned an integer contained in `self.res`.
479 479 """
480 480 def __init__(self, res):
481 481 self.res = res
482 482
483 483 class pusherr(object):
484 484 """wireproto reply: failure
485 485
486 486 The call failed. The `self.res` attribute contains the error message.
487 487 """
488 488 def __init__(self, res):
489 489 self.res = res
490 490
491 491 class ooberror(object):
492 492 """wireproto reply: failure of a batch of operation
493 493
494 494 Something failed during a batch call. The error message is stored in
495 495 `self.message`.
496 496 """
497 497 def __init__(self, message):
498 498 self.message = message
499 499
500 500 def dispatch(repo, proto, command):
501 501 repo = repo.filtered("served")
502 502 func, spec = commands[command]
503 503 args = proto.getargs(spec)
504 504 return func(repo, proto, *args)
505 505
506 506 def options(cmd, keys, others):
507 507 opts = {}
508 508 for k in keys:
509 509 if k in others:
510 510 opts[k] = others[k]
511 511 del others[k]
512 512 if others:
513 513 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
514 514 % (cmd, ",".join(others)))
515 515 return opts
516 516
517 517 # list of commands
518 518 commands = {}
519 519
520 520 def wireprotocommand(name, args=''):
521 521 """decorator for wire protocol command"""
522 522 def register(func):
523 523 commands[name] = (func, args)
524 524 return func
525 525 return register
526 526
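Registering a new server command is therefore a one-liner for extensions; a
hypothetical 'echo' command could look like this (a sketch, not part of the
real command set)::

    @wireprotocommand('echo', 'value')
    def echo(repo, proto, value):
        # wire protocol commands may return a plain string reply
        return value + '\n'
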
527 527 @wireprotocommand('batch', 'cmds *')
528 528 def batch(repo, proto, cmds, others):
529 529 repo = repo.filtered("served")
530 530 res = []
531 531 for pair in cmds.split(';'):
532 532 op, args = pair.split(' ', 1)
533 533 vals = {}
534 534 for a in args.split(','):
535 535 if a:
536 536 n, v = a.split('=')
537 537 vals[n] = unescapearg(v)
538 538 func, spec = commands[op]
539 539 if spec:
540 540 keys = spec.split()
541 541 data = {}
542 542 for k in keys:
543 543 if k == '*':
544 544 star = {}
545 545 for key in vals.keys():
546 546 if key not in keys:
547 547 star[key] = vals[key]
548 548 data['*'] = star
549 549 else:
550 550 data[k] = vals[k]
551 551 result = func(repo, proto, *[data[k] for k in keys])
552 552 else:
553 553 result = func(repo, proto)
554 554 if isinstance(result, ooberror):
555 555 return result
556 556 res.append(escapearg(result))
557 557 return ';'.join(res)
558 558
559 559 @wireprotocommand('between', 'pairs')
560 560 def between(repo, proto, pairs):
561 561 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
562 562 r = []
563 563 for b in repo.between(pairs):
564 564 r.append(encodelist(b) + "\n")
565 565 return "".join(r)
566 566
567 567 @wireprotocommand('branchmap')
568 568 def branchmap(repo, proto):
569 569 branchmap = repo.branchmap()
570 570 heads = []
571 571 for branch, nodes in branchmap.iteritems():
572 572 branchname = urllib.quote(encoding.fromlocal(branch))
573 573 branchnodes = encodelist(nodes)
574 574 heads.append('%s %s' % (branchname, branchnodes))
575 575 return '\n'.join(heads)
576 576
577 577 @wireprotocommand('branches', 'nodes')
578 578 def branches(repo, proto, nodes):
579 579 nodes = decodelist(nodes)
580 580 r = []
581 581 for b in repo.branches(nodes):
582 582 r.append(encodelist(b) + "\n")
583 583 return "".join(r)
584 584
585 585
586 586 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
587 587 'known', 'getbundle', 'unbundlehash', 'batch']
588 588
589 589 def _capabilities(repo, proto):
590 590 """return a list of capabilities for a repo
591 591
592 592 This function exists to allow extensions to easily wrap capabilities
593 593 computation
594 594
595 595 - returns a list: easy to alter
596 596 - changes done here will be propagated to both the `capabilities` and
597 597 `hello` commands without any other action needed.
598 598 """
599 599 # copy to prevent modification of the global list
600 600 caps = list(wireprotocaps)
601 601 if _allowstream(repo.ui):
602 602 if repo.ui.configbool('server', 'preferuncompressed', False):
603 603 caps.append('stream-preferred')
604 604 requiredformats = repo.requirements & repo.supportedformats
605 605 # if our local revlogs are just revlogv1, add 'stream' cap
606 606 if not requiredformats - set(('revlogv1',)):
607 607 caps.append('stream')
608 608 # otherwise, add 'streamreqs' detailing our local revlog format
609 609 else:
610 610 caps.append('streamreqs=%s' % ','.join(requiredformats))
611 611 if repo.ui.configbool('experimental', 'bundle2-exp', False):
612 capsblob = bundle2.encodecaps(bundle2.capabilities)
612 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
613 613 caps.append('bundle2-exp=' + urllib.quote(capsblob))
614 614 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
615 615 caps.append('httpheader=1024')
616 616 return caps
617 617
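The resulting list is space-joined by the `capabilities` command below; with
bundle2-exp enabled a server typically advertises something like
(illustrative; the exact tokens depend on configuration)::

    lookup changegroupsubset branchmap pushkey known getbundle unbundlehash
    batch stream bundle2-exp=<urlquoted caps blob>
    unbundle=HG10GZ,HG10BZ,HG10UN httpheader=1024
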
618 618 # If you are writing an extension and consider wrapping this function. Wrap
619 619 # `_capabilities` instead.
620 620 @wireprotocommand('capabilities')
621 621 def capabilities(repo, proto):
622 622 return ' '.join(_capabilities(repo, proto))
623 623
624 624 @wireprotocommand('changegroup', 'roots')
625 625 def changegroup(repo, proto, roots):
626 626 nodes = decodelist(roots)
627 627 cg = changegroupmod.changegroup(repo, nodes, 'serve')
628 628 return streamres(proto.groupchunks(cg))
629 629
630 630 @wireprotocommand('changegroupsubset', 'bases heads')
631 631 def changegroupsubset(repo, proto, bases, heads):
632 632 bases = decodelist(bases)
633 633 heads = decodelist(heads)
634 634 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
635 635 return streamres(proto.groupchunks(cg))
636 636
637 637 @wireprotocommand('debugwireargs', 'one two *')
638 638 def debugwireargs(repo, proto, one, two, others):
639 639 # only accept optional args from the known set
640 640 opts = options('debugwireargs', ['three', 'four'], others)
641 641 return repo.debugwireargs(one, two, **opts)
642 642
643 643 # List of options accepted by getbundle.
644 644 #
645 645 # Meant to be extended by extensions. It is the extension's responsibility to
646 646 # ensure such options are properly processed in exchange.getbundle.
647 647 gboptslist = ['heads', 'common', 'bundlecaps']
648 648
649 649 @wireprotocommand('getbundle', '*')
650 650 def getbundle(repo, proto, others):
651 651 opts = options('getbundle', gboptsmap.keys(), others)
652 652 for k, v in opts.iteritems():
653 653 keytype = gboptsmap[k]
654 654 if keytype == 'nodes':
655 655 opts[k] = decodelist(v)
656 656 elif keytype == 'csv':
657 657 opts[k] = set(v.split(','))
658 658 elif keytype == 'boolean':
659 659 opts[k] = '%i' % bool(v)
660 660 elif keytype != 'plain':
661 661 raise KeyError('unknown getbundle option type %s'
662 662 % keytype)
663 663 cg = exchange.getbundle(repo, 'serve', **opts)
664 664 return streamres(proto.groupchunks(cg))
665 665
666 666 @wireprotocommand('heads')
667 667 def heads(repo, proto):
668 668 h = repo.heads()
669 669 return encodelist(h) + "\n"
670 670
671 671 @wireprotocommand('hello')
672 672 def hello(repo, proto):
673 673 '''the hello command returns a set of lines describing various
674 674 interesting things about the server, in an RFC822-like format.
675 675 Currently the only one defined is "capabilities", which
676 676 consists of a line in the form:
677 677
678 678 capabilities: space separated list of tokens
679 679 '''
680 680 return "capabilities: %s\n" % (capabilities(repo, proto))
681 681
682 682 @wireprotocommand('listkeys', 'namespace')
683 683 def listkeys(repo, proto, namespace):
684 684 d = repo.listkeys(encoding.tolocal(namespace)).items()
685 685 return pushkeymod.encodekeys(d)
686 686
687 687 @wireprotocommand('lookup', 'key')
688 688 def lookup(repo, proto, key):
689 689 try:
690 690 k = encoding.tolocal(key)
691 691 c = repo[k]
692 692 r = c.hex()
693 693 success = 1
694 694 except Exception, inst:
695 695 r = str(inst)
696 696 success = 0
697 697 return "%s %s\n" % (success, r)
698 698
699 699 @wireprotocommand('known', 'nodes *')
700 700 def known(repo, proto, nodes, others):
701 701 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
702 702
703 703 @wireprotocommand('pushkey', 'namespace key old new')
704 704 def pushkey(repo, proto, namespace, key, old, new):
705 705 # compatibility with pre-1.8 clients which were accidentally
706 706 # sending raw binary nodes rather than utf-8-encoded hex
707 707 if len(new) == 20 and new.encode('string-escape') != new:
708 708 # looks like it could be a binary node
709 709 try:
710 710 new.decode('utf-8')
711 711 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
712 712 except UnicodeDecodeError:
713 713 pass # binary, leave unmodified
714 714 else:
715 715 new = encoding.tolocal(new) # normal path
716 716
717 717 if util.safehasattr(proto, 'restore'):
718 718
719 719 proto.redirect()
720 720
721 721 try:
722 722 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
723 723 encoding.tolocal(old), new) or False
724 724 except util.Abort:
725 725 r = False
726 726
727 727 output = proto.restore()
728 728
729 729 return '%s\n%s' % (int(r), output)
730 730
731 731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
732 732 encoding.tolocal(old), new)
733 733 return '%s\n' % int(r)
734 734
735 735 def _allowstream(ui):
736 736 return ui.configbool('server', 'uncompressed', True, untrusted=True)
737 737
738 738 def _walkstreamfiles(repo):
739 739 # this is its own function so extensions can override it
740 740 return repo.store.walk()
741 741
742 742 @wireprotocommand('stream_out')
743 743 def stream(repo, proto):
744 744 '''If the server supports streaming clone, it advertises the "stream"
745 745 capability with a value representing the version and flags of the repo
746 746 it is serving. Client checks to see if it understands the format.
747 747
748 748 The format is simple: the server writes out a line with the amount
749 749 of files, then the total amount of bytes to be transferred (separated
750 750 by a space). Then, for each file, the server first writes the filename
751 751 and file size (separated by the null character), then the file contents.
752 752 '''
753 753
754 754 if not _allowstream(repo.ui):
755 755 return '1\n'
756 756
757 757 entries = []
758 758 total_bytes = 0
759 759 try:
760 760 # get consistent snapshot of repo, lock during scan
761 761 lock = repo.lock()
762 762 try:
763 763 repo.ui.debug('scanning\n')
764 764 for name, ename, size in _walkstreamfiles(repo):
765 765 if size:
766 766 entries.append((name, size))
767 767 total_bytes += size
768 768 finally:
769 769 lock.release()
770 770 except error.LockError:
771 771 return '2\n' # error: 2
772 772
773 773 def streamer(repo, entries, total):
774 774 '''stream out all metadata files in repository.'''
775 775 yield '0\n' # success
776 776 repo.ui.debug('%d files, %d bytes to transfer\n' %
777 777 (len(entries), total_bytes))
778 778 yield '%d %d\n' % (len(entries), total_bytes)
779 779
780 780 sopener = repo.sopener
781 781 oldaudit = sopener.mustaudit
782 782 debugflag = repo.ui.debugflag
783 783 sopener.mustaudit = False
784 784
785 785 try:
786 786 for name, size in entries:
787 787 if debugflag:
788 788 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
789 789 # partially encode name over the wire for backwards compat
790 790 yield '%s\0%d\n' % (store.encodedir(name), size)
791 791 if size <= 65536:
792 792 fp = sopener(name)
793 793 try:
794 794 data = fp.read(size)
795 795 finally:
796 796 fp.close()
797 797 yield data
798 798 else:
799 799 for chunk in util.filechunkiter(sopener(name), limit=size):
800 800 yield chunk
801 801 # replace with "finally:" when support for python 2.4 has been dropped
802 802 except Exception:
803 803 sopener.mustaudit = oldaudit
804 804 raise
805 805 sopener.mustaudit = oldaudit
806 806
807 807 return streamres(streamer(repo, entries, total_bytes))
808 808
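Putting stream() and streamer() together, a successful reply is laid out on
the wire as follows ('1\n' and '2\n' are the "not allowed" and "lock error"
status replies; schematic)::

    0\n                                  # success status
    <file count> <total bytes>\n
    <encoded name>\0<size>\n<raw data>   # repeated once per file
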
809 809 @wireprotocommand('unbundle', 'heads')
810 810 def unbundle(repo, proto, heads):
811 811 their_heads = decodelist(heads)
812 812
813 813 try:
814 814 proto.redirect()
815 815
816 816 exchange.check_heads(repo, their_heads, 'preparing changes')
817 817
818 818 # write bundle data to temporary file because it can be big
819 819 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
820 820 fp = os.fdopen(fd, 'wb+')
821 821 r = 0
822 822 try:
823 823 proto.getfile(fp)
824 824 fp.seek(0)
825 825 gen = exchange.readbundle(repo.ui, fp, None)
826 826 r = exchange.unbundle(repo, gen, their_heads, 'serve',
827 827 proto._client())
828 828 if util.safehasattr(r, 'addpart'):
829 829 # The return looks streamable, we are in the bundle2 case and
830 830 # should return a stream.
831 831 return streamres(r.getchunks())
832 832 return pushres(r)
833 833
834 834 finally:
835 835 fp.close()
836 836 os.unlink(tempname)
837 837 except error.BundleValueError, exc:
838 838 bundler = bundle2.bundle20(repo.ui)
839 839 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
840 840 if exc.parttype is not None:
841 841 errpart.addparam('parttype', exc.parttype)
842 842 if exc.params:
843 843 errpart.addparam('params', '\0'.join(exc.params))
844 844 return streamres(bundler.getchunks())
845 845 except util.Abort, inst:
846 846 # The old code we moved used sys.stderr directly.
847 847 # We did not change it to minimise code change.
848 848 # This needs to be moved to something proper.
849 849 # Feel free to do it.
850 850 if getattr(inst, 'duringunbundle2', False):
851 851 bundler = bundle2.bundle20(repo.ui)
852 852 manargs = [('message', str(inst))]
853 853 advargs = []
854 854 if inst.hint is not None:
855 855 advargs.append(('hint', inst.hint))
856 856 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
857 857 manargs, advargs))
858 858 return streamres(bundler.getchunks())
859 859 else:
860 860 sys.stderr.write("abort: %s\n" % inst)
861 861 return pushres(0)
862 862 except error.PushRaced, exc:
863 863 if getattr(exc, 'duringunbundle2', False):
864 864 bundler = bundle2.bundle20(repo.ui)
865 865 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
866 866 return streamres(bundler.getchunks())
867 867 else:
868 868 return pusherr(str(exc))