bundle2: rename UnknownPartError to BundleValueError...
Pierre-Yves David -
r21617:0cfda08a default
@@ -1,847 +1,849 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all stream
41 41 level parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
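These naming rules can be sketched in a few lines of Python (a hypothetical helper for illustration, not part of Mercurial; Python 3's `urllib.parse.quote` stands in for the `urllib.quote` call the module itself uses):

```python
from urllib.parse import quote

def encodestreamparams(params):
    """Encode (name, value) pairs as the space separated blob described above.

    A name starting with a lower case letter is advisory; upper case means
    mandatory. A None value produces a bare `<name>` block.
    """
    blocks = []
    for name, value in params:
        if not name or not name[0].isalpha():
            raise ValueError('invalid parameter name: %r' % name)
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks)

# 'evolution' is advisory (lower case); values are urlquoted
print(encodestreamparams([('evolution', None), ('comp', 'gzip zstd')]))
# → evolution comp=gzip%20zstd
```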
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: (16 bits integer)
68 68
69 69 The total number of bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32 bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
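Decoding this parameter block takes nothing more than `struct`; a minimal standalone sketch (the sample blob, with one mandatory and one advisory parameter, is invented for illustration):

```python
import struct

def parsepartparams(blob):
    """Split <mandatory-count><advisory-count><param-sizes><param-data>
    into (mandatory, advisory) lists of (key, value) byte pairs."""
    mancount, advcount = struct.unpack('>BB', blob[:2])
    total = mancount + advcount
    sizes = struct.unpack('>' + 'BB' * total, blob[2:2 + 2 * total])
    offset = 2 + 2 * total
    pairs = []
    for ksize, vsize in zip(sizes[::2], sizes[1::2]):
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        pairs.append((key, value))
    # mandatory parameters come first, then the advisory ones
    return pairs[:mancount], pairs[mancount:]

# invented sample: one mandatory param ('key', 'v1'), one advisory flag
blob = struct.pack('>BB', 1, 1) + struct.pack('>BBBB', 3, 2, 4, 0) + b'keyv1flag'
print(parsepartparams(blob))
# → ([(b'key', b'v1')], [(b'flag', b'')])
```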
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
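The chunk framing is easy to reproduce standalone (hypothetical helper names; the real reader goes through `util.chunkbuffer` rather than buffering the whole payload):

```python
import io
import struct

def writepayload(out, chunks):
    """Frame byte chunks as <chunksize><chunkdata>, ending with a zero chunk."""
    for chunk in chunks:
        out.write(struct.pack('>I', len(chunk)))
        out.write(chunk)
    out.write(struct.pack('>I', 0))  # zero size chunk concludes the payload

def readpayload(fp):
    """Read framed chunks until the zero-size terminator."""
    data = []
    while True:
        size = struct.unpack('>I', fp.read(4))[0]
        if size == 0:
            return b''.join(data)
        data.append(fp.read(size))

buf = io.BytesIO()
writepayload(buf, [b'hello ', b'world'])
buf.seek(0)
print(readpayload(buf))  # → b'hello world'
```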
127 127
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handlers are
132 132 registered for a certain part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know if a part is mandatory or advisory. If the part type
136 136 contains any uppercase char it is considered mandatory. When no handler is
137 137 known for a mandatory part, the process is aborted and an exception is raised.
138 138 If the part is advisory and no handler is known, the part is ignored. When the
139 139 process is aborted, the full bundle is still read from the stream to keep the
140 140 channel usable. But none of the parts read after an abort are processed. In
141 141 the future, dropping the stream may become an option for channels we do not
142 142 care to preserve.
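The case-based dispatch rule can be sketched as follows (a toy handler table standing in for the module's `parthandlermapping`):

```python
# toy handler table; 'b2x:output' stands in for a registered part type
handlers = {'b2x:output': lambda op, part: None}

def findhandler(parttype):
    """Return the handler for a part type, or None for an unknown
    advisory part; raise for an unknown mandatory part."""
    key = parttype.lower()  # handlers are matched case insensitively
    handler = handlers.get(key)
    if handler is None and key != parttype:  # had uppercase: mandatory
        raise ValueError('unsupported mandatory part: %s' % parttype)
    return handler

assert findhandler('B2X:OUTPUT') is not None   # case insensitive lookup
assert findhandler('b2x:shinynewpart') is None  # unknown advisory: ignored
```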
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 149
150 150 import changegroup, error
151 151 from i18n import _
152 152
153 153 _pack = struct.pack
154 154 _unpack = struct.unpack
155 155
156 156 _magicstring = 'HG2X'
157 157
158 158 _fstreamparamsize = '>H'
159 159 _fpartheadersize = '>H'
160 160 _fparttypesize = '>B'
161 161 _fpartid = '>I'
162 162 _fpayloadsize = '>I'
163 163 _fpartparamcount = '>BB'
164 164
165 165 preferedchunksize = 4096
166 166
167 167 def _makefpartparamsizes(nbparams):
168 168 """return a struct format to read part parameter sizes
169 169
170 170 The number of parameters is variable so we need to build that format
171 171 dynamically.
172 172 """
173 173 return '>'+('BB'*nbparams)
174 174
175 class UnknownPartError(KeyError):
176 """error raised when no handler is found for a Mandatory part"""
175 class BundleValueError(ValueError):
176 """error raised when bundle2 cannot be processed
177
178 Current main usecase is unsupported part types."""
177 179 pass
178 180
179 181 class ReadOnlyPartError(RuntimeError):
180 182 """error raised when code tries to alter a part being generated"""
181 183 pass
182 184
183 185 parthandlermapping = {}
184 186
185 187 def parthandler(parttype):
186 188 """decorator that register a function as a bundle2 part handler
187 189
188 190 eg::
189 191
190 192 @parthandler('myparttype')
191 193 def myparttypehandler(...):
192 194 '''process a part of type "my part".'''
193 195 ...
194 196 """
195 197 def _decorator(func):
196 198 lparttype = parttype.lower() # enforce lower case matching.
197 199 assert lparttype not in parthandlermapping
198 200 parthandlermapping[lparttype] = func
199 201 return func
200 202 return _decorator
201 203
202 204 class unbundlerecords(object):
203 205 """keep record of what happens during and unbundle
204 206
205 207 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 208 category of record and obj is an arbitrary object.
207 209
208 210 `records['cat']` will return all entries of this category 'cat'.
209 211
210 212 Iterating on the object itself will yield `('category', obj)` tuples
211 213 for all entries.
212 214
213 215 All iterations happen in chronological order.
214 216 """
215 217
216 218 def __init__(self):
217 219 self._categories = {}
218 220 self._sequences = []
219 221 self._replies = {}
220 222
221 223 def add(self, category, entry, inreplyto=None):
222 224 """add a new record of a given category.
223 225
224 226 The entry can then be retrieved in the list returned by
225 227 self['category']."""
226 228 self._categories.setdefault(category, []).append(entry)
227 229 self._sequences.append((category, entry))
228 230 if inreplyto is not None:
229 231 self.getreplies(inreplyto).add(category, entry)
230 232
231 233 def getreplies(self, partid):
232 234 """get the subrecords that replies to a specific part"""
233 235 return self._replies.setdefault(partid, unbundlerecords())
234 236
235 237 def __getitem__(self, cat):
236 238 return tuple(self._categories.get(cat, ()))
237 239
238 240 def __iter__(self):
239 241 return iter(self._sequences)
240 242
241 243 def __len__(self):
242 244 return len(self._sequences)
243 245
244 246 def __nonzero__(self):
245 247 return bool(self._sequences)
246 248
247 249 class bundleoperation(object):
248 250 """an object that represents a single bundling process
249 251
250 252 Its purpose is to carry unbundle-related objects and states.
251 253
252 254 A new object should be created at the beginning of each bundle processing.
253 255 The object is to be returned by the processing function.
254 256
255 257 The object has very little content now; it will ultimately contain:
256 258 * an access to the repo the bundle is applied to,
257 259 * a ui object,
258 260 * a way to retrieve a transaction to add changes to the repo,
259 261 * a way to record the result of processing each part,
260 262 * a way to construct a bundle response when applicable.
261 263 """
262 264
263 265 def __init__(self, repo, transactiongetter):
264 266 self.repo = repo
265 267 self.ui = repo.ui
266 268 self.records = unbundlerecords()
267 269 self.gettransaction = transactiongetter
268 270 self.reply = None
269 271
270 272 class TransactionUnavailable(RuntimeError):
271 273 pass
272 274
273 275 def _notransaction():
274 276 """default method to get a transaction while processing a bundle
275 277
276 278 Raise an exception to highlight the fact that no transaction was expected
277 279 to be created"""
278 280 raise TransactionUnavailable()
279 281
280 282 def processbundle(repo, unbundler, transactiongetter=_notransaction):
281 283 """This function process a bundle, apply effect to/from a repo
282 284
283 285 It iterates over each part then searches for and uses the proper handling
284 286 code to process the part. Parts are processed in order.
285 287
286 288 This is a very early version of this function that will be strongly reworked
287 289 before final usage.
288 290
289 291 An unknown mandatory part will abort the process.
290 292 """
291 293 op = bundleoperation(repo, transactiongetter)
292 294 # todo:
293 295 # - replace this with an init function soon.
294 296 # - exception catching
295 297 unbundler.params
296 298 iterparts = unbundler.iterparts()
297 299 part = None
298 300 try:
299 301 for part in iterparts:
300 302 parttype = part.type
301 303 # part key are matched lower case
302 304 key = parttype.lower()
303 305 try:
304 306 handler = parthandlermapping[key]
305 307 op.ui.debug('found a handler for part %r\n' % parttype)
306 308 except KeyError:
307 309 if key != parttype: # mandatory parts
308 310 # todo:
309 311 # - use a more precise exception
310 raise UnknownPartError(key)
312 raise BundleValueError(key)
311 313 op.ui.debug('ignoring unknown advisory part %r\n' % key)
312 314 # consuming the part
313 315 part.read()
314 316 continue
315 317
316 318 # handler is called outside the above try block so that we don't
317 319 # risk catching KeyErrors from anything other than the
318 320 # parthandlermapping lookup (any KeyError raised by handler()
319 321 # itself represents a defect of a different variety).
320 322 output = None
321 323 if op.reply is not None:
322 324 op.ui.pushbuffer(error=True)
323 325 output = ''
324 326 try:
325 327 handler(op, part)
326 328 finally:
327 329 if output is not None:
328 330 output = op.ui.popbuffer()
329 331 if output:
330 332 outpart = op.reply.newpart('b2x:output', data=output)
331 333 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 334 part.read()
333 335 except Exception, exc:
334 336 if part is not None:
335 337 # consume the bundle content
336 338 part.read()
337 339 for part in iterparts:
338 340 # consume the bundle content
339 341 part.read()
340 342 # Small hack to let caller code distinguish exceptions from bundle2
341 343 # processing from the ones from bundle1 processing. This is mostly
342 344 # needed to handle different return codes to unbundle according to the
343 345 # type of bundle. We should probably clean up or drop this return code
344 346 # craziness in a future version.
345 347 exc.duringunbundle2 = True
346 348 raise
347 349 return op
348 350
349 351 def decodecaps(blob):
350 352 """decode a bundle2 caps bytes blob into a dictionnary
351 353
352 354 The blob is a list of capabilities (one per line)
353 355 Capabilities may have values using a line of the form::
354 356
355 357 capability=value1,value2,value3
356 358
357 359 The values are always a list."""
358 360 caps = {}
359 361 for line in blob.splitlines():
360 362 if not line:
361 363 continue
362 364 if '=' not in line:
363 365 key, vals = line, ()
364 366 else:
365 367 key, vals = line.split('=', 1)
366 368 vals = vals.split(',')
367 369 key = urllib.unquote(key)
368 370 vals = [urllib.unquote(v) for v in vals]
369 371 caps[key] = vals
370 372 return caps
371 373
372 374 def encodecaps(caps):
373 375 """encode a bundle2 caps dictionary into a bytes blob"""
374 376 chunks = []
375 377 for ca in sorted(caps):
376 378 vals = caps[ca]
377 379 ca = urllib.quote(ca)
378 380 vals = [urllib.quote(v) for v in vals]
379 381 if vals:
380 382 ca = "%s=%s" % (ca, ','.join(vals))
381 383 chunks.append(ca)
382 384 return '\n'.join(chunks)
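The two helpers round-trip each other; a Python 3 transcription for illustration (`urllib.parse` replaces the Python 2 `urllib` calls used here, and lists are used throughout where the original mixes tuples and lists):

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """encode a capabilities dictionary into a one-capability-per-line blob"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode the blob back into a {capability: [values]} dictionary"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG2X': [], 'b2x:listkeys': ['bookmarks', 'phases']}
assert decodecaps(encodecaps(caps)) == caps
```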
383 385
384 386 class bundle20(object):
385 387 """represent an outgoing bundle2 container
386 388
387 389 Use the `addparam` method to add a stream level parameter and `newpart` to
388 390 populate it. Then call `getchunks` to retrieve all the binary chunks of
389 391 data that compose the bundle2 container."""
390 392
391 393 def __init__(self, ui, capabilities=()):
392 394 self.ui = ui
393 395 self._params = []
394 396 self._parts = []
395 397 self.capabilities = dict(capabilities)
396 398
397 399 # methods used to define the bundle2 content
398 400 def addparam(self, name, value=None):
399 401 """add a stream level parameter"""
400 402 if not name:
401 403 raise ValueError('empty parameter name')
402 404 if name[0] not in string.letters:
403 405 raise ValueError('non letter first character: %r' % name)
404 406 self._params.append((name, value))
405 407
406 408 def addpart(self, part):
407 409 """add a new part to the bundle2 container
408 410
409 411 Parts contain the actual application payload."""
410 412 assert part.id is None
411 413 part.id = len(self._parts) # very cheap counter
412 414 self._parts.append(part)
413 415
414 416 def newpart(self, typeid, *args, **kwargs):
415 417 """create a new part and add it to the containers
416 418
417 419 The part is directly added to the container. For now, this means
418 420 that any failure to properly initialize the part after calling
419 421 ``newpart`` should result in a failure of the whole bundling process.
420 422
421 423 You can still fall back to manually creating and adding one if you need better
422 424 control."""
423 425 part = bundlepart(typeid, *args, **kwargs)
424 426 self.addpart(part)
425 427 return part
426 428
427 429 # methods used to generate the bundle2 stream
428 430 def getchunks(self):
429 431 self.ui.debug('start emission of %s stream\n' % _magicstring)
430 432 yield _magicstring
431 433 param = self._paramchunk()
432 434 self.ui.debug('bundle parameter: %s\n' % param)
433 435 yield _pack(_fstreamparamsize, len(param))
434 436 if param:
435 437 yield param
436 438
437 439 self.ui.debug('start of parts\n')
438 440 for part in self._parts:
439 441 self.ui.debug('bundle part: "%s"\n' % part.type)
440 442 for chunk in part.getchunks():
441 443 yield chunk
442 444 self.ui.debug('end of bundle\n')
443 445 yield '\0\0'
444 446
445 447 def _paramchunk(self):
446 448 """return a encoded version of all stream parameters"""
447 449 blocks = []
448 450 for par, value in self._params:
449 451 par = urllib.quote(par)
450 452 if value is not None:
451 453 value = urllib.quote(value)
452 454 par = '%s=%s' % (par, value)
453 455 blocks.append(par)
454 456 return ' '.join(blocks)
455 457
456 458 class unpackermixin(object):
457 459 """A mixin to extract bytes and struct data from a stream"""
458 460
459 461 def __init__(self, fp):
460 462 self._fp = fp
461 463
462 464 def _unpack(self, format):
463 465 """unpack this struct format from the stream"""
464 466 data = self._readexact(struct.calcsize(format))
465 467 return _unpack(format, data)
466 468
467 469 def _readexact(self, size):
468 470 """read exactly <size> bytes from the stream"""
469 471 return changegroup.readexactly(self._fp, size)
470 472
471 473
472 474 class unbundle20(unpackermixin):
473 475 """interpret a bundle2 stream
474 476
475 477 This class is fed with a binary stream and yields parts through its
476 478 `iterparts` method."""
477 479
478 480 def __init__(self, ui, fp, header=None):
479 481 """If header is specified, we do not read it out of the stream."""
480 482 self.ui = ui
481 483 super(unbundle20, self).__init__(fp)
482 484 if header is None:
483 485 header = self._readexact(4)
484 486 magic, version = header[0:2], header[2:4]
485 487 if magic != 'HG':
486 488 raise util.Abort(_('not a Mercurial bundle'))
487 489 if version != '2X':
488 490 raise util.Abort(_('unknown bundle version %s') % version)
489 491 self.ui.debug('start processing of %s stream\n' % header)
490 492
491 493 @util.propertycache
492 494 def params(self):
493 495 """dictionary of stream level parameters"""
494 496 self.ui.debug('reading bundle2 stream parameters\n')
495 497 params = {}
496 498 paramssize = self._unpack(_fstreamparamsize)[0]
497 499 if paramssize:
498 500 for p in self._readexact(paramssize).split(' '):
499 501 p = p.split('=', 1)
500 502 p = [urllib.unquote(i) for i in p]
501 503 if len(p) < 2:
502 504 p.append(None)
503 505 self._processparam(*p)
504 506 params[p[0]] = p[1]
505 507 return params
506 508
507 509 def _processparam(self, name, value):
508 510 """process a parameter, applying its effect if needed
509 511
510 512 Parameters starting with a lower case letter are advisory and will be
511 513 ignored when unknown. Those starting with an upper case letter are
512 514 mandatory, and this function will raise a KeyError when they are unknown.
513 515
514 516 Note: no options are currently supported. Any input will be either
515 517 ignored or will fail.
516 518 """
517 519 if not name:
518 520 raise ValueError('empty parameter name')
519 521 if name[0] not in string.letters:
520 522 raise ValueError('non letter first character: %r' % name)
521 523 # Some logic will be later added here to try to process the option for
522 524 # a dict of known parameter.
523 525 if name[0].islower():
524 526 self.ui.debug("ignoring unknown parameter %r\n" % name)
525 527 else:
526 528 raise KeyError(name)
527 529
528 530
529 531 def iterparts(self):
530 532 """yield all parts contained in the stream"""
531 533 # make sure params have been loaded
532 534 self.params
533 535 self.ui.debug('start extraction of bundle2 parts\n')
534 536 headerblock = self._readpartheader()
535 537 while headerblock is not None:
536 538 part = unbundlepart(self.ui, headerblock, self._fp)
537 539 yield part
538 540 headerblock = self._readpartheader()
539 541 self.ui.debug('end of bundle2 stream\n')
540 542
541 543 def _readpartheader(self):
542 544 """reads a part header size and return the bytes blob
543 545
544 546 returns None if empty"""
545 547 headersize = self._unpack(_fpartheadersize)[0]
546 548 self.ui.debug('part header size: %i\n' % headersize)
547 549 if headersize:
548 550 return self._readexact(headersize)
549 551 return None
550 552
551 553
552 554 class bundlepart(object):
553 555 """A bundle2 part contains application level payload
554 556
555 557 The part `type` is used to route the part to the application level
556 558 handler.
557 559
558 560 The part payload is contained in ``part.data``. It could be raw bytes or a
559 561 generator of byte chunks.
560 562
561 563 You can add parameters to the part using the ``addparam`` method.
562 564 Parameters can be either mandatory (default) or advisory. Remote side
563 565 should be able to safely ignore the advisory ones.
564 566
565 567 Both data and parameters cannot be modified after the generation has begun.
566 568 """
567 569
568 570 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
569 571 data=''):
570 572 self.id = None
571 573 self.type = parttype
572 574 self._data = data
573 575 self._mandatoryparams = list(mandatoryparams)
574 576 self._advisoryparams = list(advisoryparams)
575 577 # checking for duplicated entries
576 578 self._seenparams = set()
577 579 for pname, __ in self._mandatoryparams + self._advisoryparams:
578 580 if pname in self._seenparams:
579 581 raise RuntimeError('duplicated params: %s' % pname)
580 582 self._seenparams.add(pname)
581 583 # status of the part's generation:
582 584 # - None: not started,
583 585 # - False: currently generated,
584 586 # - True: generation done.
585 587 self._generated = None
586 588
587 589 # methods used to define the part content
588 590 def __setdata(self, data):
589 591 if self._generated is not None:
590 592 raise ReadOnlyPartError('part is being generated')
591 593 self._data = data
592 594 def __getdata(self):
593 595 return self._data
594 596 data = property(__getdata, __setdata)
595 597
596 598 @property
597 599 def mandatoryparams(self):
598 600 # make it an immutable tuple to force people through ``addparam``
599 601 return tuple(self._mandatoryparams)
600 602
601 603 @property
602 604 def advisoryparams(self):
603 605 # make it an immutable tuple to force people through ``addparam``
604 606 return tuple(self._advisoryparams)
605 607
606 608 def addparam(self, name, value='', mandatory=True):
607 609 if self._generated is not None:
608 610 raise ReadOnlyPartError('part is being generated')
609 611 if name in self._seenparams:
610 612 raise ValueError('duplicated params: %s' % name)
611 613 self._seenparams.add(name)
612 614 params = self._advisoryparams
613 615 if mandatory:
614 616 params = self._mandatoryparams
615 617 params.append((name, value))
616 618
617 619 # methods used to generate the bundle2 stream
618 620 def getchunks(self):
619 621 if self._generated is not None:
620 622 raise RuntimeError('part can only be consumed once')
621 623 self._generated = False
622 624 #### header
623 625 ## parttype
624 626 header = [_pack(_fparttypesize, len(self.type)),
625 627 self.type, _pack(_fpartid, self.id),
626 628 ]
627 629 ## parameters
628 630 # count
629 631 manpar = self.mandatoryparams
630 632 advpar = self.advisoryparams
631 633 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
632 634 # size
633 635 parsizes = []
634 636 for key, value in manpar:
635 637 parsizes.append(len(key))
636 638 parsizes.append(len(value))
637 639 for key, value in advpar:
638 640 parsizes.append(len(key))
639 641 parsizes.append(len(value))
640 642 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
641 643 header.append(paramsizes)
642 644 # key, value
643 645 for key, value in manpar:
644 646 header.append(key)
645 647 header.append(value)
646 648 for key, value in advpar:
647 649 header.append(key)
648 650 header.append(value)
649 651 ## finalize header
650 652 headerchunk = ''.join(header)
651 653 yield _pack(_fpartheadersize, len(headerchunk))
652 654 yield headerchunk
653 655 ## payload
654 656 for chunk in self._payloadchunks():
655 657 yield _pack(_fpayloadsize, len(chunk))
656 658 yield chunk
657 659 # end of payload
658 660 yield _pack(_fpayloadsize, 0)
659 661 self._generated = True
660 662
661 663 def _payloadchunks(self):
662 664 """yield chunks of a the part payload
663 665
664 666 Exists to handle the different methods to provide data to a part."""
665 667 # we only support fixed size data now.
666 668 # This will be improved in the future.
667 669 if util.safehasattr(self.data, 'next'):
668 670 buff = util.chunkbuffer(self.data)
669 671 chunk = buff.read(preferedchunksize)
670 672 while chunk:
671 673 yield chunk
672 674 chunk = buff.read(preferedchunksize)
673 675 elif len(self.data):
674 676 yield self.data
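The generator branch leans on `util.chunkbuffer` to re-slice arbitrary input chunks into reads of at most `preferedchunksize` bytes; roughly equivalent standalone logic (hypothetical helper, simplified):

```python
preferedchunksize = 4096  # same spelling as the module-level constant

def rechunk(datagen, size=preferedchunksize):
    """Re-slice an iterator of byte strings into chunks of at most `size`."""
    buf = b''
    for piece in datagen:
        buf += piece
        while len(buf) >= size:
            yield buf[:size]
            buf = buf[size:]
    if buf:
        yield buf

print(list(rechunk(iter([b'a' * 5, b'b' * 5]), size=4)))
# → [b'aaaa', b'abbb', b'bb']
```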
675 677
676 678 class unbundlepart(unpackermixin):
677 679 """a bundle part read from a bundle"""
678 680
679 681 def __init__(self, ui, header, fp):
680 682 super(unbundlepart, self).__init__(fp)
681 683 self.ui = ui
682 684 # unbundle state attr
683 685 self._headerdata = header
684 686 self._headeroffset = 0
685 687 self._initialized = False
686 688 self.consumed = False
687 689 # part data
688 690 self.id = None
689 691 self.type = None
690 692 self.mandatoryparams = None
691 693 self.advisoryparams = None
692 694 self.params = None
693 695 self.mandatorykeys = ()
694 696 self._payloadstream = None
695 697 self._readheader()
696 698
697 699 def _fromheader(self, size):
698 700 """return the next <size> byte from the header"""
699 701 offset = self._headeroffset
700 702 data = self._headerdata[offset:(offset + size)]
701 703 self._headeroffset = offset + size
702 704 return data
703 705
704 706 def _unpackheader(self, format):
705 707 """read given format from header
706 708
707 709 This automatically computes the size of the format to read."""
708 710 data = self._fromheader(struct.calcsize(format))
709 711 return _unpack(format, data)
710 712
711 713 def _initparams(self, mandatoryparams, advisoryparams):
712 714 """internal function to setup all logic related parameters"""
713 715 # make it read only to prevent people touching it by mistake.
714 716 self.mandatoryparams = tuple(mandatoryparams)
715 717 self.advisoryparams = tuple(advisoryparams)
716 718 # user friendly UI
717 719 self.params = dict(self.mandatoryparams)
718 720 self.params.update(dict(self.advisoryparams))
719 721 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
720 722
721 723 def _readheader(self):
722 724 """read the header and setup the object"""
723 725 typesize = self._unpackheader(_fparttypesize)[0]
724 726 self.type = self._fromheader(typesize)
725 727 self.ui.debug('part type: "%s"\n' % self.type)
726 728 self.id = self._unpackheader(_fpartid)[0]
727 729 self.ui.debug('part id: "%s"\n' % self.id)
728 730 ## reading parameters
729 731 # param count
730 732 mancount, advcount = self._unpackheader(_fpartparamcount)
731 733 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
732 734 # param size
733 735 fparamsizes = _makefpartparamsizes(mancount + advcount)
734 736 paramsizes = self._unpackheader(fparamsizes)
735 737 # make it a list of couples again
736 738 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
737 739 # split mandatory from advisory
738 740 mansizes = paramsizes[:mancount]
739 741 advsizes = paramsizes[mancount:]
740 742 # retrieve param values
741 743 manparams = []
742 744 for key, value in mansizes:
743 745 manparams.append((self._fromheader(key), self._fromheader(value)))
744 746 advparams = []
745 747 for key, value in advsizes:
746 748 advparams.append((self._fromheader(key), self._fromheader(value)))
747 749 self._initparams(manparams, advparams)
748 750 ## part payload
749 751 def payloadchunks():
750 752 payloadsize = self._unpack(_fpayloadsize)[0]
751 753 self.ui.debug('payload chunk size: %i\n' % payloadsize)
752 754 while payloadsize:
753 755 yield self._readexact(payloadsize)
754 756 payloadsize = self._unpack(_fpayloadsize)[0]
755 757 self.ui.debug('payload chunk size: %i\n' % payloadsize)
756 758 self._payloadstream = util.chunkbuffer(payloadchunks())
757 759 # we read the data, tell it
758 760 self._initialized = True
759 761
760 762 def read(self, size=None):
761 763 """read payload data"""
762 764 if not self._initialized:
763 765 self._readheader()
764 766 if size is None:
765 767 data = self._payloadstream.read()
766 768 else:
767 769 data = self._payloadstream.read(size)
768 770 if size is None or len(data) < size:
769 771 self.consumed = True
770 772 return data
771 773
772 774
773 775 @parthandler('b2x:changegroup')
774 776 def handlechangegroup(op, inpart):
775 777 """apply a changegroup part on the repo
776 778
777 779 This is a very early implementation that will be massively reworked before
778 780 being inflicted on any end-user.
779 781 """
780 782 # Make sure we trigger a transaction creation
781 783 #
782 784 # The addchangegroup function will get a transaction object by itself, but
783 785 # we need to make sure we trigger the creation of a transaction object used
784 786 # for the whole processing scope.
785 787 op.gettransaction()
786 788 cg = changegroup.unbundle10(inpart, 'UN')
787 789 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
788 790 op.records.add('changegroup', {'return': ret})
789 791 if op.reply is not None:
790 792 # This is definitely not the final form of this
791 793 # return. But one need to start somewhere.
792 794 part = op.reply.newpart('b2x:reply:changegroup')
793 795 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
794 796 part.addparam('return', '%i' % ret, mandatory=False)
795 797 assert not inpart.read()
796 798
797 799 @parthandler('b2x:reply:changegroup')
798 800 def handlechangegroup(op, inpart):
799 801 ret = int(inpart.params['return'])
800 802 replyto = int(inpart.params['in-reply-to'])
801 803 op.records.add('changegroup', {'return': ret}, replyto)
802 804
803 805 @parthandler('b2x:check:heads')
804 806 def handlechangegroup(op, inpart):
805 807 """check that head of the repo did not change
806 808
807 809 This is used to detect a push race when using unbundle.
808 810 This replaces the "heads" argument of unbundle."""
809 811 h = inpart.read(20)
810 812 heads = []
811 813 while len(h) == 20:
812 814 heads.append(h)
813 815 h = inpart.read(20)
814 816 assert not h
815 817 if heads != op.repo.heads():
816 818 raise error.PushRaced('repository changed while pushing - '
817 819 'please try again')
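The fixed-size read loop above can be exercised on any file-like object (hypothetical standalone helper for illustration):

```python
import io

def readheads(fp):
    """Read a payload made of concatenated 20-byte node ids."""
    heads = []
    h = fp.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fp.read(20)
    assert not h  # payload must be an exact multiple of 20 bytes
    return heads

payload = io.BytesIO(b'\x11' * 20 + b'\x22' * 20)
print(len(readheads(payload)))  # → 2
```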
818 820
819 821 @parthandler('b2x:output')
820 822 def handleoutput(op, inpart):
821 823 """forward output captured on the server to the client"""
822 824 for line in inpart.read().splitlines():
823 825 op.ui.write(('remote: %s\n' % line))
824 826
825 827 @parthandler('b2x:replycaps')
826 828 def handlereplycaps(op, inpart):
827 829 """Notify that a reply bundle should be created
828 830
829 831 The payload contains the capabilities information for the reply"""
830 832 caps = decodecaps(inpart.read())
831 833 if op.reply is None:
832 834 op.reply = bundle20(op.ui, caps)
833 835
834 836 @parthandler('b2x:error:abort')
835 837 def handlereplycaps(op, inpart):
836 838 """Used to transmit abort error over the wire"""
837 839 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
838 840
839 841 @parthandler('b2x:error:unknownpart')
840 842 def handlereplycaps(op, inpart):
841 843 """Used to transmit unknown part error over the wire"""
842 raise UnknownPartError(inpart.params['parttype'])
844 raise BundleValueError(inpart.params['parttype'])
843 845
844 846 @parthandler('b2x:error:pushraced')
845 847 def handlereplycaps(op, inpart):
846 848 """Used to transmit push race error over the wire"""
847 849 raise error.ResponseError(_('push failed:'), inpart.params['message'])
@@ -1,730 +1,730 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2
13 13
14 14 def readbundle(ui, fh, fname, vfs=None):
15 15 header = changegroup.readexactly(fh, 4)
16 16
17 17 alg = None
18 18 if not fname:
19 19 fname = "stream"
20 20 if not header.startswith('HG') and header.startswith('\0'):
21 21 fh = changegroup.headerlessfixup(fh, header)
22 22 header = "HG10"
23 23 alg = 'UN'
24 24 elif vfs:
25 25 fname = vfs.join(fname)
26 26
27 27 magic, version = header[0:2], header[2:4]
28 28
29 29 if magic != 'HG':
30 30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 31 if version == '10':
32 32 if alg is None:
33 33 alg = changegroup.readexactly(fh, 2)
34 34 return changegroup.unbundle10(fh, alg)
35 35 elif version == '2X':
36 36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 37 else:
38 38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
41 41 class pushoperation(object):
42 42 """A object that represent a single push operation
43 43
44 44 It purpose is to carry push related state and very common operation.
45 45
46 46 A new should be created at the beginning of each push and discarded
47 47 afterward.
48 48 """
49 49
50 50 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
51 51 # repo we push from
52 52 self.repo = repo
53 53 self.ui = repo.ui
54 54 # repo we push to
55 55 self.remote = remote
56 56 # force option provided
57 57 self.force = force
58 58 # revs to be pushed (None is "all")
59 59 self.revs = revs
60 60 # allow push of new branch
61 61 self.newbranch = newbranch
62 62 # did a local lock get acquired?
63 63 self.locallocked = None
64 64 # Integer version of the push result
65 65 # - None means nothing to push
66 66 # - 0 means HTTP error
67 67 # - 1 means we pushed and remote head count is unchanged *or*
68 68 # we have outgoing changesets but refused to push
69 69 # - other values as described by addchangegroup()
70 70 self.ret = None
71 71 # discover.outgoing object (contains common and outgoing data)
72 72 self.outgoing = None
73 73 # all remote heads before the push
74 74 self.remoteheads = None
75 75 # testable as a boolean indicating if any nodes are missing locally.
76 76 self.incoming = None
77 77 # set of all heads common after changeset bundle push
78 78 self.commonheads = None
79 79
80 80 def push(repo, remote, force=False, revs=None, newbranch=False):
81 81 '''Push outgoing changesets (limited by revs) from a local
82 82 repository to remote. Return an integer:
83 83 - None means nothing to push
84 84 - 0 means HTTP error
85 85 - 1 means we pushed and remote head count is unchanged *or*
86 86 we have outgoing changesets but refused to push
87 87 - other values as described by addchangegroup()
88 88 '''
89 89 pushop = pushoperation(repo, remote, force, revs, newbranch)
90 90 if pushop.remote.local():
91 91 missing = (set(pushop.repo.requirements)
92 92 - pushop.remote.local().supported)
93 93 if missing:
94 94 msg = _("required features are not"
95 95 " supported in the destination:"
96 96 " %s") % (', '.join(sorted(missing)))
97 97 raise util.Abort(msg)
98 98
99 99 # there are two ways to push to remote repo:
100 100 #
101 101 # addchangegroup assumes local user can lock remote
102 102 # repo (local filesystem, old ssh servers).
103 103 #
104 104 # unbundle assumes local user cannot lock remote repo (new ssh
105 105 # servers, http servers).
106 106
107 107 if not pushop.remote.canpush():
108 108 raise util.Abort(_("destination does not support push"))
109 109 # get local lock as we might write phase data
110 110 locallock = None
111 111 try:
112 112 locallock = pushop.repo.lock()
113 113 pushop.locallocked = True
114 114 except IOError, err:
115 115 pushop.locallocked = False
116 116 if err.errno != errno.EACCES:
117 117 raise
118 118 # source repo cannot be locked.
119 119 # We do not abort the push, but just disable the local phase
120 120 # synchronisation.
121 121 msg = 'cannot lock source repository: %s\n' % err
122 122 pushop.ui.debug(msg)
123 123 try:
124 124 pushop.repo.checkpush(pushop)
125 125 lock = None
126 126 unbundle = pushop.remote.capable('unbundle')
127 127 if not unbundle:
128 128 lock = pushop.remote.lock()
129 129 try:
130 130 _pushdiscovery(pushop)
131 131 if _pushcheckoutgoing(pushop):
132 132 pushop.repo.prepushoutgoinghooks(pushop.repo,
133 133 pushop.remote,
134 134 pushop.outgoing)
135 135 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
136 136 False)
137 137 and pushop.remote.capable('bundle2-exp')):
138 138 _pushbundle2(pushop)
139 139 else:
140 140 _pushchangeset(pushop)
141 141 _pushcomputecommonheads(pushop)
142 142 _pushsyncphase(pushop)
143 143 _pushobsolete(pushop)
144 144 finally:
145 145 if lock is not None:
146 146 lock.release()
147 147 finally:
148 148 if locallock is not None:
149 149 locallock.release()
150 150
151 151 _pushbookmark(pushop)
152 152 return pushop.ret
153 153
154 154 def _pushdiscovery(pushop):
155 155 # discovery
156 156 unfi = pushop.repo.unfiltered()
157 157 fci = discovery.findcommonincoming
158 158 commoninc = fci(unfi, pushop.remote, force=pushop.force)
159 159 common, inc, remoteheads = commoninc
160 160 fco = discovery.findcommonoutgoing
161 161 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
162 162 commoninc=commoninc, force=pushop.force)
163 163 pushop.outgoing = outgoing
164 164 pushop.remoteheads = remoteheads
165 165 pushop.incoming = inc
166 166
167 167 def _pushcheckoutgoing(pushop):
168 168 outgoing = pushop.outgoing
169 169 unfi = pushop.repo.unfiltered()
170 170 if not outgoing.missing:
171 171 # nothing to push
172 172 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
173 173 return False
174 174 # something to push
175 175 if not pushop.force:
176 176 # if repo.obsstore is false --> there are no obsolete changesets;
177 177 # skip the iteration in that case
178 178 if unfi.obsstore:
179 179 # these messages are here for the 80-char limit reason
180 180 mso = _("push includes obsolete changeset: %s!")
181 181 mst = "push includes %s changeset: %s!"
182 182 # plain versions for i18n tool to detect them
183 183 _("push includes unstable changeset: %s!")
184 184 _("push includes bumped changeset: %s!")
185 185 _("push includes divergent changeset: %s!")
186 186 # If there is at least one obsolete or unstable
187 187 # changeset in missing, then at least one of the
188 188 # missing heads will be obsolete or unstable.
189 189 # So checking heads only is ok.
190 190 for node in outgoing.missingheads:
191 191 ctx = unfi[node]
192 192 if ctx.obsolete():
193 193 raise util.Abort(mso % ctx)
194 194 elif ctx.troubled():
195 195 raise util.Abort(_(mst)
196 196 % (ctx.troubles()[0],
197 197 ctx))
198 198 newbm = pushop.ui.configlist('bookmarks', 'pushing')
199 199 discovery.checkheads(unfi, pushop.remote, outgoing,
200 200 pushop.remoteheads,
201 201 pushop.newbranch,
202 202 bool(pushop.incoming),
203 203 newbm)
204 204 return True
205 205
206 206 def _pushbundle2(pushop):
207 207 """push data to the remote using bundle2
208 208
209 209 The only currently supported type of data is changegroup but this will
210 210 evolve in the future."""
211 211 # Send known head to the server for race detection.
212 212 capsblob = urllib.unquote(pushop.remote.capable('bundle2-exp'))
213 213 caps = bundle2.decodecaps(capsblob)
214 214 bundler = bundle2.bundle20(pushop.ui, caps)
215 215 # create reply capability
216 216 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
217 217 bundler.newpart('b2x:replycaps', data=capsblob)
218 218 if not pushop.force:
219 219 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
220 220 extrainfo = _pushbundle2extraparts(pushop, bundler)
221 221 # add the changegroup bundle
222 222 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
223 223 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
224 224 stream = util.chunkbuffer(bundler.getchunks())
225 225 try:
226 226 reply = pushop.remote.unbundle(stream, ['force'], 'push')
227 except bundle2.UnknownPartError, exc:
227 except bundle2.BundleValueError, exc:
228 228 raise util.Abort('missing support for %s' % exc)
229 229 try:
230 230 op = bundle2.processbundle(pushop.repo, reply)
231 except bundle2.UnknownPartError, exc:
231 except bundle2.BundleValueError, exc:
232 232 raise util.Abort('missing support for %s' % exc)
233 233 cgreplies = op.records.getreplies(cgpart.id)
234 234 assert len(cgreplies['changegroup']) == 1
235 235 pushop.ret = cgreplies['changegroup'][0]['return']
236 236 _pushbundle2extrareply(pushop, op, extrainfo)
237 237
238 238 def _pushbundle2extraparts(pushop, bundler):
239 239 """hook function to let extensions add parts
240 240
241 241 Return a dict to let extensions pass data to the reply processing.
242 242 """
243 243 return {}
244 244
245 245 def _pushbundle2extrareply(pushop, op, extrainfo):
246 246 """hook function to let extensions react to part replies
247 247
248 248 The dict from _pushbundle2extrareply is fed to this function.
249 249 """
250 250 pass
251 251
252 252 def _pushchangeset(pushop):
253 253 """Make the actual push of changeset bundle to remote repo"""
254 254 outgoing = pushop.outgoing
255 255 unbundle = pushop.remote.capable('unbundle')
256 256 # TODO: get bundlecaps from remote
257 257 bundlecaps = None
258 258 # create a changegroup from local
259 259 if pushop.revs is None and not (outgoing.excluded
260 260 or pushop.repo.changelog.filteredrevs):
261 261 # push everything,
262 262 # use the fast path, no race possible on push
263 263 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
264 264 cg = changegroup.getsubset(pushop.repo,
265 265 outgoing,
266 266 bundler,
267 267 'push',
268 268 fastpath=True)
269 269 else:
270 270 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
271 271 bundlecaps)
272 272
273 273 # apply changegroup to remote
274 274 if unbundle:
275 275 # local repo finds heads on server, finds out what
276 276 # revs it must push. once revs transferred, if server
277 277 # finds it has different heads (someone else won
278 278 # commit/push race), server aborts.
279 279 if pushop.force:
280 280 remoteheads = ['force']
281 281 else:
282 282 remoteheads = pushop.remoteheads
283 283 # ssh: return remote's addchangegroup()
284 284 # http: return remote's addchangegroup() or 0 for error
285 285 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
286 286 'push')
287 287 else:
288 288 # we return an integer indicating remote head count
289 289 # change
290 290 pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
291 291
292 292 def _pushcomputecommonheads(pushop):
293 293 unfi = pushop.repo.unfiltered()
294 294 if pushop.ret:
295 295 # the push succeeded, synchronize the target of the push
296 296 cheads = pushop.outgoing.missingheads
297 297 elif pushop.revs is None:
298 298 # all-out push failed; synchronize all common
299 299 cheads = pushop.outgoing.commonheads
300 300 else:
301 301 # I want cheads = heads(::missingheads and ::commonheads)
302 302 # (missingheads is revs with secret changeset filtered out)
303 303 #
304 304 # This can be expressed as:
305 305 # cheads = ( (missingheads and ::commonheads)
306 306 # + (commonheads and ::missingheads)
307 307 # )
308 308 #
309 309 # while trying to push we already computed the following:
310 310 # common = (::commonheads)
311 311 # missing = ((commonheads::missingheads) - commonheads)
312 312 #
313 313 # We can pick:
314 314 # * missingheads part of common (::commonheads)
315 315 common = set(pushop.outgoing.common)
316 316 nm = pushop.repo.changelog.nodemap
317 317 cheads = [node for node in pushop.revs if nm[node] in common]
318 318 # and
319 319 # * commonheads parents on missing
320 320 revset = unfi.set('%ln and parents(roots(%ln))',
321 321 pushop.outgoing.commonheads,
322 322 pushop.outgoing.missing)
323 323 cheads.extend(c.node() for c in revset)
324 324 pushop.commonheads = cheads
325 325
326 326 def _pushsyncphase(pushop):
327 327 """synchronise phase information locally and remotely"""
328 328 unfi = pushop.repo.unfiltered()
329 329 cheads = pushop.commonheads
330 330 # even when we don't push, exchanging phase data is useful
331 331 remotephases = pushop.remote.listkeys('phases')
332 332 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
333 333 and remotephases # server supports phases
334 334 and pushop.ret is None # nothing was pushed
335 335 and remotephases.get('publishing', False)):
336 336 # When:
337 337 # - this is a subrepo push
338 338 # - and remote support phase
339 339 # - and no changeset was pushed
340 340 # - and remote is publishing
341 341 # We may be in issue 3871 case!
342 342 # We drop the courtesy phase synchronisation that would
343 343 # otherwise publish on the remote changesets that are
344 344 # possibly still draft locally.
345 345 remotephases = {'publishing': 'True'}
346 346 if not remotephases: # old server or public only reply from non-publishing
347 347 _localphasemove(pushop, cheads)
348 348 # don't push any phase data as there is nothing to push
349 349 else:
350 350 ana = phases.analyzeremotephases(pushop.repo, cheads,
351 351 remotephases)
352 352 pheads, droots = ana
353 353 ### Apply remote phase on local
354 354 if remotephases.get('publishing', False):
355 355 _localphasemove(pushop, cheads)
356 356 else: # publish = False
357 357 _localphasemove(pushop, pheads)
358 358 _localphasemove(pushop, cheads, phases.draft)
359 359 ### Apply local phase on remote
360 360
361 361 # Get the list of all revs draft on remote by public here.
362 362 # XXX Beware that the revset breaks if droots is not strictly
363 363 # XXX roots; we may want to ensure it is, but that is costly
364 364 outdated = unfi.set('heads((%ln::%ln) and public())',
365 365 droots, cheads)
366 366 for newremotehead in outdated:
367 367 r = pushop.remote.pushkey('phases',
368 368 newremotehead.hex(),
369 369 str(phases.draft),
370 370 str(phases.public))
371 371 if not r:
372 372 pushop.ui.warn(_('updating %s to public failed!\n')
373 373 % newremotehead)
374 374
375 375 def _localphasemove(pushop, nodes, phase=phases.public):
376 376 """move <nodes> to <phase> in the local source repo"""
377 377 if pushop.locallocked:
378 378 phases.advanceboundary(pushop.repo, phase, nodes)
379 379 else:
380 380 # repo is not locked, do not change any phases!
381 381 # Informs the user that phases should have been moved when
382 382 # applicable.
383 383 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
384 384 phasestr = phases.phasenames[phase]
385 385 if actualmoves:
386 386 pushop.ui.status(_('cannot lock source repo, skipping '
387 387 'local %s phase update\n') % phasestr)
388 388
389 389 def _pushobsolete(pushop):
390 390 """utility function to push obsolete markers to a remote"""
391 391 pushop.ui.debug('try to push obsolete markers to remote\n')
392 392 repo = pushop.repo
393 393 remote = pushop.remote
394 394 if (obsolete._enabled and repo.obsstore and
395 395 'obsolete' in remote.listkeys('namespaces')):
396 396 rslts = []
397 397 remotedata = repo.listkeys('obsolete')
398 398 for key in sorted(remotedata, reverse=True):
399 399 # reverse sort to ensure we end with dump0
400 400 data = remotedata[key]
401 401 rslts.append(remote.pushkey('obsolete', key, '', data))
402 402 if [r for r in rslts if not r]:
403 403 msg = _('failed to push some obsolete markers!\n')
404 404 repo.ui.warn(msg)
405 405
406 406 def _pushbookmark(pushop):
407 407 """Update bookmark position on remote"""
408 408 ui = pushop.ui
409 409 repo = pushop.repo.unfiltered()
410 410 remote = pushop.remote
411 411 ui.debug("checking for updated bookmarks\n")
412 412 revnums = map(repo.changelog.rev, pushop.revs or [])
413 413 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
414 414 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
415 415 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
416 416 srchex=hex)
417 417
418 418 for b, scid, dcid in advsrc:
419 419 if ancestors and repo[scid].rev() not in ancestors:
420 420 continue
421 421 if remote.pushkey('bookmarks', b, dcid, scid):
422 422 ui.status(_("updating bookmark %s\n") % b)
423 423 else:
424 424 ui.warn(_('updating bookmark %s failed!\n') % b)
425 425
426 426 class pulloperation(object):
427 427 """A object that represent a single pull operation
428 428
429 429 It purpose is to carry push related state and very common operation.
430 430
431 431 A new should be created at the beginning of each pull and discarded
432 432 afterward.
433 433 """
434 434
435 435 def __init__(self, repo, remote, heads=None, force=False):
436 436 # repo we pull into
437 437 self.repo = repo
438 438 # repo we pull from
439 439 self.remote = remote
440 440 # revisions we try to pull (None is "all")
441 441 self.heads = heads
442 442 # do we force pull?
443 443 self.force = force
444 444 # the name of the pull transaction
445 445 self._trname = 'pull\n' + util.hidepassword(remote.url())
446 446 # hold the transaction once created
447 447 self._tr = None
448 448 # set of common changeset between local and remote before pull
449 449 self.common = None
450 450 # set of pulled heads
451 451 self.rheads = None
452 452 # list of missing changesets to fetch remotely
453 453 self.fetch = None
454 454 # result of changegroup pulling (used as return code by pull)
455 455 self.cgresult = None
456 456 # list of steps remaining to do (related to future bundle2 usage)
457 457 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
458 458
459 459 @util.propertycache
460 460 def pulledsubset(self):
461 461 """heads of the set of changeset target by the pull"""
462 462 # compute target subset
463 463 if self.heads is None:
464 464 # We pulled everything possible
465 465 # sync on everything common
466 466 c = set(self.common)
467 467 ret = list(self.common)
468 468 for n in self.rheads:
469 469 if n not in c:
470 470 ret.append(n)
471 471 return ret
472 472 else:
473 473 # We pulled a specific subset
474 474 # sync on this subset
475 475 return self.heads
476 476
477 477 def gettransaction(self):
478 478 """get appropriate pull transaction, creating it if needed"""
479 479 if self._tr is None:
480 480 self._tr = self.repo.transaction(self._trname)
481 481 return self._tr
482 482
483 483 def closetransaction(self):
484 484 """close transaction if created"""
485 485 if self._tr is not None:
486 486 self._tr.close()
487 487
488 488 def releasetransaction(self):
489 489 """release transaction if created"""
490 490 if self._tr is not None:
491 491 self._tr.release()
492 492
493 493 def pull(repo, remote, heads=None, force=False):
494 494 pullop = pulloperation(repo, remote, heads, force)
495 495 if pullop.remote.local():
496 496 missing = set(pullop.remote.requirements) - pullop.repo.supported
497 497 if missing:
498 498 msg = _("required features are not"
499 499 " supported in the destination:"
500 500 " %s") % (', '.join(sorted(missing)))
501 501 raise util.Abort(msg)
502 502
503 503 lock = pullop.repo.lock()
504 504 try:
505 505 _pulldiscovery(pullop)
506 506 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
507 507 and pullop.remote.capable('bundle2-exp')):
508 508 _pullbundle2(pullop)
509 509 if 'changegroup' in pullop.todosteps:
510 510 _pullchangeset(pullop)
511 511 if 'phases' in pullop.todosteps:
512 512 _pullphase(pullop)
513 513 if 'obsmarkers' in pullop.todosteps:
514 514 _pullobsolete(pullop)
515 515 pullop.closetransaction()
516 516 finally:
517 517 pullop.releasetransaction()
518 518 lock.release()
519 519
520 520 return pullop.cgresult
521 521
522 522 def _pulldiscovery(pullop):
523 523 """discovery phase for the pull
524 524
525 525 Currently handles changeset discovery only; will change to handle all
526 526 discovery at some point."""
527 527 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
528 528 pullop.remote,
529 529 heads=pullop.heads,
530 530 force=pullop.force)
531 531 pullop.common, pullop.fetch, pullop.rheads = tmp
532 532
533 533 def _pullbundle2(pullop):
534 534 """pull data using bundle2
535 535
536 536 For now, the only supported data is the changegroup."""
537 537 kwargs = {'bundlecaps': set(['HG2X'])}
538 538 capsblob = bundle2.encodecaps(pullop.repo.bundle2caps)
539 539 kwargs['bundlecaps'].add('bundle2=' + urllib.quote(capsblob))
540 540 # pulling changegroup
541 541 pullop.todosteps.remove('changegroup')
542 542
543 543 kwargs['common'] = pullop.common
544 544 kwargs['heads'] = pullop.heads or pullop.rheads
545 545 if not pullop.fetch:
546 546 pullop.repo.ui.status(_("no changes found\n"))
547 547 pullop.cgresult = 0
548 548 else:
549 549 if pullop.heads is None and list(pullop.common) == [nullid]:
550 550 pullop.repo.ui.status(_("requesting all changes\n"))
551 551 _pullbundle2extraprepare(pullop, kwargs)
552 552 if kwargs.keys() == ['format']:
553 553 return # nothing to pull
554 554 bundle = pullop.remote.getbundle('pull', **kwargs)
555 555 try:
556 556 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
557 except bundle2.UnknownPartError, exc:
557 except bundle2.BundleValueError, exc:
558 558 raise util.Abort('missing support for %s' % exc)
559 559
560 560 if pullop.fetch:
561 561 assert len(op.records['changegroup']) == 1
562 562 pullop.cgresult = op.records['changegroup'][0]['return']
563 563
564 564 def _pullbundle2extraprepare(pullop, kwargs):
565 565 """hook function so that extensions can extend the getbundle call"""
566 566 pass
567 567
568 568 def _pullchangeset(pullop):
569 569 """pull changeset from unbundle into the local repo"""
570 570 # We delay opening the transaction as long as possible so we
571 571 # don't open a transaction for nothing, and we don't break future
572 572 # useful rollback calls
573 573 pullop.todosteps.remove('changegroup')
574 574 if not pullop.fetch:
575 575 pullop.repo.ui.status(_("no changes found\n"))
576 576 pullop.cgresult = 0
577 577 return
578 578 pullop.gettransaction()
579 579 if pullop.heads is None and list(pullop.common) == [nullid]:
580 580 pullop.repo.ui.status(_("requesting all changes\n"))
581 581 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
582 582 # issue1320, avoid a race if remote changed after discovery
583 583 pullop.heads = pullop.rheads
584 584
585 585 if pullop.remote.capable('getbundle'):
586 586 # TODO: get bundlecaps from remote
587 587 cg = pullop.remote.getbundle('pull', common=pullop.common,
588 588 heads=pullop.heads or pullop.rheads)
589 589 elif pullop.heads is None:
590 590 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
591 591 elif not pullop.remote.capable('changegroupsubset'):
592 592 raise util.Abort(_("partial pull cannot be done because "
593 593 "other repository doesn't support "
594 594 "changegroupsubset."))
595 595 else:
596 596 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
597 597 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
598 598 pullop.remote.url())
599 599
600 600 def _pullphase(pullop):
601 601 # Get remote phases data from remote
602 602 pullop.todosteps.remove('phases')
603 603 remotephases = pullop.remote.listkeys('phases')
604 604 publishing = bool(remotephases.get('publishing', False))
605 605 if remotephases and not publishing:
606 606 # remote is new and unpublishing
607 607 pheads, _dr = phases.analyzeremotephases(pullop.repo,
608 608 pullop.pulledsubset,
609 609 remotephases)
610 610 phases.advanceboundary(pullop.repo, phases.public, pheads)
611 611 phases.advanceboundary(pullop.repo, phases.draft,
612 612 pullop.pulledsubset)
613 613 else:
614 614 # Remote is old or publishing all common changesets
615 615 # should be seen as public
616 616 phases.advanceboundary(pullop.repo, phases.public,
617 617 pullop.pulledsubset)
618 618
619 619 def _pullobsolete(pullop):
620 620 """utility function to pull obsolete markers from a remote
621 621
622 622 The `gettransaction` function returns the pull transaction, creating
623 623 one if necessary. We return the transaction to inform the calling code that
624 624 a new transaction has been created (when applicable).
625 625
626 626 Exists mostly to allow overriding for experimentation purposes"""
627 627 pullop.todosteps.remove('obsmarkers')
628 628 tr = None
629 629 if obsolete._enabled:
630 630 pullop.repo.ui.debug('fetching remote obsolete markers\n')
631 631 remoteobs = pullop.remote.listkeys('obsolete')
632 632 if 'dump0' in remoteobs:
633 633 tr = pullop.gettransaction()
634 634 for key in sorted(remoteobs, reverse=True):
635 635 if key.startswith('dump'):
636 636 data = base85.b85decode(remoteobs[key])
637 637 pullop.repo.obsstore.mergemarkers(tr, data)
638 638 pullop.repo.invalidatevolatilesets()
639 639 return tr
640 640
641 641 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
642 642 **kwargs):
643 643 """return a full bundle (with potentially multiple kind of parts)
644 644
645 645 Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
646 646 passed. For now, the bundle can contain only a changegroup, but this will
647 647 change when more part types become available for bundle2.
648 648
649 649 This is different from changegroup.getbundle, which only returns an HG10
650 650 changegroup bundle. They may eventually get reunited in the future when we
651 651 have a clearer idea of the API we want to use to query different data.
652 652
653 653 The implementation is at a very early stage and will get massive rework
654 654 when the API of bundle is refined.
655 655 """
656 656 # build changegroup bundle here.
657 657 cg = changegroup.getbundle(repo, source, heads=heads,
658 658 common=common, bundlecaps=bundlecaps)
659 659 if bundlecaps is None or 'HG2X' not in bundlecaps:
660 660 return cg
661 661 # very crude first implementation,
662 662 # the bundle API will change and the generation will be done lazily.
663 663 b2caps = {}
664 664 for bcaps in bundlecaps:
665 665 if bcaps.startswith('bundle2='):
666 666 blob = urllib.unquote(bcaps[len('bundle2='):])
667 667 b2caps.update(bundle2.decodecaps(blob))
668 668 bundler = bundle2.bundle20(repo.ui, b2caps)
669 669 if cg:
670 670 bundler.newpart('b2x:changegroup', data=cg.getchunks())
671 671 _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
672 672 bundlecaps=bundlecaps, **kwargs)
673 673 return util.chunkbuffer(bundler.getchunks())
674 674
675 675 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
676 676 bundlecaps=None, **kwargs):
677 677 """hook function to let extensions add parts to the requested bundle"""
678 678 pass
679 679
680 680 def check_heads(repo, their_heads, context):
681 681 """check if the heads of a repo have been modified
682 682
683 683 Used by peer for unbundling.
684 684 """
685 685 heads = repo.heads()
686 686 heads_hash = util.sha1(''.join(sorted(heads))).digest()
687 687 if not (their_heads == ['force'] or their_heads == heads or
688 688 their_heads == ['hashed', heads_hash]):
689 689 # someone else committed/pushed/unbundled while we
690 690 # were transferring data
691 691 raise error.PushRaced('repository changed while %s - '
692 692 'please try again' % context)
693 693
694 694 def unbundle(repo, cg, heads, source, url):
695 695 """Apply a bundle to a repo.
696 696
697 697 This function makes sure the repo is locked during the application and has
698 698 a mechanism to check that no push race occurred between the creation of the
699 699 bundle and its application.
700 700
701 701 If the push was raced, a PushRaced exception is raised."""
702 702 r = 0
703 703 # need a transaction when processing a bundle2 stream
704 704 tr = None
705 705 lock = repo.lock()
706 706 try:
707 707 check_heads(repo, heads, 'uploading changes')
708 708 # push can proceed
709 709 if util.safehasattr(cg, 'params'):
710 710 try:
711 711 tr = repo.transaction('unbundle')
712 712 tr.hookargs['bundle2-exp'] = '1'
713 713 r = bundle2.processbundle(repo, cg, lambda: tr).reply
714 714 cl = repo.unfiltered().changelog
715 715 p = cl.writepending() and repo.root or ""
716 716 repo.hook('b2x-pretransactionclose', throw=True, source=source,
717 717 url=url, pending=p, **tr.hookargs)
718 718 tr.close()
719 719 repo.hook('b2x-transactionclose', source=source, url=url,
720 720 **tr.hookargs)
721 721 except Exception, exc:
722 722 exc.duringunbundle2 = True
723 723 raise
724 724 else:
725 725 r = changegroup.addchangegroup(repo, cg, source, url)
726 726 finally:
727 727 if tr is not None:
728 728 tr.release()
729 729 lock.release()
730 730 return r
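`check_heads` in the exchange.py hunk above is the server side of the push-race detection that `b2x:check:heads` relies on: the client may send the literal head list, `['force']` to skip the check, or `['hashed', sha1-of-sorted-heads]`. A runnable sketch of that comparison, using stdlib `hashlib` in place of `util.sha1` (the 20-byte node values are made up):

```python
import hashlib

def check_heads(repo_heads, their_heads):
    # Mirrors exchange.check_heads: accept a force marker, the exact head
    # list, or a sha1 digest of the sorted heads; anything else means the
    # repository changed while the client was transferring data.
    heads_hash = hashlib.sha1(b''.join(sorted(repo_heads))).digest()
    return (their_heads == ['force']
            or their_heads == repo_heads
            or their_heads == ['hashed', heads_hash])
```

Hashing the sorted heads keeps the request small and order-independent while still pinning the push to the exact repository state the client saw during discovery.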
@@ -1,833 +1,833 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 the chunk size is the ASCII representation of the integer.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
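`encodelist`/`decodelist` above are thin wrappers over hex encoding of binary node ids; the `hex` and `bin` helpers they use come from `mercurial.node` and are essentially `binascii.hexlify`/`unhexlify`. A standalone round trip, using fake 20-byte node ids:

```python
from binascii import hexlify, unhexlify

# hex()/bin() in mercurial.node are essentially these binascii helpers
def encodelist(l, sep=' '):
    return sep.join(hexlify(n).decode('ascii') for n in l)

def decodelist(l, sep=' '):
    if l:
        return [unhexlify(chunk) for chunk in l.split(sep)]
    return []

nodes = [b'\x00' * 20, b'\xff' * 20]   # two fake 20-byte node ids
wire = encodelist(nodes)
assert decodelist(wire) == nodes
assert decodelist('') == []
```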
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
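The escaping above is order-sensitive: `:` is the escape character, so it must be doubled first when escaping and collapsed last when unescaping, otherwise escaped separators would be corrupted. A round trip with the same replace rules:

```python
# Same replace rules as escapearg/unescapearg above: ':' is the escape
# character, so it is doubled first on escape and undone last on unescape.
def escapearg(plain):
    return (plain.replace(':', '::')
                 .replace(',', ':,')
                 .replace(';', ':;')
                 .replace('=', ':='))

def unescapearg(escaped):
    return (escaped.replace(':=', '=')
                   .replace(':;', ';')
                   .replace(':,', ',')
                   .replace('::', ':'))

sample = 'key=a,b;c:d'
assert escapearg(sample) == 'key:=a:,b:;c::d'
assert unescapearg(escapearg(sample)) == sample
```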
193 193 # client side
194 194
195 195 class wirepeer(peer.peerrepository):
196 196
197 197 def batch(self):
198 198 return remotebatch(self)
199 199 def _submitbatch(self, req):
200 200 cmds = []
201 201 for op, argsdict in req:
202 202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 203 cmds.append('%s %s' % (op, args))
204 204 rsp = self._call("batch", cmds=';'.join(cmds))
205 205 return rsp.split(';')
206 206 def _submitone(self, op, args):
207 207 return self._call(op, **args)
208 208
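The batch request framing built by `_submitbatch` above is simple text: each command is `op key=value,key=value`, and commands are joined with `;`. A sketch of just the encoding step (assuming values are already escaped with the escapearg rules; sorting is added here only for deterministic output):

```python
# Sketch of the batch request framing used by _submitbatch above.
def encodebatch(req):
    cmds = []
    for op, argsdict in req:
        args = ','.join('%s=%s' % (k, v) for k, v in sorted(argsdict.items()))
        cmds.append('%s %s' % (op, args))
    return ';'.join(cmds)

wire = encodebatch([('heads', {}), ('known', {'nodes': 'deadbeef'})])
assert wire == 'heads ;known nodes=deadbeef'
```

The server side splits on `;`, then on the first space, then on `,` and `=`, which is why the argument values must be escaped.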
209 209 @batchable
210 210 def lookup(self, key):
211 211 self.requirecap('lookup', _('look up remote revision'))
212 212 f = future()
213 213 yield {'key': encoding.fromlocal(key)}, f
214 214 d = f.value
215 215 success, data = d[:-1].split(" ", 1)
216 216 if int(success):
217 217 yield bin(data)
218 218 self._abort(error.RepoError(data))
219 219
220 220 @batchable
221 221 def heads(self):
222 222 f = future()
223 223 yield {}, f
224 224 d = f.value
225 225 try:
226 226 yield decodelist(d[:-1])
227 227 except ValueError:
228 228 self._abort(error.ResponseError(_("unexpected response:"), d))
229 229
230 230 @batchable
231 231 def known(self, nodes):
232 232 f = future()
233 233 yield {'nodes': encodelist(nodes)}, f
234 234 d = f.value
235 235 try:
236 236 yield [bool(int(f)) for f in d]
237 237 except ValueError:
238 238 self._abort(error.ResponseError(_("unexpected response:"), d))
239 239
240 240 @batchable
241 241 def branchmap(self):
242 242 f = future()
243 243 yield {}, f
244 244 d = f.value
245 245 try:
246 246 branchmap = {}
247 247 for branchpart in d.splitlines():
248 248 branchname, branchheads = branchpart.split(' ', 1)
249 249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 250 branchheads = decodelist(branchheads)
251 251 branchmap[branchname] = branchheads
252 252 yield branchmap
253 253 except TypeError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 def branches(self, nodes):
257 257 n = encodelist(nodes)
258 258 d = self._call("branches", nodes=n)
259 259 try:
260 260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 261 return br
262 262 except ValueError:
263 263 self._abort(error.ResponseError(_("unexpected response:"), d))
264 264
265 265 def between(self, pairs):
266 266 batch = 8 # avoid giant requests
267 267 r = []
268 268 for i in xrange(0, len(pairs), batch):
269 269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 270 d = self._call("between", pairs=n)
271 271 try:
272 272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275 return r
276 276
277 277 @batchable
278 278 def pushkey(self, namespace, key, old, new):
279 279 if not self.capable('pushkey'):
280 280 yield False, None
281 281 f = future()
282 282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 283 yield {'namespace': encoding.fromlocal(namespace),
284 284 'key': encoding.fromlocal(key),
285 285 'old': encoding.fromlocal(old),
286 286 'new': encoding.fromlocal(new)}, f
287 287 d = f.value
288 288 d, output = d.split('\n', 1)
289 289 try:
290 290 d = bool(int(d))
291 291 except ValueError:
292 292 raise error.ResponseError(
293 293 _('push failed (unexpected response):'), d)
294 294 for l in output.splitlines(True):
295 295 self.ui.status(_('remote: '), l)
296 296 yield d
297 297
298 298 @batchable
299 299 def listkeys(self, namespace):
300 300 if not self.capable('pushkey'):
301 301 yield {}, None
302 302 f = future()
303 303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 305 d = f.value
306 306 r = {}
307 307 for l in d.splitlines():
308 308 k, v = l.split('\t')
309 309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 310 yield r
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def changegroup(self, nodes, kind):
316 316 n = encodelist(nodes)
317 317 f = self._callcompressable("changegroup", roots=n)
318 318 return changegroupmod.unbundle10(f, 'UN')
319 319
320 320 def changegroupsubset(self, bases, heads, kind):
321 321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 322 bases = encodelist(bases)
323 323 heads = encodelist(heads)
324 324 f = self._callcompressable("changegroupsubset",
325 325 bases=bases, heads=heads)
326 326 return changegroupmod.unbundle10(f, 'UN')
327 327
328 328 def getbundle(self, source, heads=None, common=None, bundlecaps=None,
329 329 **kwargs):
330 330 self.requirecap('getbundle', _('look up remote changes'))
331 331 opts = {}
332 332 if heads is not None:
333 333 opts['heads'] = encodelist(heads)
334 334 if common is not None:
335 335 opts['common'] = encodelist(common)
336 336 if bundlecaps is not None:
337 337 opts['bundlecaps'] = ','.join(bundlecaps)
338 338 opts.update(kwargs)
339 339 f = self._callcompressable("getbundle", **opts)
340 340 if bundlecaps is not None and 'HG2X' in bundlecaps:
341 341 return bundle2.unbundle20(self.ui, f)
342 342 else:
343 343 return changegroupmod.unbundle10(f, 'UN')
344 344
345 345 def unbundle(self, cg, heads, source):
346 346 '''Send cg (a readable file-like object representing the
347 347 changegroup to push, typically a chunkbuffer object) to the
348 348 remote server as a bundle.
349 349
350 350 When pushing a bundle10 stream, return an integer indicating the
351 351 result of the push (see localrepository.addchangegroup()).
352 352
353 353 When pushing a bundle20 stream, return a bundle20 stream.'''
354 354
355 355 if heads != ['force'] and self.capable('unbundlehash'):
356 356 heads = encodelist(['hashed',
357 357 util.sha1(''.join(sorted(heads))).digest()])
358 358 else:
359 359 heads = encodelist(heads)
360 360
361 361 if util.safehasattr(cg, 'deltaheader'):
362 362 # this a bundle10, do the old style call sequence
363 363 ret, output = self._callpush("unbundle", cg, heads=heads)
364 364 if ret == "":
365 365 raise error.ResponseError(
366 366 _('push failed:'), output)
367 367 try:
368 368 ret = int(ret)
369 369 except ValueError:
370 370 raise error.ResponseError(
371 371 _('push failed (unexpected response):'), ret)
372 372
373 373 for l in output.splitlines(True):
374 374 self.ui.status(_('remote: '), l)
375 375 else:
376 376 # bundle2 push. Send a stream, fetch a stream.
377 377 stream = self._calltwowaystream('unbundle', cg, heads=heads)
378 378 ret = bundle2.unbundle20(self.ui, stream)
379 379 return ret
380 380
381 381 def debugwireargs(self, one, two, three=None, four=None, five=None):
382 382 # don't pass optional arguments left at their default value
383 383 opts = {}
384 384 if three is not None:
385 385 opts['three'] = three
386 386 if four is not None:
387 387 opts['four'] = four
388 388 return self._call('debugwireargs', one=one, two=two, **opts)
389 389
390 390 def _call(self, cmd, **args):
391 391 """execute <cmd> on the server
392 392
393 393 The command is expected to return a simple string.
394 394
395 395 returns the server reply as a string."""
396 396 raise NotImplementedError()
397 397
398 398 def _callstream(self, cmd, **args):
399 399 """execute <cmd> on the server
400 400
401 401 The command is expected to return a stream.
402 402
403 403 returns the server reply as a file like object."""
404 404 raise NotImplementedError()
405 405
406 406 def _callcompressable(self, cmd, **args):
407 407 """execute <cmd> on the server
408 408
409 409 The command is expected to return a stream.
410 410
411 411 The stream may have been compressed in some implementations. This
412 412 function takes care of the decompression. This is the only difference
413 413 with _callstream.
414 414
415 415 returns the server reply as a file like object.
416 416 """
417 417 raise NotImplementedError()
418 418
419 419 def _callpush(self, cmd, fp, **args):
420 420 """execute <cmd> on the server
421 421
422 422 The command is expected to be related to a push. Push has a special
423 423 return method.
424 424
425 425 returns the server reply as a (ret, output) tuple. ret is either
426 426 empty (error) or a stringified int.
427 427 """
428 428 raise NotImplementedError()
429 429
430 430 def _calltwowaystream(self, cmd, fp, **args):
431 431 """execute <cmd> on the server
432 432
433 433 The command will send a stream to the server and get a stream in reply.
434 434 """
435 435 raise NotImplementedError()
436 436
437 437 def _abort(self, exception):
438 438 """clearly abort the wire protocol connection and raise the exception
439 439 """
440 440 raise NotImplementedError()
441 441
442 442 # server side
443 443
444 444 # A wire protocol command can either return a string or one of these classes.
445 445 class streamres(object):
446 446 """wireproto reply: binary stream
447 447
448 448 The call was successful and the result is a stream.
449 449 Iterate on the `self.gen` attribute to retrieve chunks.
450 450 """
451 451 def __init__(self, gen):
452 452 self.gen = gen
453 453
454 454 class pushres(object):
455 455 """wireproto reply: success with simple integer return
456 456
457 457 The call was successful and returned an integer contained in `self.res`.
458 458 """
459 459 def __init__(self, res):
460 460 self.res = res
461 461
462 462 class pusherr(object):
463 463 """wireproto reply: failure
464 464
465 465 The call failed. The `self.res` attribute contains the error message.
466 466 """
467 467 def __init__(self, res):
468 468 self.res = res
469 469
470 470 class ooberror(object):
471 471 """wireproto reply: failure of a batch of operations
472 472
473 473 Something failed during a batch call. The error message is stored in
474 474 `self.message`.
475 475 """
476 476 def __init__(self, message):
477 477 self.message = message
478 478
479 479 def dispatch(repo, proto, command):
480 480 repo = repo.filtered("served")
481 481 func, spec = commands[command]
482 482 args = proto.getargs(spec)
483 483 return func(repo, proto, *args)
484 484
485 485 def options(cmd, keys, others):
486 486 opts = {}
487 487 for k in keys:
488 488 if k in others:
489 489 opts[k] = others[k]
490 490 del others[k]
491 491 if others:
492 492 sys.stderr.write("abort: %s got unexpected arguments %s\n"
493 493 % (cmd, ",".join(others)))
494 494 return opts
495 495
496 496 # list of commands
497 497 commands = {}
498 498
499 499 def wireprotocommand(name, args=''):
500 500 """decorator for wire protocol command"""
501 501 def register(func):
502 502 commands[name] = (func, args)
503 503 return func
504 504 return register
505 505
506 506 @wireprotocommand('batch', 'cmds *')
507 507 def batch(repo, proto, cmds, others):
508 508 repo = repo.filtered("served")
509 509 res = []
510 510 for pair in cmds.split(';'):
511 511 op, args = pair.split(' ', 1)
512 512 vals = {}
513 513 for a in args.split(','):
514 514 if a:
515 515 n, v = a.split('=')
516 516 vals[n] = unescapearg(v)
517 517 func, spec = commands[op]
518 518 if spec:
519 519 keys = spec.split()
520 520 data = {}
521 521 for k in keys:
522 522 if k == '*':
523 523 star = {}
524 524 for key in vals.keys():
525 525 if key not in keys:
526 526 star[key] = vals[key]
527 527 data['*'] = star
528 528 else:
529 529 data[k] = vals[k]
530 530 result = func(repo, proto, *[data[k] for k in keys])
531 531 else:
532 532 result = func(repo, proto)
533 533 if isinstance(result, ooberror):
534 534 return result
535 535 res.append(escapearg(result))
536 536 return ';'.join(res)
537 537
538 538 @wireprotocommand('between', 'pairs')
539 539 def between(repo, proto, pairs):
540 540 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
541 541 r = []
542 542 for b in repo.between(pairs):
543 543 r.append(encodelist(b) + "\n")
544 544 return "".join(r)
545 545
546 546 @wireprotocommand('branchmap')
547 547 def branchmap(repo, proto):
548 548 branchmap = repo.branchmap()
549 549 heads = []
550 550 for branch, nodes in branchmap.iteritems():
551 551 branchname = urllib.quote(encoding.fromlocal(branch))
552 552 branchnodes = encodelist(nodes)
553 553 heads.append('%s %s' % (branchname, branchnodes))
554 554 return '\n'.join(heads)
555 555
556 556 @wireprotocommand('branches', 'nodes')
557 557 def branches(repo, proto, nodes):
558 558 nodes = decodelist(nodes)
559 559 r = []
560 560 for b in repo.branches(nodes):
561 561 r.append(encodelist(b) + "\n")
562 562 return "".join(r)
563 563
564 564
565 565 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
566 566 'known', 'getbundle', 'unbundlehash', 'batch']
567 567
568 568 def _capabilities(repo, proto):
569 569 """return a list of capabilities for a repo
570 570
571 571 This function exists to allow extensions to easily wrap capabilities
572 572 computation
573 573
574 574 - returns a list: easy to alter
575 575 - changes done here will be propagated to both the `capabilities` and `hello`
576 576 commands without any other action needed.
577 577 """
578 578 # copy to prevent modification of the global list
579 579 caps = list(wireprotocaps)
580 580 if _allowstream(repo.ui):
581 581 if repo.ui.configbool('server', 'preferuncompressed', False):
582 582 caps.append('stream-preferred')
583 583 requiredformats = repo.requirements & repo.supportedformats
584 584 # if our local revlogs are just revlogv1, add 'stream' cap
585 585 if not requiredformats - set(('revlogv1',)):
586 586 caps.append('stream')
587 587 # otherwise, add 'streamreqs' detailing our local revlog format
588 588 else:
589 589 caps.append('streamreqs=%s' % ','.join(requiredformats))
590 590 if repo.ui.configbool('experimental', 'bundle2-exp', False):
591 591 capsblob = bundle2.encodecaps(repo.bundle2caps)
592 592 caps.append('bundle2-exp=' + urllib.quote(capsblob))
593 593 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
594 594 caps.append('httpheader=1024')
595 595 return caps
596 596
597 597 # If you are writing an extension and consider wrapping this function, wrap
598 598 # `_capabilities` instead.
599 599 @wireprotocommand('capabilities')
600 600 def capabilities(repo, proto):
601 601 return ' '.join(_capabilities(repo, proto))
602 602
603 603 @wireprotocommand('changegroup', 'roots')
604 604 def changegroup(repo, proto, roots):
605 605 nodes = decodelist(roots)
606 606 cg = changegroupmod.changegroup(repo, nodes, 'serve')
607 607 return streamres(proto.groupchunks(cg))
608 608
609 609 @wireprotocommand('changegroupsubset', 'bases heads')
610 610 def changegroupsubset(repo, proto, bases, heads):
611 611 bases = decodelist(bases)
612 612 heads = decodelist(heads)
613 613 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
614 614 return streamres(proto.groupchunks(cg))
615 615
616 616 @wireprotocommand('debugwireargs', 'one two *')
617 617 def debugwireargs(repo, proto, one, two, others):
618 618 # only accept optional args from the known set
619 619 opts = options('debugwireargs', ['three', 'four'], others)
620 620 return repo.debugwireargs(one, two, **opts)
621 621
622 622 @wireprotocommand('getbundle', '*')
623 623 def getbundle(repo, proto, others):
624 624 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
625 625 for k, v in opts.iteritems():
626 626 if k in ('heads', 'common'):
627 627 opts[k] = decodelist(v)
628 628 elif k == 'bundlecaps':
629 629 opts[k] = set(v.split(','))
630 630 cg = exchange.getbundle(repo, 'serve', **opts)
631 631 return streamres(proto.groupchunks(cg))
632 632
633 633 @wireprotocommand('heads')
634 634 def heads(repo, proto):
635 635 h = repo.heads()
636 636 return encodelist(h) + "\n"
637 637
638 638 @wireprotocommand('hello')
639 639 def hello(repo, proto):
640 640 '''the hello command returns a set of lines describing various
641 641 interesting things about the server, in an RFC822-like format.
642 642 Currently the only one defined is "capabilities", which
643 643 consists of a line in the form:
644 644
645 645 capabilities: space separated list of tokens
646 646 '''
647 647 return "capabilities: %s\n" % (capabilities(repo, proto))
648 648
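The `hello` reply described above is RFC822-like `key: value` lines, with `capabilities` currently the only defined field. A minimal client-side parse of such a reply (a sketch, not the actual client code):

```python
# Parse an RFC822-ish hello reply into a field dict, then split the
# space-separated capability tokens.
def parsehello(reply):
    fields = {}
    for line in reply.splitlines():
        key, value = line.split(':', 1)
        fields[key] = value.strip()
    return fields

reply = 'capabilities: lookup branchmap getbundle batch\n'
caps = parsehello(reply)['capabilities'].split()
assert 'batch' in caps
```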
649 649 @wireprotocommand('listkeys', 'namespace')
650 650 def listkeys(repo, proto, namespace):
651 651 d = repo.listkeys(encoding.tolocal(namespace)).items()
652 652 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
653 653 for k, v in d])
654 654 return t
655 655
656 656 @wireprotocommand('lookup', 'key')
657 657 def lookup(repo, proto, key):
658 658 try:
659 659 k = encoding.tolocal(key)
660 660 c = repo[k]
661 661 r = c.hex()
662 662 success = 1
663 663 except Exception, inst:
664 664 r = str(inst)
665 665 success = 0
666 666 return "%s %s\n" % (success, r)
667 667
668 668 @wireprotocommand('known', 'nodes *')
669 669 def known(repo, proto, nodes, others):
670 670 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
671 671
672 672 @wireprotocommand('pushkey', 'namespace key old new')
673 673 def pushkey(repo, proto, namespace, key, old, new):
674 674 # compatibility with pre-1.8 clients which were accidentally
675 675 # sending raw binary nodes rather than utf-8-encoded hex
676 676 if len(new) == 20 and new.encode('string-escape') != new:
677 677 # looks like it could be a binary node
678 678 try:
679 679 new.decode('utf-8')
680 680 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
681 681 except UnicodeDecodeError:
682 682 pass # binary, leave unmodified
683 683 else:
684 684 new = encoding.tolocal(new) # normal path
685 685
686 686 if util.safehasattr(proto, 'restore'):
687 687
688 688 proto.redirect()
689 689
690 690 try:
691 691 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
692 692 encoding.tolocal(old), new) or False
693 693 except util.Abort:
694 694 r = False
695 695
696 696 output = proto.restore()
697 697
698 698 return '%s\n%s' % (int(r), output)
699 699
700 700 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
701 701 encoding.tolocal(old), new)
702 702 return '%s\n' % int(r)
703 703
704 704 def _allowstream(ui):
705 705 return ui.configbool('server', 'uncompressed', True, untrusted=True)
706 706
707 707 def _walkstreamfiles(repo):
708 708 # this is its own function so extensions can override it
709 709 return repo.store.walk()
710 710
711 711 @wireprotocommand('stream_out')
712 712 def stream(repo, proto):
713 713 '''If the server supports streaming clone, it advertises the "stream"
714 714 capability with a value representing the version and flags of the repo
715 715 it is serving. The client checks to see if it understands the format.
716 716
717 717 The format is simple: the server writes out a line with the number
718 718 of files, then the total number of bytes to be transferred (separated
719 719 by a space). Then, for each file, the server first writes the filename
720 720 and file size (separated by the null character), then the file contents.
721 721 '''
722 722
723 723 if not _allowstream(repo.ui):
724 724 return '1\n'
725 725
726 726 entries = []
727 727 total_bytes = 0
728 728 try:
729 729 # get consistent snapshot of repo, lock during scan
730 730 lock = repo.lock()
731 731 try:
732 732 repo.ui.debug('scanning\n')
733 733 for name, ename, size in _walkstreamfiles(repo):
734 734 if size:
735 735 entries.append((name, size))
736 736 total_bytes += size
737 737 finally:
738 738 lock.release()
739 739 except error.LockError:
740 740 return '2\n' # error: 2
741 741
742 742 def streamer(repo, entries, total):
743 743 '''stream out all metadata files in repository.'''
744 744 yield '0\n' # success
745 745 repo.ui.debug('%d files, %d bytes to transfer\n' %
746 746 (len(entries), total_bytes))
747 747 yield '%d %d\n' % (len(entries), total_bytes)
748 748
749 749 sopener = repo.sopener
750 750 oldaudit = sopener.mustaudit
751 751 debugflag = repo.ui.debugflag
752 752 sopener.mustaudit = False
753 753
754 754 try:
755 755 for name, size in entries:
756 756 if debugflag:
757 757 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
758 758 # partially encode name over the wire for backwards compat
759 759 yield '%s\0%d\n' % (store.encodedir(name), size)
760 760 if size <= 65536:
761 761 fp = sopener(name)
762 762 try:
763 763 data = fp.read(size)
764 764 finally:
765 765 fp.close()
766 766 yield data
767 767 else:
768 768 for chunk in util.filechunkiter(sopener(name), limit=size):
769 769 yield chunk
770 770 # replace with "finally:" when support for python 2.4 has been dropped
771 771 except Exception:
772 772 sopener.mustaudit = oldaudit
773 773 raise
774 774 sopener.mustaudit = oldaudit
775 775
776 776 return streamres(streamer(repo, entries, total_bytes))
777 777
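The stream format described in the docstring above is easy to consume: a status line (`0` on success), a `filecount totalbytes` line, then for each file its name and size separated by a NUL, followed by exactly that many bytes of content. A sketch of a parser, operating on an in-memory blob rather than a real socket (the sample payload is made up):

```python
import io

# Parse a stream_out payload per the format described above:
# '0\n', then 'filecount totalbytes\n', then for each file
# 'name\0size\n' followed by exactly `size` bytes of content.
def parsestream(blob):
    fp = io.BytesIO(blob)
    status = fp.readline().rstrip(b'\n')
    assert status == b'0', 'streaming refused or failed'
    count, total = map(int, fp.readline().split())
    files = {}
    for _ in range(count):
        header = fp.readline().rstrip(b'\n')
        name, size = header.split(b'\0')
        files[name] = fp.read(int(size))
    return total, files

blob = b'0\n1 5\ndata/a.i\x005\nhello'
total, files = parsestream(blob)
assert total == 5 and files[b'data/a.i'] == b'hello'
```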
778 778 @wireprotocommand('unbundle', 'heads')
779 779 def unbundle(repo, proto, heads):
780 780 their_heads = decodelist(heads)
781 781
782 782 try:
783 783 proto.redirect()
784 784
785 785 exchange.check_heads(repo, their_heads, 'preparing changes')
786 786
787 787 # write bundle data to temporary file because it can be big
788 788 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
789 789 fp = os.fdopen(fd, 'wb+')
790 790 r = 0
791 791 try:
792 792 proto.getfile(fp)
793 793 fp.seek(0)
794 794 gen = exchange.readbundle(repo.ui, fp, None)
795 795 r = exchange.unbundle(repo, gen, their_heads, 'serve',
796 796 proto._client())
797 797 if util.safehasattr(r, 'addpart'):
798 798 # The return looks streamable; we are in the bundle2 case and
799 799 # should return a stream.
800 800 return streamres(r.getchunks())
801 801 return pushres(r)
802 802
803 803 finally:
804 804 fp.close()
805 805 os.unlink(tempname)
806 except bundle2.UnknownPartError, exc:
806 except bundle2.BundleValueError, exc:
807 807 bundler = bundle2.bundle20(repo.ui)
808 808 bundler.newpart('B2X:ERROR:UNKNOWNPART', [('parttype', str(exc))])
809 809 return streamres(bundler.getchunks())
810 810 except util.Abort, inst:
811 811 # The old code we moved used sys.stderr directly.
812 812 # We did not change it to minimise code change.
813 813 # This needs to be moved to something proper.
814 814 # Feel free to do it.
815 815 if getattr(inst, 'duringunbundle2', False):
816 816 bundler = bundle2.bundle20(repo.ui)
817 817 manargs = [('message', str(inst))]
818 818 advargs = []
819 819 if inst.hint is not None:
820 820 advargs.append(('hint', inst.hint))
821 821 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
822 822 manargs, advargs))
823 823 return streamres(bundler.getchunks())
824 824 else:
825 825 sys.stderr.write("abort: %s\n" % inst)
826 826 return pushres(0)
827 827 except error.PushRaced, exc:
828 828 if getattr(exc, 'duringunbundle2', False):
829 829 bundler = bundle2.bundle20(repo.ui)
830 830 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
831 831 return streamres(bundler.getchunks())
832 832 else:
833 833 return pusherr(str(exc))
@@ -1,1085 +1,1085 b''
1 1
2 2 Create an extension to test bundle2 API
3 3
4 4 $ cat > bundle2.py << EOF
5 5 > """A small extension to test bundle2 implementation
6 6 >
7 7 > The current bundle2 implementation is far too limited to be used in any
8 8 > core code. We still need to be able to test it while it grows up.
9 9 > """
10 10 >
11 11 > try:
12 12 > import msvcrt
13 13 > msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
14 14 > msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
15 15 > msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
16 16 > except ImportError:
17 17 > pass
18 18 >
19 19 > import sys
20 20 > from mercurial import cmdutil
21 21 > from mercurial import util
22 22 > from mercurial import bundle2
23 23 > from mercurial import scmutil
24 24 > from mercurial import discovery
25 25 > from mercurial import changegroup
26 26 > from mercurial import error
27 27 > cmdtable = {}
28 28 > command = cmdutil.command(cmdtable)
29 29 >
30 30 > ELEPHANTSSONG = """Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
31 31 > Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
32 32 > Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko."""
33 33 > assert len(ELEPHANTSSONG) == 178 # future tests say 178 bytes; trust it.
34 34 >
35 35 > @bundle2.parthandler('test:song')
36 36 > def songhandler(op, part):
37 37 > """handle a "test:song" bundle2 part, printing the lyrics on stdout"""
38 38 > op.ui.write('The choir starts singing:\n')
39 39 > verses = 0
40 40 > for line in part.read().split('\n'):
41 41 > op.ui.write(' %s\n' % line)
42 42 > verses += 1
43 43 > op.records.add('song', {'verses': verses})
44 44 >
45 45 > @bundle2.parthandler('test:ping')
46 46 > def pinghandler(op, part):
47 47 > op.ui.write('received ping request (id %i)\n' % part.id)
48 48 > if op.reply is not None and 'ping-pong' in op.reply.capabilities:
49 49 > op.ui.write_err('replying to ping request (id %i)\n' % part.id)
50 50 > op.reply.newpart('test:pong', [('in-reply-to', str(part.id))])
51 51 >
52 52 > @bundle2.parthandler('test:debugreply')
53 53 > def debugreply(op, part):
54 54 > """print data about the capacity of the bundle reply"""
55 55 > if op.reply is None:
56 56 > op.ui.write('debugreply: no reply\n')
57 57 > else:
58 58 > op.ui.write('debugreply: capabilities:\n')
59 59 > for cap in sorted(op.reply.capabilities):
60 60 > op.ui.write('debugreply: %r\n' % cap)
61 61 > for val in op.reply.capabilities[cap]:
62 62 > op.ui.write('debugreply: %r\n' % val)
63 63 >
64 64 > @command('bundle2',
65 65 > [('', 'param', [], 'stream level parameter'),
66 66 > ('', 'unknown', False, 'include an unknown mandatory part in the bundle'),
67 67 > ('', 'parts', False, 'include some arbitrary parts in the bundle'),
68 68 > ('', 'reply', False, 'produce a reply bundle'),
69 69 > ('', 'pushrace', False, 'include a check:heads part with unknown nodes'),
70 70 > ('r', 'rev', [], 'include those changesets in the bundle'),],
71 71 > '[OUTPUTFILE]')
72 72 > def cmdbundle2(ui, repo, path=None, **opts):
73 73 > """write a bundle2 container on standard output"""
74 74 > bundler = bundle2.bundle20(ui)
75 75 > for p in opts['param']:
76 76 > p = p.split('=', 1)
77 77 > try:
78 78 > bundler.addparam(*p)
79 79 > except ValueError, exc:
80 80 > raise util.Abort('%s' % exc)
81 81 >
82 82 > if opts['reply']:
83 83 > capsstring = 'ping-pong\nelephants=babar,celeste\ncity%3D%21=celeste%2Cville'
84 84 > bundler.newpart('b2x:replycaps', data=capsstring)
85 85 >
86 86 > if opts['pushrace']:
87 87 > # also serves to test the assignment of data outside of __init__
88 88 > part = bundler.newpart('b2x:check:heads')
89 89 > part.data = '01234567890123456789'
90 90 >
91 91 > revs = opts['rev']
92 92 > if 'rev' in opts:
93 93 > revs = scmutil.revrange(repo, opts['rev'])
94 94 > if revs:
95 95 > # very crude version of a changegroup part creation
96 96 > bundled = repo.revs('%ld::%ld', revs, revs)
97 97 > headmissing = [c.node() for c in repo.set('heads(%ld)', revs)]
98 98 > headcommon = [c.node() for c in repo.set('parents(%ld) - %ld', revs, revs)]
99 99 > outgoing = discovery.outgoing(repo.changelog, headcommon, headmissing)
100 100 > cg = changegroup.getlocalbundle(repo, 'test:bundle2', outgoing, None)
101 101 > bundler.newpart('b2x:changegroup', data=cg.getchunks())
102 102 >
103 103 > if opts['parts']:
104 104 > bundler.newpart('test:empty')
105 105 > # add a second one to make sure we handle multiple parts
106 106 > bundler.newpart('test:empty')
107 107 > bundler.newpart('test:song', data=ELEPHANTSSONG)
108 108 > bundler.newpart('test:debugreply')
109 109 > mathpart = bundler.newpart('test:math')
110 110 > mathpart.addparam('pi', '3.14')
111 111 > mathpart.addparam('e', '2.72')
112 112 > mathpart.addparam('cooking', 'raw', mandatory=False)
113 113 > mathpart.data = '42'
114 114 > if opts['unknown']:
115 115 > bundler.newpart('test:UNKNOWN', data='some random content')
116 116 > if opts['parts']:
117 117 > bundler.newpart('test:ping')
118 118 >
119 119 > if path is None:
120 120 > file = sys.stdout
121 121 > else:
122 122 > file = open(path, 'w')
123 123 >
124 124 > for chunk in bundler.getchunks():
125 125 > file.write(chunk)
126 126 >
127 127 > @command('unbundle2', [], '')
128 128 > def cmdunbundle2(ui, repo, replypath=None):
129 129 > """process a bundle2 stream from stdin on the current repo"""
130 130 > try:
131 131 > tr = None
132 132 > lock = repo.lock()
133 133 > tr = repo.transaction('processbundle')
134 134 > try:
135 135 > unbundler = bundle2.unbundle20(ui, sys.stdin)
136 136 > op = bundle2.processbundle(repo, unbundler, lambda: tr)
137 137 > tr.close()
138 > except KeyError, exc:
138 > except bundle2.BundleValueError, exc:
139 139 > raise util.Abort('missing support for %s' % exc)
140 140 > except error.PushRaced, exc:
141 141 > raise util.Abort('push race: %s' % exc)
142 142 > finally:
143 143 > if tr is not None:
144 144 > tr.release()
145 145 > lock.release()
146 146 > remains = sys.stdin.read()
147 147 > ui.write('%i unread bytes\n' % len(remains))
148 148 > if op.records['song']:
149 149 > totalverses = sum(r['verses'] for r in op.records['song'])
150 150 > ui.write('%i total verses sung\n' % totalverses)
151 151 > for rec in op.records['changegroup']:
152 152 > ui.write('addchangegroup return: %i\n' % rec['return'])
153 153 > if op.reply is not None and replypath is not None:
154 154 > file = open(replypath, 'w')
155 155 > for chunk in op.reply.getchunks():
156 156 > file.write(chunk)
157 157 >
158 158 > @command('statbundle2', [], '')
159 159 > def cmdstatbundle2(ui, repo):
160 160 > """print statistic on the bundle2 container read from stdin"""
161 161 > unbundler = bundle2.unbundle20(ui, sys.stdin)
162 162 > try:
163 163 > params = unbundler.params
164 164 > except KeyError, exc:
165 165 > raise util.Abort('unknown parameters: %s' % exc)
166 166 > ui.write('options count: %i\n' % len(params))
167 167 > for key in sorted(params):
168 168 > ui.write('- %s\n' % key)
169 169 > value = params[key]
170 170 > if value is not None:
171 171 > ui.write(' %s\n' % value)
172 172 > count = 0
173 173 > for p in unbundler.iterparts():
174 174 > count += 1
175 175 > ui.write(' :%s:\n' % p.type)
176 176 > ui.write(' mandatory: %i\n' % len(p.mandatoryparams))
177 177 > ui.write(' advisory: %i\n' % len(p.advisoryparams))
178 178 > ui.write(' payload: %i bytes\n' % len(p.read()))
179 179 > ui.write('parts count: %i\n' % count)
180 180 > EOF
181 181 $ cat >> $HGRCPATH << EOF
182 182 > [extensions]
183 183 > bundle2=$TESTTMP/bundle2.py
184 184 > [experimental]
185 185 > bundle2-exp=True
186 186 > [ui]
187 187 > ssh=python "$TESTDIR/dummyssh"
188 188 > [web]
189 189 > push_ssl = false
190 190 > allow_push = *
191 191 > EOF
192 192
193 193 The extension requires a repo (currently unused)
194 194
195 195 $ hg init main
196 196 $ cd main
197 197 $ touch a
198 198 $ hg add a
199 199 $ hg commit -m 'a'
200 200
201 201
202 202 Empty bundle
203 203 =================
204 204
205 205 - no option
206 206 - no parts
207 207
208 208 Test bundling
209 209
210 210 $ hg bundle2
211 211 HG2X\x00\x00\x00\x00 (no-eol) (esc)
212 212
213 213 Test unbundling
214 214
215 215 $ hg bundle2 | hg statbundle2
216 216 options count: 0
217 217 parts count: 0
218 218
219 219 Test that old-style bundles are detected and refused
220 220
221 221 $ hg bundle --all ../bundle.hg
222 222 1 changesets found
223 223 $ hg statbundle2 < ../bundle.hg
224 224 abort: unknown bundle version 10
225 225 [255]
226 226
227 227 Test parameters
228 228 =================
229 229
230 230 - some options
231 231 - no parts
232 232
233 233 advisory parameters, no value
234 234 -------------------------------
235 235
236 236 Simplest possible parameters form
237 237
238 238 Test generation of a simple option
239 239
240 240 $ hg bundle2 --param 'caution'
241 241 HG2X\x00\x07caution\x00\x00 (no-eol) (esc)
242 242
243 243 Test unbundling
244 244
245 245 $ hg bundle2 --param 'caution' | hg statbundle2
246 246 options count: 1
247 247 - caution
248 248 parts count: 0
249 249
250 250 Test generation of multiple options
251 251
252 252 $ hg bundle2 --param 'caution' --param 'meal'
253 253 HG2X\x00\x0ccaution meal\x00\x00 (no-eol) (esc)
254 254
255 255 Test unbundling
256 256
257 257 $ hg bundle2 --param 'caution' --param 'meal' | hg statbundle2
258 258 options count: 2
259 259 - caution
260 260 - meal
261 261 parts count: 0
262 262
263 263 advisory parameters, with value
264 264 -------------------------------
265 265
266 266 Test generation
267 267
268 268 $ hg bundle2 --param 'caution' --param 'meal=vegan' --param 'elephants'
269 269 HG2X\x00\x1ccaution meal=vegan elephants\x00\x00 (no-eol) (esc)
270 270
271 271 Test unbundling
272 272
273 273 $ hg bundle2 --param 'caution' --param 'meal=vegan' --param 'elephants' | hg statbundle2
274 274 options count: 3
275 275 - caution
276 276 - elephants
277 277 - meal
278 278 vegan
279 279 parts count: 0
280 280
281 281 parameter with special char in value
282 282 ---------------------------------------------------
283 283
284 284 Test generation
285 285
286 286 $ hg bundle2 --param 'e|! 7/=babar%#==tutu' --param simple
287 287 HG2X\x00)e%7C%21%207/=babar%25%23%3D%3Dtutu simple\x00\x00 (no-eol) (esc)
288 288
289 289 Test unbundling
290 290
291 291 $ hg bundle2 --param 'e|! 7/=babar%#==tutu' --param simple | hg statbundle2
292 292 options count: 2
293 293 - e|! 7/
294 294 babar%#==tutu
295 295 - simple
296 296 parts count: 0
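The escaping seen in the stream above is plain urlquoting of both the parameter name and its value. A minimal sketch of that encoding (a hypothetical helper, not Mercurial's code; it assumes Python's default `quote()` safe set, which matches the bytes shown in the test):

```python
# Hypothetical encoder for one stream-level parameter: name and value are
# urlquoted independently and joined with '='; '/' stays unescaped by
# default, which is why '7/' survives intact in the stream above.
from urllib.parse import quote  # urllib.quote on the Python 2 of this era

def encodeparam(name, value=None):
    if value is not None:
        return '%s=%s' % (quote(name), quote(value))
    return quote(name)

print(encodeparam('e|! 7/', 'babar%#==tutu'))
# e%7C%21%207/=babar%25%23%3D%3Dtutu
```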
297 297
298 298 Test unknown mandatory option
299 299 ---------------------------------------------------
300 300
301 301 $ hg bundle2 --param 'Gravity' | hg statbundle2
302 302 abort: unknown parameters: 'Gravity'
303 303 [255]
304 304
305 305 Test debug output
306 306 ---------------------------------------------------
307 307
308 308 bundling debug
309 309
310 310 $ hg bundle2 --debug --param 'e|! 7/=babar%#==tutu' --param simple ../out.hg2
311 311 start emission of HG2X stream
312 312 bundle parameter: e%7C%21%207/=babar%25%23%3D%3Dtutu simple
313 313 start of parts
314 314 end of bundle
315 315
316 316 file content is ok
317 317
318 318 $ cat ../out.hg2
319 319 HG2X\x00)e%7C%21%207/=babar%25%23%3D%3Dtutu simple\x00\x00 (no-eol) (esc)
320 320
321 321 unbundling debug
322 322
323 323 $ hg statbundle2 --debug < ../out.hg2
324 324 start processing of HG2X stream
325 325 reading bundle2 stream parameters
326 326 ignoring unknown parameter 'e|! 7/'
327 327 ignoring unknown parameter 'simple'
328 328 options count: 2
329 329 - e|! 7/
330 330 babar%#==tutu
331 331 - simple
332 332 start extraction of bundle2 parts
333 333 part header size: 0
334 334 end of bundle2 stream
335 335 parts count: 0
336 336
337 337
338 338 Test buggy input
339 339 ---------------------------------------------------
340 340
341 341 empty parameter name
342 342
343 343 $ hg bundle2 --param '' --quiet
344 344 abort: empty parameter name
345 345 [255]
346 346
347 347 bad parameter name
348 348
349 349 $ hg bundle2 --param 42babar
350 350 abort: non letter first character: '42babar'
351 351 [255]
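The failures above follow the stream-parameter rules from the format description: a name must be non-empty, must start with a letter, and a capital first letter marks the parameter mandatory. A small sketch of that validation (a hypothetical helper, not Mercurial's implementation):

```python
# Hypothetical validator mirroring the stream-parameter rules: reject an
# empty name, reject a non-letter first character, and treat a capital
# first letter as mandatory (lowercase is advisory and may be ignored).
def classifyparam(name):
    if not name:
        raise ValueError('empty parameter name')
    if not name[:1].isalpha():
        raise ValueError('non letter first character: %r' % name)
    return 'mandatory' if name[0].isupper() else 'advisory'

print(classifyparam('caution'))   # advisory
print(classifyparam('Gravity'))   # mandatory
```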
352 352
353 353
354 354 Test part
355 355 =================
356 356
357 357 $ hg bundle2 --parts ../parts.hg2 --debug
358 358 start emission of HG2X stream
359 359 bundle parameter:
360 360 start of parts
361 361 bundle part: "test:empty"
362 362 bundle part: "test:empty"
363 363 bundle part: "test:song"
364 364 bundle part: "test:debugreply"
365 365 bundle part: "test:math"
366 366 bundle part: "test:ping"
367 367 end of bundle
368 368
369 369 $ cat ../parts.hg2
370 370 HG2X\x00\x00\x00\x11 (esc)
371 371 test:empty\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11 (esc)
372 372 test:empty\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x10 test:song\x00\x00\x00\x02\x00\x00\x00\x00\x00\xb2Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko (esc)
373 373 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
374 374 Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.\x00\x00\x00\x00\x00\x16\x0ftest:debugreply\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00+ test:math\x00\x00\x00\x04\x02\x01\x02\x04\x01\x04\x07\x03pi3.14e2.72cookingraw\x00\x00\x00\x0242\x00\x00\x00\x00\x00\x10 test:ping\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00 (no-eol) (esc)
375 375
376 376
377 377 $ hg statbundle2 < ../parts.hg2
378 378 options count: 0
379 379 :test:empty:
380 380 mandatory: 0
381 381 advisory: 0
382 382 payload: 0 bytes
383 383 :test:empty:
384 384 mandatory: 0
385 385 advisory: 0
386 386 payload: 0 bytes
387 387 :test:song:
388 388 mandatory: 0
389 389 advisory: 0
390 390 payload: 178 bytes
391 391 :test:debugreply:
392 392 mandatory: 0
393 393 advisory: 0
394 394 payload: 0 bytes
395 395 :test:math:
396 396 mandatory: 2
397 397 advisory: 1
398 398 payload: 2 bytes
399 399 :test:ping:
400 400 mandatory: 0
401 401 advisory: 0
402 402 payload: 0 bytes
403 403 parts count: 6
404 404
405 405 $ hg statbundle2 --debug < ../parts.hg2
406 406 start processing of HG2X stream
407 407 reading bundle2 stream parameters
408 408 options count: 0
409 409 start extraction of bundle2 parts
410 410 part header size: 17
411 411 part type: "test:empty"
412 412 part id: "0"
413 413 part parameters: 0
414 414 :test:empty:
415 415 mandatory: 0
416 416 advisory: 0
417 417 payload chunk size: 0
418 418 payload: 0 bytes
419 419 part header size: 17
420 420 part type: "test:empty"
421 421 part id: "1"
422 422 part parameters: 0
423 423 :test:empty:
424 424 mandatory: 0
425 425 advisory: 0
426 426 payload chunk size: 0
427 427 payload: 0 bytes
428 428 part header size: 16
429 429 part type: "test:song"
430 430 part id: "2"
431 431 part parameters: 0
432 432 :test:song:
433 433 mandatory: 0
434 434 advisory: 0
435 435 payload chunk size: 178
436 436 payload chunk size: 0
437 437 payload: 178 bytes
438 438 part header size: 22
439 439 part type: "test:debugreply"
440 440 part id: "3"
441 441 part parameters: 0
442 442 :test:debugreply:
443 443 mandatory: 0
444 444 advisory: 0
445 445 payload chunk size: 0
446 446 payload: 0 bytes
447 447 part header size: 43
448 448 part type: "test:math"
449 449 part id: "4"
450 450 part parameters: 3
451 451 :test:math:
452 452 mandatory: 2
453 453 advisory: 1
454 454 payload chunk size: 2
455 455 payload chunk size: 0
456 456 payload: 2 bytes
457 457 part header size: 16
458 458 part type: "test:ping"
459 459 part id: "5"
460 460 part parameters: 0
461 461 :test:ping:
462 462 mandatory: 0
463 463 advisory: 0
464 464 payload chunk size: 0
465 465 payload: 0 bytes
466 466 part header size: 0
467 467 end of bundle2 stream
468 468 parts count: 6
469 469
470 470 Test actual unbundling of test part
471 471 =======================================
472 472
473 473 Process the bundle
474 474
475 475 $ hg unbundle2 --debug < ../parts.hg2
476 476 start processing of HG2X stream
477 477 reading bundle2 stream parameters
478 478 start extraction of bundle2 parts
479 479 part header size: 17
480 480 part type: "test:empty"
481 481 part id: "0"
482 482 part parameters: 0
483 483 ignoring unknown advisory part 'test:empty'
484 484 payload chunk size: 0
485 485 part header size: 17
486 486 part type: "test:empty"
487 487 part id: "1"
488 488 part parameters: 0
489 489 ignoring unknown advisory part 'test:empty'
490 490 payload chunk size: 0
491 491 part header size: 16
492 492 part type: "test:song"
493 493 part id: "2"
494 494 part parameters: 0
495 495 found a handler for part 'test:song'
496 496 The choir starts singing:
497 497 payload chunk size: 178
498 498 payload chunk size: 0
499 499 Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
500 500 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
501 501 Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
502 502 part header size: 22
503 503 part type: "test:debugreply"
504 504 part id: "3"
505 505 part parameters: 0
506 506 found a handler for part 'test:debugreply'
507 507 debugreply: no reply
508 508 payload chunk size: 0
509 509 part header size: 43
510 510 part type: "test:math"
511 511 part id: "4"
512 512 part parameters: 3
513 513 ignoring unknown advisory part 'test:math'
514 514 payload chunk size: 2
515 515 payload chunk size: 0
516 516 part header size: 16
517 517 part type: "test:ping"
518 518 part id: "5"
519 519 part parameters: 0
520 520 found a handler for part 'test:ping'
521 521 received ping request (id 5)
522 522 payload chunk size: 0
523 523 part header size: 0
524 524 end of bundle2 stream
525 525 0 unread bytes
526 526 3 total verses sung
527 527
528 528 Unbundle with an unknown mandatory part
529 529 (should abort)
530 530
531 531 $ hg bundle2 --parts --unknown ../unknown.hg2
532 532
533 533 $ hg unbundle2 < ../unknown.hg2
534 534 The choir starts singing:
535 535 Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
536 536 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
537 537 Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
538 538 debugreply: no reply
539 539 0 unread bytes
540 540 abort: missing support for test:unknown
541 541 [255]
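The abort above hinges on the part-type naming convention: capital letters in a part type mark the part mandatory, so an unknown mandatory part raises BundleValueError, while an unknown all-lowercase part (like the advisory 'test:math' earlier) is silently skipped. A sketch of that dispatch (an approximation for illustration, not Mercurial's actual code):

```python
# Hypothetical part dispatch: handlers are registered under lowercase
# names; an unknown part is ignored if advisory, but aborts processing
# if its type contains any capital letter (i.e. it is mandatory).
class BundleValueError(ValueError):
    """Raised when a bundle contains an unsupported mandatory feature."""

def dispatchpart(parttype, handlers):
    handler = handlers.get(parttype.lower())
    if handler is None:
        if parttype != parttype.lower():  # capital letters: mandatory part
            raise BundleValueError(parttype.lower())
        return 'ignored'
    return handler()

handlers = {'test:song': lambda: 'sung'}
print(dispatchpart('test:song', handlers))  # sung
print(dispatchpart('test:math', handlers))  # ignored
```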
542 542
543 543 Unbundle with a reply
544 544
545 545 $ hg bundle2 --parts --reply ../parts-reply.hg2
546 546 $ hg unbundle2 ../reply.hg2 < ../parts-reply.hg2
547 547 0 unread bytes
548 548 3 total verses sung
549 549
550 550 The reply is a bundle
551 551
552 552 $ cat ../reply.hg2
553 553 HG2X\x00\x00\x00\x1f (esc)
554 554 b2x:output\x00\x00\x00\x00\x00\x01\x0b\x01in-reply-to3\x00\x00\x00\xd9The choir starts singing: (esc)
555 555 Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
556 556 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
557 557 Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
558 558 \x00\x00\x00\x00\x00\x1f (esc)
559 559 b2x:output\x00\x00\x00\x01\x00\x01\x0b\x01in-reply-to4\x00\x00\x00\xc9debugreply: capabilities: (esc)
560 560 debugreply: 'city=!'
561 561 debugreply: 'celeste,ville'
562 562 debugreply: 'elephants'
563 563 debugreply: 'babar'
564 564 debugreply: 'celeste'
565 565 debugreply: 'ping-pong'
566 566 \x00\x00\x00\x00\x00\x1e test:pong\x00\x00\x00\x02\x01\x00\x0b\x01in-reply-to6\x00\x00\x00\x00\x00\x1f (esc)
567 567 b2x:output\x00\x00\x00\x03\x00\x01\x0b\x01in-reply-to6\x00\x00\x00=received ping request (id 6) (esc)
568 568 replying to ping request (id 6)
569 569 \x00\x00\x00\x00\x00\x00 (no-eol) (esc)
570 570
571 571 The reply is valid
572 572
573 573 $ hg statbundle2 < ../reply.hg2
574 574 options count: 0
575 575 :b2x:output:
576 576 mandatory: 0
577 577 advisory: 1
578 578 payload: 217 bytes
579 579 :b2x:output:
580 580 mandatory: 0
581 581 advisory: 1
582 582 payload: 201 bytes
583 583 :test:pong:
584 584 mandatory: 1
585 585 advisory: 0
586 586 payload: 0 bytes
587 587 :b2x:output:
588 588 mandatory: 0
589 589 advisory: 1
590 590 payload: 61 bytes
591 591 parts count: 4
592 592
593 593 Unbundle the reply to get the output:
594 594
595 595 $ hg unbundle2 < ../reply.hg2
596 596 remote: The choir starts singing:
597 597 remote: Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
598 598 remote: Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
599 599 remote: Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
600 600 remote: debugreply: capabilities:
601 601 remote: debugreply: 'city=!'
602 602 remote: debugreply: 'celeste,ville'
603 603 remote: debugreply: 'elephants'
604 604 remote: debugreply: 'babar'
605 605 remote: debugreply: 'celeste'
606 606 remote: debugreply: 'ping-pong'
607 607 remote: received ping request (id 6)
608 608 remote: replying to ping request (id 6)
609 609 0 unread bytes
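Each part of the reply bundle names the part it answers through its 'in-reply-to' parameter, and the payload of every 'b2x:output' part is echoed back to the user with a 'remote: ' prefix, as in the transcript above. A sketch of that re-dispatch (a hypothetical helper, not Mercurial's code):

```python
# Hypothetical replay of a reply bundle: parts are (type, params, payload)
# tuples; 'b2x:output' payloads are split into lines and prefixed with
# 'remote: ', while other reply parts (e.g. test:pong) carry no output.
def replayoutput(parts):
    lines = []
    for parttype, params, payload in parts:
        if parttype == 'b2x:output':
            for line in payload.rstrip('\n').split('\n'):
                lines.append('remote: %s' % line)
    return lines

reply = [('test:pong', {'in-reply-to': '6'}, ''),
         ('b2x:output', {'in-reply-to': '6'},
          'received ping request (id 6)\nreplying to ping request (id 6)\n')]
print('\n'.join(replayoutput(reply)))
```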
610 610
611 611 Test push race detection
612 612
613 613 $ hg bundle2 --pushrace ../part-race.hg2
614 614
615 615 $ hg unbundle2 < ../part-race.hg2
616 616 0 unread bytes
617 617 abort: push race: repository changed while pushing - please try again
618 618 [255]
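The race is detected through a 'b2x:check:heads' part whose payload is the concatenation of the 20-byte head ids the client based its push on (the failpush extension below deliberately sends 20 bytes of garbage to trigger it). A sketch of the server-side comparison (an assumed shape, not the real handler):

```python
# Hypothetical 'b2x:check:heads' handler: slice the payload into 20-byte
# node ids and compare against the repository's current heads; any
# difference means someone else pushed in the meantime.
class PushRaced(Exception):
    pass

def checkheads(payload, currentheads):
    advertised = [payload[i:i + 20] for i in range(0, len(payload), 20)]
    if sorted(advertised) != sorted(currentheads):
        raise PushRaced('repository changed while pushing - please try again')

# unchanged heads: passes silently
checkheads(b'\x11' * 20, [b'\x11' * 20])
```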
619 619
620 620 Support for changegroup
621 621 ===================================
622 622
623 623 $ hg unbundle $TESTDIR/bundles/rebase.hg
624 624 adding changesets
625 625 adding manifests
626 626 adding file changes
627 627 added 8 changesets with 7 changes to 7 files (+3 heads)
628 628 (run 'hg heads' to see heads, 'hg merge' to merge)
629 629
630 630 $ hg log -G
631 631 o changeset: 8:02de42196ebe
632 632 | tag: tip
633 633 | parent: 6:24b6387c8c8c
634 634 | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
635 635 | date: Sat Apr 30 15:24:48 2011 +0200
636 636 | summary: H
637 637 |
638 638 | o changeset: 7:eea13746799a
639 639 |/| parent: 6:24b6387c8c8c
640 640 | | parent: 5:9520eea781bc
641 641 | | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
642 642 | | date: Sat Apr 30 15:24:48 2011 +0200
643 643 | | summary: G
644 644 | |
645 645 o | changeset: 6:24b6387c8c8c
646 646 | | parent: 1:cd010b8cd998
647 647 | | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
648 648 | | date: Sat Apr 30 15:24:48 2011 +0200
649 649 | | summary: F
650 650 | |
651 651 | o changeset: 5:9520eea781bc
652 652 |/ parent: 1:cd010b8cd998
653 653 | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
654 654 | date: Sat Apr 30 15:24:48 2011 +0200
655 655 | summary: E
656 656 |
657 657 | o changeset: 4:32af7686d403
658 658 | | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
659 659 | | date: Sat Apr 30 15:24:48 2011 +0200
660 660 | | summary: D
661 661 | |
662 662 | o changeset: 3:5fddd98957c8
663 663 | | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
664 664 | | date: Sat Apr 30 15:24:48 2011 +0200
665 665 | | summary: C
666 666 | |
667 667 | o changeset: 2:42ccdea3bb16
668 668 |/ user: Nicolas Dumazet <nicdumz.commits@gmail.com>
669 669 | date: Sat Apr 30 15:24:48 2011 +0200
670 670 | summary: B
671 671 |
672 672 o changeset: 1:cd010b8cd998
673 673 parent: -1:000000000000
674 674 user: Nicolas Dumazet <nicdumz.commits@gmail.com>
675 675 date: Sat Apr 30 15:24:48 2011 +0200
676 676 summary: A
677 677
678 678 @ changeset: 0:3903775176ed
679 679 user: test
680 680 date: Thu Jan 01 00:00:00 1970 +0000
681 681 summary: a
682 682
683 683
684 684 $ hg bundle2 --debug --rev '8+7+5+4' ../rev.hg2
685 685 4 changesets found
686 686 list of changesets:
687 687 32af7686d403cf45b5d95f2d70cebea587ac806a
688 688 9520eea781bcca16c1e15acc0ba14335a0e8e5ba
689 689 eea13746799a9e0bfd88f29d3c2e9dc9389f524f
690 690 02de42196ebee42ef284b6780a87cdc96e8eaab6
691 691 start emission of HG2X stream
692 692 bundle parameter:
693 693 start of parts
694 694 bundle part: "b2x:changegroup"
695 695 bundling: 1/4 changesets (25.00%)
696 696 bundling: 2/4 changesets (50.00%)
697 697 bundling: 3/4 changesets (75.00%)
698 698 bundling: 4/4 changesets (100.00%)
699 699 bundling: 1/4 manifests (25.00%)
700 700 bundling: 2/4 manifests (50.00%)
701 701 bundling: 3/4 manifests (75.00%)
702 702 bundling: 4/4 manifests (100.00%)
703 703 bundling: D 1/3 files (33.33%)
704 704 bundling: E 2/3 files (66.67%)
705 705 bundling: H 3/3 files (100.00%)
706 706 end of bundle
707 707
708 708 $ cat ../rev.hg2
709 709 HG2X\x00\x00\x00\x16\x0fb2x:changegroup\x00\x00\x00\x00\x00\x00\x00\x00\x06\x13\x00\x00\x00\xa42\xafv\x86\xd4\x03\xcfE\xb5\xd9_-p\xce\xbe\xa5\x87\xac\x80j_\xdd\xd9\x89W\xc8\xa5JMCm\xfe\x1d\xa9\xd8\x7f!\xa1\xb9{\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002\xafv\x86\xd4\x03\xcfE\xb5\xd9_-p\xce\xbe\xa5\x87\xac\x80j\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x00)6e1f4c47ecb533ffd0c8e52cdc88afb6cd39e20c (esc)
710 710 \x00\x00\x00f\x00\x00\x00h\x00\x00\x00\x02D (esc)
711 711 \x00\x00\x00i\x00\x00\x00j\x00\x00\x00\x01D\x00\x00\x00\xa4\x95 \xee\xa7\x81\xbc\xca\x16\xc1\xe1Z\xcc\x0b\xa1C5\xa0\xe8\xe5\xba\xcd\x01\x0b\x8c\xd9\x98\xf3\x98\x1aZ\x81\x15\xf9O\x8d\xa4\xabP`\x89\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x95 \xee\xa7\x81\xbc\xca\x16\xc1\xe1Z\xcc\x0b\xa1C5\xa0\xe8\xe5\xba\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x00)4dece9c826f69490507b98c6383a3009b295837d (esc)
712 712 \x00\x00\x00f\x00\x00\x00h\x00\x00\x00\x02E (esc)
713 713 \x00\x00\x00i\x00\x00\x00j\x00\x00\x00\x01E\x00\x00\x00\xa2\xee\xa17Fy\x9a\x9e\x0b\xfd\x88\xf2\x9d<.\x9d\xc98\x9fRO$\xb68|\x8c\x8c\xae7\x17\x88\x80\xf3\xfa\x95\xde\xd3\xcb\x1c\xf7\x85\x95 \xee\xa7\x81\xbc\xca\x16\xc1\xe1Z\xcc\x0b\xa1C5\xa0\xe8\xe5\xba\xee\xa17Fy\x9a\x9e\x0b\xfd\x88\xf2\x9d<.\x9d\xc98\x9fRO\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x00)365b93d57fdf4814e2b5911d6bacff2b12014441 (esc)
714 714 \x00\x00\x00f\x00\x00\x00h\x00\x00\x00\x00\x00\x00\x00i\x00\x00\x00j\x00\x00\x00\x01G\x00\x00\x00\xa4\x02\xdeB\x19n\xbe\xe4.\xf2\x84\xb6x (esc)
715 715 \x87\xcd\xc9n\x8e\xaa\xb6$\xb68|\x8c\x8c\xae7\x17\x88\x80\xf3\xfa\x95\xde\xd3\xcb\x1c\xf7\x85\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\xdeB\x19n\xbe\xe4.\xf2\x84\xb6x (esc)
716 716 \x87\xcd\xc9n\x8e\xaa\xb6\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x00)8bee48edc7318541fc0013ee41b089276a8c24bf (esc)
717 717 \x00\x00\x00f\x00\x00\x00f\x00\x00\x00\x02H (esc)
718 718 \x00\x00\x00g\x00\x00\x00h\x00\x00\x00\x01H\x00\x00\x00\x00\x00\x00\x00\x8bn\x1fLG\xec\xb53\xff\xd0\xc8\xe5,\xdc\x88\xaf\xb6\xcd9\xe2\x0cf\xa5\xa0\x18\x17\xfd\xf5#\x9c'8\x02\xb5\xb7a\x8d\x05\x1c\x89\xe4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002\xafv\x86\xd4\x03\xcfE\xb5\xd9_-p\xce\xbe\xa5\x87\xac\x80j\x00\x00\x00\x81\x00\x00\x00\x81\x00\x00\x00+D\x00c3f1ca2924c16a19b0656a84900e504e5b0aec2d (esc)
719 719 \x00\x00\x00\x8bM\xec\xe9\xc8&\xf6\x94\x90P{\x98\xc68:0 \xb2\x95\x83}\x00}\x8c\x9d\x88\x84\x13%\xf5\xc6\xb0cq\xb3[N\x8a+\x1a\x83\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x95 \xee\xa7\x81\xbc\xca\x16\xc1\xe1Z\xcc\x0b\xa1C5\xa0\xe8\xe5\xba\x00\x00\x00+\x00\x00\x00\xac\x00\x00\x00+E\x009c6fd0350a6c0d0c49d4a9c5017cf07043f54e58 (esc)
720 720 \x00\x00\x00\x8b6[\x93\xd5\x7f\xdfH\x14\xe2\xb5\x91\x1dk\xac\xff+\x12\x01DA(\xa5\x84\xc6^\xf1!\xf8\x9e\xb6j\xb7\xd0\xbc\x15=\x80\x99\xe7\xceM\xec\xe9\xc8&\xf6\x94\x90P{\x98\xc68:0 \xb2\x95\x83}\xee\xa17Fy\x9a\x9e\x0b\xfd\x88\xf2\x9d<.\x9d\xc98\x9fRO\x00\x00\x00V\x00\x00\x00V\x00\x00\x00+F\x0022bfcfd62a21a3287edbd4d656218d0f525ed76a (esc)
721 721 \x00\x00\x00\x97\x8b\xeeH\xed\xc71\x85A\xfc\x00\x13\xeeA\xb0\x89'j\x8c$\xbf(\xa5\x84\xc6^\xf1!\xf8\x9e\xb6j\xb7\xd0\xbc\x15=\x80\x99\xe7\xce\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\xdeB\x19n\xbe\xe4.\xf2\x84\xb6x (esc)
722 722 \x87\xcd\xc9n\x8e\xaa\xb6\x00\x00\x00+\x00\x00\x00V\x00\x00\x00\x00\x00\x00\x00\x81\x00\x00\x00\x81\x00\x00\x00+H\x008500189e74a9e0475e822093bc7db0d631aeb0b4 (esc)
723 723 \x00\x00\x00\x00\x00\x00\x00\x05D\x00\x00\x00b\xc3\xf1\xca)$\xc1j\x19\xb0ej\x84\x90\x0ePN[ (esc)
724 724 \xec-\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002\xafv\x86\xd4\x03\xcfE\xb5\xd9_-p\xce\xbe\xa5\x87\xac\x80j\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02D (esc)
725 725 \x00\x00\x00\x00\x00\x00\x00\x05E\x00\x00\x00b\x9co\xd05 (esc)
726 726 l\r (no-eol) (esc)
727 727 \x0cI\xd4\xa9\xc5\x01|\xf0pC\xf5NX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x95 \xee\xa7\x81\xbc\xca\x16\xc1\xe1Z\xcc\x0b\xa1C5\xa0\xe8\xe5\xba\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02E (esc)
728 728 \x00\x00\x00\x00\x00\x00\x00\x05H\x00\x00\x00b\x85\x00\x18\x9et\xa9\xe0G^\x82 \x93\xbc}\xb0\xd61\xae\xb0\xb4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\xdeB\x19n\xbe\xe4.\xf2\x84\xb6x (esc)
729 729 \x87\xcd\xc9n\x8e\xaa\xb6\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02H (esc)
730 730 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 (no-eol) (esc)
731 731
732 732 $ hg unbundle2 < ../rev.hg2
733 733 adding changesets
734 734 adding manifests
735 735 adding file changes
736 736 added 0 changesets with 0 changes to 3 files
737 737 0 unread bytes
738 738 addchangegroup return: 1
739 739
740 740 with reply
741 741
742 742 $ hg bundle2 --rev '8+7+5+4' --reply ../rev-rr.hg2
743 743 $ hg unbundle2 ../rev-reply.hg2 < ../rev-rr.hg2
744 744 0 unread bytes
745 745 addchangegroup return: 1
746 746
747 747 $ cat ../rev-reply.hg2
748 748 HG2X\x00\x00\x003\x15b2x:reply:changegroup\x00\x00\x00\x00\x00\x02\x0b\x01\x06\x01in-reply-to1return1\x00\x00\x00\x00\x00\x1f (esc)
749 749 b2x:output\x00\x00\x00\x01\x00\x01\x0b\x01in-reply-to1\x00\x00\x00dadding changesets (esc)
750 750 adding manifests
751 751 adding file changes
752 752 added 0 changesets with 0 changes to 3 files
753 753 \x00\x00\x00\x00\x00\x00 (no-eol) (esc)
754 754
755 755 Real world exchange
756 756 =====================
757 757
758 758
759 759 clone --pull
760 760
761 761 $ cd ..
762 762 $ hg clone main other --pull --rev 9520eea781bc
763 763 adding changesets
764 764 adding manifests
765 765 adding file changes
766 766 added 2 changesets with 2 changes to 2 files
767 767 updating to branch default
768 768 2 files updated, 0 files merged, 0 files removed, 0 files unresolved
769 769 $ hg -R other log -G
770 770 @ changeset: 1:9520eea781bc
771 771 | tag: tip
772 772 | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
773 773 | date: Sat Apr 30 15:24:48 2011 +0200
774 774 | summary: E
775 775 |
776 776 o changeset: 0:cd010b8cd998
777 777 user: Nicolas Dumazet <nicdumz.commits@gmail.com>
778 778 date: Sat Apr 30 15:24:48 2011 +0200
779 779 summary: A
780 780
781 781
782 782 pull
783 783
784 784 $ hg -R other pull -r 24b6387c8c8c
785 785 pulling from $TESTTMP/main (glob)
786 786 searching for changes
787 787 adding changesets
788 788 adding manifests
789 789 adding file changes
790 790 added 1 changesets with 1 changes to 1 files (+1 heads)
791 791 (run 'hg heads' to see heads, 'hg merge' to merge)
792 792
793 793 pull empty
794 794
795 795 $ hg -R other pull -r 24b6387c8c8c
796 796 pulling from $TESTTMP/main (glob)
797 797 no changes found
798 798
799 799 push
800 800
801 801 $ hg -R main push other --rev eea13746799a
802 802 pushing to other
803 803 searching for changes
804 804 remote: adding changesets
805 805 remote: adding manifests
806 806 remote: adding file changes
807 807 remote: added 1 changesets with 0 changes to 0 files (-1 heads)
808 808
809 809 pull over ssh
810 810
811 811 $ hg -R other pull ssh://user@dummy/main -r 02de42196ebe --traceback
812 812 pulling from ssh://user@dummy/main
813 813 searching for changes
814 814 adding changesets
815 815 adding manifests
816 816 adding file changes
817 817 added 1 changesets with 1 changes to 1 files (+1 heads)
818 818 (run 'hg heads' to see heads, 'hg merge' to merge)
819 819
820 820 pull over http
821 821
822 822 $ hg -R main serve -p $HGPORT -d --pid-file=main.pid -E main-error.log
823 823 $ cat main.pid >> $DAEMON_PIDS
824 824
825 825 $ hg -R other pull http://localhost:$HGPORT/ -r 42ccdea3bb16
826 826 pulling from http://localhost:$HGPORT/
827 827 searching for changes
828 828 adding changesets
829 829 adding manifests
830 830 adding file changes
831 831 added 1 changesets with 1 changes to 1 files (+1 heads)
832 832 (run 'hg heads .' to see heads, 'hg merge' to merge)
833 833 $ cat main-error.log
834 834
835 835 push over ssh
836 836
837 837 $ hg -R main push ssh://user@dummy/other -r 5fddd98957c8
838 838 pushing to ssh://user@dummy/other
839 839 searching for changes
840 840 remote: adding changesets
841 841 remote: adding manifests
842 842 remote: adding file changes
843 843 remote: added 1 changesets with 1 changes to 1 files
844 844
845 845 push over http
846 846
847 847 $ hg -R other serve -p $HGPORT2 -d --pid-file=other.pid -E other-error.log
848 848 $ cat other.pid >> $DAEMON_PIDS
849 849
850 850 $ hg -R main push http://localhost:$HGPORT2/ -r 32af7686d403
851 851 pushing to http://localhost:$HGPORT2/
852 852 searching for changes
853 853 remote: adding changesets
854 854 remote: adding manifests
855 855 remote: adding file changes
856 856 remote: added 1 changesets with 1 changes to 1 files
857 857 $ cat other-error.log
858 858
859 859 Check final content.
860 860
861 861 $ hg -R other log -G
862 862 o changeset: 7:32af7686d403
863 863 | tag: tip
864 864 | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
865 865 | date: Sat Apr 30 15:24:48 2011 +0200
866 866 | summary: D
867 867 |
868 868 o changeset: 6:5fddd98957c8
869 869 | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
870 870 | date: Sat Apr 30 15:24:48 2011 +0200
871 871 | summary: C
872 872 |
873 873 o changeset: 5:42ccdea3bb16
874 874 | parent: 0:cd010b8cd998
875 875 | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
876 876 | date: Sat Apr 30 15:24:48 2011 +0200
877 877 | summary: B
878 878 |
879 879 | o changeset: 4:02de42196ebe
880 880 | | parent: 2:24b6387c8c8c
881 881 | | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
882 882 | | date: Sat Apr 30 15:24:48 2011 +0200
883 883 | | summary: H
884 884 | |
885 885 | | o changeset: 3:eea13746799a
886 886 | |/| parent: 2:24b6387c8c8c
887 887 | | | parent: 1:9520eea781bc
888 888 | | | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
889 889 | | | date: Sat Apr 30 15:24:48 2011 +0200
890 890 | | | summary: G
891 891 | | |
892 892 | o | changeset: 2:24b6387c8c8c
893 893 |/ / parent: 0:cd010b8cd998
894 894 | | user: Nicolas Dumazet <nicdumz.commits@gmail.com>
895 895 | | date: Sat Apr 30 15:24:48 2011 +0200
896 896 | | summary: F
897 897 | |
898 898 | @ changeset: 1:9520eea781bc
899 899 |/ user: Nicolas Dumazet <nicdumz.commits@gmail.com>
900 900 | date: Sat Apr 30 15:24:48 2011 +0200
901 901 | summary: E
902 902 |
903 903 o changeset: 0:cd010b8cd998
904 904 user: Nicolas Dumazet <nicdumz.commits@gmail.com>
905 905 date: Sat Apr 30 15:24:48 2011 +0200
906 906 summary: A
907 907
908 908
909 909 Error Handling
910 910 ==============
911 911
912 912 Check that errors are properly returned to the client during push.
913 913
914 914 Setting up
915 915
916 916 $ cat > failpush.py << EOF
917 917 > """A small extension that makes push fails when using bundle2
918 918 >
919 919 > used to test error handling in bundle2
920 920 > """
921 921 >
922 922 > from mercurial import util
923 923 > from mercurial import bundle2
924 924 > from mercurial import exchange
925 925 > from mercurial import extensions
926 926 >
927 927 > def _pushbundle2failpart(orig, pushop, bundler):
928 928 > extradata = orig(pushop, bundler)
929 929 > reason = pushop.ui.config('failpush', 'reason', None)
930 930 > part = None
931 931 > if reason == 'abort':
932 932 > bundler.newpart('test:abort')
933 933 > if reason == 'unknown':
934 934 > bundler.newpart('TEST:UNKNOWN')
935 935 > if reason == 'race':
936 936 > # 20 Bytes of crap
937 937 > bundler.newpart('b2x:check:heads', data='01234567890123456789')
938 938 > return extradata
939 939 >
940 940 > @bundle2.parthandler("test:abort")
941 941 > def handleabort(op, part):
942 942 > raise util.Abort('Abandon ship!', hint="don't panic")
943 943 >
944 944 > def uisetup(ui):
945 945 > extensions.wrapfunction(exchange, '_pushbundle2extraparts', _pushbundle2failpart)
946 946 >
947 947 > EOF
948 948
949 949 $ cd main
950 950 $ hg up tip
951 951 3 files updated, 0 files merged, 1 files removed, 0 files unresolved
952 952 $ echo 'I' > I
953 953 $ hg add I
954 954 $ hg ci -m 'I'
955 955 $ hg id
956 956 e7ec4e813ba6 tip
957 957 $ cd ..
958 958
959 959 $ cat << EOF >> $HGRCPATH
960 960 > [extensions]
961 961 > failpush=$TESTTMP/failpush.py
962 962 > EOF
963 963
964 964 $ "$TESTDIR/killdaemons.py" $DAEMON_PIDS
965 965 $ hg -R other serve -p $HGPORT2 -d --pid-file=other.pid -E other-error.log
966 966 $ cat other.pid >> $DAEMON_PIDS
967 967
968 968 Doing the actual push: Abort error
969 969
970 970 $ cat << EOF >> $HGRCPATH
971 971 > [failpush]
972 972 > reason = abort
973 973 > EOF
974 974
975 975 $ hg -R main push other -r e7ec4e813ba6
976 976 pushing to other
977 977 searching for changes
978 978 abort: Abandon ship!
979 979 (don't panic)
980 980 [255]
981 981
982 982 $ hg -R main push ssh://user@dummy/other -r e7ec4e813ba6
983 983 pushing to ssh://user@dummy/other
984 984 searching for changes
985 985 abort: Abandon ship!
986 986 (don't panic)
987 987 [255]
988 988
989 989 $ hg -R main push http://localhost:$HGPORT2/ -r e7ec4e813ba6
990 990 pushing to http://localhost:$HGPORT2/
991 991 searching for changes
992 992 abort: Abandon ship!
993 993 (don't panic)
994 994 [255]
995 995
996 996
997 997 Doing the actual push: unknown mandatory parts
998 998
999 999 $ cat << EOF >> $HGRCPATH
1000 1000 > [failpush]
1001 1001 > reason = unknown
1002 1002 > EOF
1003 1003
1004 1004 $ hg -R main push other -r e7ec4e813ba6
1005 1005 pushing to other
1006 1006 searching for changes
1007 1007 abort: missing support for test:unknown
1008 1008 [255]
1009 1009
1010 1010 $ hg -R main push ssh://user@dummy/other -r e7ec4e813ba6
1011 1011 pushing to ssh://user@dummy/other
1012 1012 searching for changes
1013 1013 abort: missing support for test:unknown
1014 1014 [255]
1015 1015
1016 1016 $ hg -R main push http://localhost:$HGPORT2/ -r e7ec4e813ba6
1017 1017 pushing to http://localhost:$HGPORT2/
1018 1018 searching for changes
1019 abort: missing support for "'test:unknown'"
1019 abort: missing support for test:unknown
1020 1020 [255]
1021 1021
1022 1022 Doing the actual push: race
1023 1023
1024 1024 $ cat << EOF >> $HGRCPATH
1025 1025 > [failpush]
1026 1026 > reason = race
1027 1027 > EOF
1028 1028
1029 1029 $ hg -R main push other -r e7ec4e813ba6
1030 1030 pushing to other
1031 1031 searching for changes
1032 1032 abort: push failed:
1033 1033 'repository changed while pushing - please try again'
1034 1034 [255]
1035 1035
1036 1036 $ hg -R main push ssh://user@dummy/other -r e7ec4e813ba6
1037 1037 pushing to ssh://user@dummy/other
1038 1038 searching for changes
1039 1039 abort: push failed:
1040 1040 'repository changed while pushing - please try again'
1041 1041 [255]
1042 1042
1043 1043 $ hg -R main push http://localhost:$HGPORT2/ -r e7ec4e813ba6
1044 1044 pushing to http://localhost:$HGPORT2/
1045 1045 searching for changes
1046 1046 abort: push failed:
1047 1047 'repository changed while pushing - please try again'
1048 1048 [255]
1049 1049
1050 1050 Doing the actual push: hook abort
1051 1051
1052 1052 $ cat << EOF >> $HGRCPATH
1053 1053 > [failpush]
1054 1054 > reason =
1055 1055 > [hooks]
1056 1056 > b2x-pretransactionclose.failpush = false
1057 1057 > EOF
1058 1058
1059 1059 $ "$TESTDIR/killdaemons.py" $DAEMON_PIDS
1060 1060 $ hg -R other serve -p $HGPORT2 -d --pid-file=other.pid -E other-error.log
1061 1061 $ cat other.pid >> $DAEMON_PIDS
1062 1062
1063 1063 $ hg -R main push other -r e7ec4e813ba6
1064 1064 pushing to other
1065 1065 searching for changes
1066 1066 transaction abort!
1067 1067 rollback completed
1068 1068 abort: b2x-pretransactionclose.failpush hook exited with status 1
1069 1069 [255]
1070 1070
1071 1071 $ hg -R main push ssh://user@dummy/other -r e7ec4e813ba6
1072 1072 pushing to ssh://user@dummy/other
1073 1073 searching for changes
1074 1074 abort: b2x-pretransactionclose.failpush hook exited with status 1
1075 1075 remote: transaction abort!
1076 1076 remote: rollback completed
1077 1077 [255]
1078 1078
1079 1079 $ hg -R main push http://localhost:$HGPORT2/ -r e7ec4e813ba6
1080 1080 pushing to http://localhost:$HGPORT2/
1081 1081 searching for changes
1082 1082 abort: b2x-pretransactionclose.failpush hook exited with status 1
1083 1083 [255]
1084 1084
1085 1085