bundle2: seek part back during iteration...
Durham Goode
r33889:891118dc default
@@ -1,1895 +1,1899
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container used to transmit a set
10 10 of payloads in an application agnostic way. It consists of a sequence of
11 11 "parts" that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 A name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
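For illustration only, a minimal decoder for such a parameter blob (a sketch
assuming Python 3's urllib and the quoting rules above, not the module's real
implementation) could be::

    from urllib.parse import unquote

    def decodestreamparams(blob):
        params = {}
        for entry in blob.split(' '):
            if '=' in entry:
                name, value = entry.split('=', 1)
                params[unquote(name)] = unquote(value)
            else:
                params[unquote(entry)] = None
        return params
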
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler
140 140 is known for a Mandatory part, the process is aborted and an exception is
141 141 raised. If the part is advisory and no handler is known, the part is ignored.
142 142 When the process is aborted, the full bundle is still read from the stream to
143 143 keep the channel usable. But none of the parts read after an abort are
144 144 processed. In the future, dropping the stream may become an option for
145 145 channels we do not care to preserve.
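
For illustration only, a minimal sketch of this case based dispatch (using a
plain dict in place of the real handler registry)::

    def findhandler(parttype, handlers):
        handler = handlers.get(parttype.lower())
        mandatory = parttype != parttype.lower()
        if handler is None and mandatory:
            raise KeyError(parttype)
        return handler  # None means advisory part with no handler: ignore it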
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
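# For example (illustrative): _makefpartparamsizes(2) returns '>BBBB', and
# struct.calcsize('>BBBB') is 4: one size byte per key and one per value.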
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that register a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 def processbundle(repo, unbundler, transactiongetter=None, op=None):
351 351 """This function process a bundle, apply effect to/from a repo
352 352
353 353 It iterates over each part then searches for and uses the proper handling
354 354 code to process the part. Parts are processed in order.
355 355
356 356 An unknown Mandatory part will abort the process.
357 357
358 358 It is temporarily possible to provide a prebuilt bundleoperation to the
359 359 function. This is used to ensure output is properly propagated in case of
360 360 an error during the unbundling. This output capturing part will likely be
361 361 reworked and this ability will probably go away in the process.
362 362 """
363 363 if op is None:
364 364 if transactiongetter is None:
365 365 transactiongetter = _notransaction
366 366 op = bundleoperation(repo, transactiongetter)
367 367 # todo:
368 368 # - replace this with an init function soon.
369 369 # - exception catching
370 370 unbundler.params
371 371 if repo.ui.debugflag:
372 372 msg = ['bundle2-input-bundle:']
373 373 if unbundler.params:
374 374 msg.append(' %i params' % len(unbundler.params))
375 375 if op._gettransaction is None or op._gettransaction is _notransaction:
376 376 msg.append(' no-transaction')
377 377 else:
378 378 msg.append(' with-transaction')
379 379 msg.append('\n')
380 380 repo.ui.debug(''.join(msg))
381 381 iterparts = enumerate(unbundler.iterparts())
382 382 part = None
383 383 nbpart = 0
384 384 try:
385 385 for nbpart, part in iterparts:
386 386 _processpart(op, part)
387 387 except Exception as exc:
388 388 # Any exceptions seeking to the end of the bundle at this point are
389 389 # almost certainly related to the underlying stream being bad.
390 390 # And, chances are that the exception we're handling is related to
391 391 # getting in that bad state. So, we swallow the seeking error and
392 392 # re-raise the original error.
393 393 seekerror = False
394 394 try:
395 395 for nbpart, part in iterparts:
396 396 # consume the bundle content
397 397 part.seek(0, 2)
398 398 except Exception:
399 399 seekerror = True
400 400
401 401 # Small hack to let caller code distinguish exceptions raised during
402 402 # bundle2 processing from those of the old format. This is mostly
403 403 # needed to handle different return codes to unbundle according to the
404 404 # type of bundle. We should probably clean up or drop this return code
405 405 # craziness in a future version.
406 406 exc.duringunbundle2 = True
407 407 salvaged = []
408 408 replycaps = None
409 409 if op.reply is not None:
410 410 salvaged = op.reply.salvageoutput()
411 411 replycaps = op.reply.capabilities
412 412 exc._replycaps = replycaps
413 413 exc._bundle2salvagedoutput = salvaged
414 414
415 415 # Re-raising from a variable loses the original stack. So only use
416 416 # that form if we need to.
417 417 if seekerror:
418 418 raise exc
419 419 else:
420 420 raise
421 421 finally:
422 422 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
423 423
424 424 return op
425 425
426 426 def _processchangegroup(op, cg, tr, source, url, **kwargs):
427 427 ret = cg.apply(op.repo, tr, source, url, **kwargs)
428 428 op.records.add('changegroup', {
429 429 'return': ret,
430 430 })
431 431 return ret
432 432
433 433 def _processpart(op, part):
434 434 """process a single part from a bundle
435 435
436 436 The part is guaranteed to have been fully consumed when the function exits
437 437 (even if an exception is raised)."""
438 438 status = 'unknown' # used by debug output
439 439 hardabort = False
440 440 try:
441 441 try:
442 442 handler = parthandlermapping.get(part.type)
443 443 if handler is None:
444 444 status = 'unsupported-type'
445 445 raise error.BundleUnknownFeatureError(parttype=part.type)
446 446 indebug(op.ui, 'found a handler for part %r' % part.type)
447 447 unknownparams = part.mandatorykeys - handler.params
448 448 if unknownparams:
449 449 unknownparams = list(unknownparams)
450 450 unknownparams.sort()
451 451 status = 'unsupported-params (%s)' % unknownparams
452 452 raise error.BundleUnknownFeatureError(parttype=part.type,
453 453 params=unknownparams)
454 454 status = 'supported'
455 455 except error.BundleUnknownFeatureError as exc:
456 456 if part.mandatory: # mandatory parts
457 457 raise
458 458 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
459 459 return # skip to part processing
460 460 finally:
461 461 if op.ui.debugflag:
462 462 msg = ['bundle2-input-part: "%s"' % part.type]
463 463 if not part.mandatory:
464 464 msg.append(' (advisory)')
465 465 nbmp = len(part.mandatorykeys)
466 466 nbap = len(part.params) - nbmp
467 467 if nbmp or nbap:
468 468 msg.append(' (params:')
469 469 if nbmp:
470 470 msg.append(' %i mandatory' % nbmp)
471 471 if nbap:
472 472 msg.append(' %i advisory' % nbap)
473 473 msg.append(')')
474 474 msg.append(' %s\n' % status)
475 475 op.ui.debug(''.join(msg))
476 476
477 477 # handler is called outside the above try block so that we don't
478 478 # risk catching KeyErrors from anything other than the
479 479 # parthandlermapping lookup (any KeyError raised by handler()
480 480 # itself represents a defect of a different variety).
481 481 output = None
482 482 if op.captureoutput and op.reply is not None:
483 483 op.ui.pushbuffer(error=True, subproc=True)
484 484 output = ''
485 485 try:
486 486 handler(op, part)
487 487 finally:
488 488 if output is not None:
489 489 output = op.ui.popbuffer()
490 490 if output:
491 491 outpart = op.reply.newpart('output', data=output,
492 492 mandatory=False)
493 493 outpart.addparam(
494 494 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
495 495 # If exiting or interrupted, do not attempt to seek the stream in the
496 496 # finally block below. This makes abort faster.
497 497 except (SystemExit, KeyboardInterrupt):
498 498 hardabort = True
499 499 raise
500 500 finally:
501 501 # consume the part content to not corrupt the stream.
502 502 if not hardabort:
503 503 part.seek(0, 2)
504 504
505 505
506 506 def decodecaps(blob):
507 507 """decode a bundle2 caps bytes blob into a dictionary
508 508
509 509 The blob is a list of capabilities (one per line)
510 510 Capabilities may have values using a line of the form::
511 511
512 512 capability=value1,value2,value3
513 513
514 514 The values are always a list."""
515 515 caps = {}
516 516 for line in blob.splitlines():
517 517 if not line:
518 518 continue
519 519 if '=' not in line:
520 520 key, vals = line, ()
521 521 else:
522 522 key, vals = line.split('=', 1)
523 523 vals = vals.split(',')
524 524 key = urlreq.unquote(key)
525 525 vals = [urlreq.unquote(v) for v in vals]
526 526 caps[key] = vals
527 527 return caps
528 528
529 529 def encodecaps(caps):
530 530 """encode a bundle2 caps dictionary into a bytes blob"""
531 531 chunks = []
532 532 for ca in sorted(caps):
533 533 vals = caps[ca]
534 534 ca = urlreq.quote(ca)
535 535 vals = [urlreq.quote(v) for v in vals]
536 536 if vals:
537 537 ca = "%s=%s" % (ca, ','.join(vals))
538 538 chunks.append(ca)
539 539 return '\n'.join(chunks)
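# Illustrative round trip (not part of the original module):
#   encodecaps({'error': ['abort', 'pushraced']}) -> 'error=abort,pushraced'
#   decodecaps('error=abort,pushraced') -> {'error': ['abort', 'pushraced']}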
540 540
541 541 bundletypes = {
542 542 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
543 543 # since the unification ssh accepts a header but there
544 544 # is no capability signaling it.
545 545 "HG20": (), # special-cased below
546 546 "HG10UN": ("HG10UN", 'UN'),
547 547 "HG10BZ": ("HG10", 'BZ'),
548 548 "HG10GZ": ("HG10GZ", 'GZ'),
549 549 }
550 550
551 551 # hgweb uses this list to communicate its preferred type
552 552 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
553 553
554 554 class bundle20(object):
555 555 """represent an outgoing bundle2 container
556 556
557 557 Use the `addparam` method to add a stream level parameter, and `newpart` to
558 558 populate it. Then call `getchunks` to retrieve all the binary chunks of
559 559 data that compose the bundle2 container."""
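# Illustrative usage sketch (assumes a ui object and a writable file fp):
#   bundler = bundle20(ui)
#   bundler.setcompression('GZ')  # optional stream level compression
#   bundler.newpart('output', data='hello', mandatory=False)
#   for chunk in bundler.getchunks():
#       fp.write(chunk)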
560 560
561 561 _magicstring = 'HG20'
562 562
563 563 def __init__(self, ui, capabilities=()):
564 564 self.ui = ui
565 565 self._params = []
566 566 self._parts = []
567 567 self.capabilities = dict(capabilities)
568 568 self._compengine = util.compengines.forbundletype('UN')
569 569 self._compopts = None
570 570
571 571 def setcompression(self, alg, compopts=None):
572 572 """setup core part compression to <alg>"""
573 573 if alg in (None, 'UN'):
574 574 return
575 575 assert not any(n.lower() == 'compression' for n, v in self._params)
576 576 self.addparam('Compression', alg)
577 577 self._compengine = util.compengines.forbundletype(alg)
578 578 self._compopts = compopts
579 579
580 580 @property
581 581 def nbparts(self):
582 582 """total number of parts added to the bundler"""
583 583 return len(self._parts)
584 584
585 585 # methods used to define the bundle2 content
586 586 def addparam(self, name, value=None):
587 587 """add a stream level parameter"""
588 588 if not name:
589 589 raise ValueError('empty parameter name')
590 590 if name[0] not in pycompat.bytestr(string.ascii_letters):
591 591 raise ValueError('non letter first character: %r' % name)
592 592 self._params.append((name, value))
593 593
594 594 def addpart(self, part):
595 595 """add a new part to the bundle2 container
596 596
597 597 Parts contain the actual applicative payload."""
598 598 assert part.id is None
599 599 part.id = len(self._parts) # very cheap counter
600 600 self._parts.append(part)
601 601
602 602 def newpart(self, typeid, *args, **kwargs):
603 603 """create a new part and add it to the containers
604 604
605 605 As the part is directly added to the containers. For now, this means
606 606 that any failure to properly initialize the part after calling
607 607 ``newpart`` should result in a failure of the whole bundling process.
608 608
609 609 You can still fall back to manually create and add if you need better
610 610 control."""
611 611 part = bundlepart(typeid, *args, **kwargs)
612 612 self.addpart(part)
613 613 return part
614 614
615 615 # methods used to generate the bundle2 stream
616 616 def getchunks(self):
617 617 if self.ui.debugflag:
618 618 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
619 619 if self._params:
620 620 msg.append(' (%i params)' % len(self._params))
621 621 msg.append(' %i parts total\n' % len(self._parts))
622 622 self.ui.debug(''.join(msg))
623 623 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
624 624 yield self._magicstring
625 625 param = self._paramchunk()
626 626 outdebug(self.ui, 'bundle parameter: %s' % param)
627 627 yield _pack(_fstreamparamsize, len(param))
628 628 if param:
629 629 yield param
630 630 for chunk in self._compengine.compressstream(self._getcorechunk(),
631 631 self._compopts):
632 632 yield chunk
633 633
634 634 def _paramchunk(self):
635 635 """return a encoded version of all stream parameters"""
636 636 blocks = []
637 637 for par, value in self._params:
638 638 par = urlreq.quote(par)
639 639 if value is not None:
640 640 value = urlreq.quote(value)
641 641 par = '%s=%s' % (par, value)
642 642 blocks.append(par)
643 643 return ' '.join(blocks)
644 644
645 645 def _getcorechunk(self):
646 646 """yield chunk for the core part of the bundle
647 647
648 648 (all but headers and parameters)"""
649 649 outdebug(self.ui, 'start of parts')
650 650 for part in self._parts:
651 651 outdebug(self.ui, 'bundle part: "%s"' % part.type)
652 652 for chunk in part.getchunks(ui=self.ui):
653 653 yield chunk
654 654 outdebug(self.ui, 'end of bundle')
655 655 yield _pack(_fpartheadersize, 0)
656 656
657 657
658 658 def salvageoutput(self):
659 659 """return a list with a copy of all output parts in the bundle
660 660
661 661 This is meant to be used during error handling to make sure we preserve
662 662 server output"""
663 663 salvaged = []
664 664 for part in self._parts:
665 665 if part.type.startswith('output'):
666 666 salvaged.append(part.copy())
667 667 return salvaged
668 668
669 669
670 670 class unpackermixin(object):
671 671 """A mixin to extract bytes and struct data from a stream"""
672 672
673 673 def __init__(self, fp):
674 674 self._fp = fp
675 675
676 676 def _unpack(self, format):
677 677 """unpack this struct format from the stream
678 678
679 679 This method is meant for internal usage by the bundle2 protocol only.
680 680 It directly manipulates the low level stream, including bundle2 level
681 681 instructions.
682 682
683 683 Do not use it to implement higher-level logic or methods."""
684 684 data = self._readexact(struct.calcsize(format))
685 685 return _unpack(format, data)
686 686
687 687 def _readexact(self, size):
688 688 """read exactly <size> bytes from the stream
689 689
690 690 This method is meant for internal usage by the bundle2 protocol only.
691 691 It directly manipulates the low level stream, including bundle2 level
692 692 instructions.
693 693
694 694 Do not use it to implement higher-level logic or methods."""
695 695 return changegroup.readexactly(self._fp, size)
696 696
697 697 def getunbundler(ui, fp, magicstring=None):
698 698 """return a valid unbundler object for a given magicstring"""
699 699 if magicstring is None:
700 700 magicstring = changegroup.readexactly(fp, 4)
701 701 magic, version = magicstring[0:2], magicstring[2:4]
702 702 if magic != 'HG':
703 703 ui.debug(
704 704 "error: invalid magic: %r (version %r), should be 'HG'\n"
705 705 % (magic, version))
706 706 raise error.Abort(_('not a Mercurial bundle'))
707 707 unbundlerclass = formatmap.get(version)
708 708 if unbundlerclass is None:
709 709 raise error.Abort(_('unknown bundle version %s') % version)
710 710 unbundler = unbundlerclass(ui, fp)
711 711 indebug(ui, 'start processing of %s stream' % magicstring)
712 712 return unbundler
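# Illustrative usage sketch (assumes a ui object and a readable file fp):
#   unbundler = getunbundler(ui, fp)
#   for part in unbundler.iterparts():
#       data = part.read()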
713 713
714 714 class unbundle20(unpackermixin):
715 715 """interpret a bundle2 stream
716 716
717 717 This class is fed with a binary stream and yields parts through its
718 718 `iterparts` method."""
719 719
720 720 _magicstring = 'HG20'
721 721
722 722 def __init__(self, ui, fp):
723 723 """If header is specified, we do not read it out of the stream."""
724 724 self.ui = ui
725 725 self._compengine = util.compengines.forbundletype('UN')
726 726 self._compressed = None
727 727 super(unbundle20, self).__init__(fp)
728 728
729 729 @util.propertycache
730 730 def params(self):
731 731 """dictionary of stream level parameters"""
732 732 indebug(self.ui, 'reading bundle2 stream parameters')
733 733 params = {}
734 734 paramssize = self._unpack(_fstreamparamsize)[0]
735 735 if paramssize < 0:
736 736 raise error.BundleValueError('negative bundle param size: %i'
737 737 % paramssize)
738 738 if paramssize:
739 739 params = self._readexact(paramssize)
740 740 params = self._processallparams(params)
741 741 return params
742 742
743 743 def _processallparams(self, paramsblock):
744 744 """"""
745 745 params = util.sortdict()
746 746 for p in paramsblock.split(' '):
747 747 p = p.split('=', 1)
748 748 p = [urlreq.unquote(i) for i in p]
749 749 if len(p) < 2:
750 750 p.append(None)
751 751 self._processparam(*p)
752 752 params[p[0]] = p[1]
753 753 return params
754 754
755 755
756 756 def _processparam(self, name, value):
757 757 """process a parameter, applying its effect if needed
758 758
759 759 Parameters starting with a lower case letter are advisory and will be
760 760 ignored when unknown. Those starting with an upper case letter are
761 761 mandatory; this function will raise a BundleUnknownFeatureError when one
762 762 is unknown.
763 763
764 764 Note: no options are currently supported. Any input will either be
765 765 ignored or will fail.
765 765 """
766 766 if not name:
767 767 raise ValueError('empty parameter name')
768 768 if name[0] not in pycompat.bytestr(string.ascii_letters):
769 769 raise ValueError('non letter first character: %r' % name)
770 770 try:
771 771 handler = b2streamparamsmap[name.lower()]
772 772 except KeyError:
773 773 if name[0].islower():
774 774 indebug(self.ui, "ignoring unknown parameter %r" % name)
775 775 else:
776 776 raise error.BundleUnknownFeatureError(params=(name,))
777 777 else:
778 778 handler(self, name, value)
779 779
780 780 def _forwardchunks(self):
781 781 """utility to transfer a bundle2 as binary
782 782
783 783 This is made necessary by the fact that the 'getbundle' command over 'ssh'
784 784 has no way to know when the reply ends, relying on the bundle to be
785 785 interpreted to know its end. This is terrible and we are sorry, but we
786 786 needed to move forward to get general delta enabled.
787 787 """
788 788 yield self._magicstring
789 789 assert 'params' not in vars(self)
790 790 paramssize = self._unpack(_fstreamparamsize)[0]
791 791 if paramssize < 0:
792 792 raise error.BundleValueError('negative bundle param size: %i'
793 793 % paramssize)
794 794 yield _pack(_fstreamparamsize, paramssize)
795 795 if paramssize:
796 796 params = self._readexact(paramssize)
797 797 self._processallparams(params)
798 798 yield params
799 799 assert self._compengine.bundletype == 'UN'
800 800 # From there, payload might need to be decompressed
801 801 self._fp = self._compengine.decompressorreader(self._fp)
802 802 emptycount = 0
803 803 while emptycount < 2:
804 804 # so we can brainlessly loop
805 805 assert _fpartheadersize == _fpayloadsize
806 806 size = self._unpack(_fpartheadersize)[0]
807 807 yield _pack(_fpartheadersize, size)
808 808 if size:
809 809 emptycount = 0
810 810 else:
811 811 emptycount += 1
812 812 continue
813 813 if size == flaginterrupt:
814 814 continue
815 815 elif size < 0:
816 816 raise error.BundleValueError('negative chunk size: %i' % size)
817 817 yield self._readexact(size)
818 818
819 819
820 820 def iterparts(self):
821 821 """yield all parts contained in the stream"""
822 822 # make sure params have been loaded
823 823 self.params
824 824 # From there, payload needs to be decompressed
825 825 self._fp = self._compengine.decompressorreader(self._fp)
826 826 indebug(self.ui, 'start extraction of bundle2 parts')
827 827 headerblock = self._readpartheader()
828 828 while headerblock is not None:
829 829 part = unbundlepart(self.ui, headerblock, self._fp)
830 830 yield part
831 # Seek to the end of the part to force its consumption so the next
832 # part can be read. But then seek back to the beginning so the
833 # code consuming this generator has a part that starts at 0.
831 834 part.seek(0, 2)
835 part.seek(0)
832 836 headerblock = self._readpartheader()
833 837 indebug(self.ui, 'end of bundle2 stream')
834 838
835 839 def _readpartheader(self):
836 840 """reads a part header size and return the bytes blob
837 841
838 842 returns None if empty"""
839 843 headersize = self._unpack(_fpartheadersize)[0]
840 844 if headersize < 0:
841 845 raise error.BundleValueError('negative part header size: %i'
842 846 % headersize)
843 847 indebug(self.ui, 'part header size: %i' % headersize)
844 848 if headersize:
845 849 return self._readexact(headersize)
846 850 return None
847 851
848 852 def compressed(self):
849 853 self.params # load params
850 854 return self._compressed
851 855
852 856 def close(self):
853 857 """close underlying file"""
854 858 if util.safehasattr(self._fp, 'close'):
855 859 return self._fp.close()
856 860
857 861 formatmap = {'20': unbundle20}
858 862
859 863 b2streamparamsmap = {}
860 864
861 865 def b2streamparamhandler(name):
862 866 """register a handler for a stream level parameter"""
863 867 def decorator(func):
864 868 assert name not in b2streamparamsmap
865 869 b2streamparamsmap[name] = func
866 870 return func
867 871 return decorator
868 872
869 873 @b2streamparamhandler('compression')
870 874 def processcompression(unbundler, param, value):
871 875 """read compression parameter and install payload decompression"""
872 876 if value not in util.compengines.supportedbundletypes:
873 877 raise error.BundleUnknownFeatureError(params=(param,),
874 878 values=(value,))
875 879 unbundler._compengine = util.compengines.forbundletype(value)
876 880 if value is not None:
877 881 unbundler._compressed = True
878 882
879 883 class bundlepart(object):
880 884 """A bundle2 part contains application level payload
881 885
882 886 The part `type` is used to route the part to the application level
883 887 handler.
884 888
885 889 The part payload is contained in ``part.data``. It could be raw bytes or a
886 890 generator of byte chunks.
887 891
888 892 You can add parameters to the part using the ``addparam`` method.
889 893 Parameters can be either mandatory (default) or advisory. Remote side
890 894 should be able to safely ignore the advisory ones.
891 895
892 896 Neither data nor parameters may be modified after generation has begun.
893 897 """
894 898
895 899 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
896 900 data='', mandatory=True):
897 901 validateparttype(parttype)
898 902 self.id = None
899 903 self.type = parttype
900 904 self._data = data
901 905 self._mandatoryparams = list(mandatoryparams)
902 906 self._advisoryparams = list(advisoryparams)
903 907 # checking for duplicated entries
904 908 self._seenparams = set()
905 909 for pname, __ in self._mandatoryparams + self._advisoryparams:
906 910 if pname in self._seenparams:
907 911 raise error.ProgrammingError('duplicated params: %s' % pname)
908 912 self._seenparams.add(pname)
909 913 # status of the part's generation:
910 914 # - None: not started,
911 915 # - False: currently generated,
912 916 # - True: generation done.
913 917 self._generated = None
914 918 self.mandatory = mandatory
915 919
916 920 def __repr__(self):
917 921 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
918 922 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
919 923 % (cls, id(self), self.id, self.type, self.mandatory))
920 924
921 925 def copy(self):
922 926 """return a copy of the part
923 927
924 928 The new part has the very same content but no partid assigned yet.
925 929 Parts with generated data cannot be copied."""
926 930 assert not util.safehasattr(self.data, 'next')
927 931 return self.__class__(self.type, self._mandatoryparams,
928 932 self._advisoryparams, self._data, self.mandatory)
929 933
930 934 # methods used to define the part content
931 935 @property
932 936 def data(self):
933 937 return self._data
934 938
935 939 @data.setter
936 940 def data(self, data):
937 941 if self._generated is not None:
938 942 raise error.ReadOnlyPartError('part is being generated')
939 943 self._data = data
940 944
941 945 @property
942 946 def mandatoryparams(self):
943 947 # make it an immutable tuple to force people through ``addparam``
944 948 return tuple(self._mandatoryparams)
945 949
946 950 @property
947 951 def advisoryparams(self):
948 952 # make it an immutable tuple to force people through ``addparam``
949 953 return tuple(self._advisoryparams)
950 954
951 955 def addparam(self, name, value='', mandatory=True):
952 956 """add a parameter to the part
953 957
954 958 If 'mandatory' is set to True, the remote handler must claim support
955 959 for this parameter or the unbundling will be aborted.
956 960
957 961 The 'name' and 'value' cannot exceed 255 bytes each.
958 962 """
959 963 if self._generated is not None:
960 964 raise error.ReadOnlyPartError('part is being generated')
961 965 if name in self._seenparams:
962 966 raise ValueError('duplicated params: %s' % name)
963 967 self._seenparams.add(name)
964 968 params = self._advisoryparams
965 969 if mandatory:
966 970 params = self._mandatoryparams
967 971 params.append((name, value))
968 972
969 973 # methods used to generate the bundle2 stream
970 974 def getchunks(self, ui):
971 975 if self._generated is not None:
972 976 raise error.ProgrammingError('part can only be consumed once')
973 977 self._generated = False
974 978
975 979 if ui.debugflag:
976 980 msg = ['bundle2-output-part: "%s"' % self.type]
977 981 if not self.mandatory:
978 982 msg.append(' (advisory)')
979 983 nbmp = len(self.mandatoryparams)
980 984 nbap = len(self.advisoryparams)
981 985 if nbmp or nbap:
982 986 msg.append(' (params:')
983 987 if nbmp:
984 988 msg.append(' %i mandatory' % nbmp)
985 989 if nbap:
986 990 msg.append(' %i advisory' % nbap)
987 991 msg.append(')')
988 992 if not self.data:
989 993 msg.append(' empty payload')
990 994 elif util.safehasattr(self.data, 'next'):
991 995 msg.append(' streamed payload')
992 996 else:
993 997 msg.append(' %i bytes payload' % len(self.data))
994 998 msg.append('\n')
995 999 ui.debug(''.join(msg))
996 1000
997 1001 #### header
998 1002 if self.mandatory:
999 1003 parttype = self.type.upper()
1000 1004 else:
1001 1005 parttype = self.type.lower()
1002 1006 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1003 1007 ## parttype
1004 1008 header = [_pack(_fparttypesize, len(parttype)),
1005 1009 parttype, _pack(_fpartid, self.id),
1006 1010 ]
1007 1011 ## parameters
1008 1012 # count
1009 1013 manpar = self.mandatoryparams
1010 1014 advpar = self.advisoryparams
1011 1015 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1012 1016 # size
1013 1017 parsizes = []
1014 1018 for key, value in manpar:
1015 1019 parsizes.append(len(key))
1016 1020 parsizes.append(len(value))
1017 1021 for key, value in advpar:
1018 1022 parsizes.append(len(key))
1019 1023 parsizes.append(len(value))
1020 1024 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1021 1025 header.append(paramsizes)
1022 1026 # key, value
1023 1027 for key, value in manpar:
1024 1028 header.append(key)
1025 1029 header.append(value)
1026 1030 for key, value in advpar:
1027 1031 header.append(key)
1028 1032 header.append(value)
1029 1033 ## finalize header
1030 1034 headerchunk = ''.join(header)
1031 1035 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1032 1036 yield _pack(_fpartheadersize, len(headerchunk))
1033 1037 yield headerchunk
1034 1038 ## payload
1035 1039 try:
1036 1040 for chunk in self._payloadchunks():
1037 1041 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1038 1042 yield _pack(_fpayloadsize, len(chunk))
1039 1043 yield chunk
1040 1044 except GeneratorExit:
1041 1045 # GeneratorExit means that nobody is listening for our
1042 1046 # results anyway, so just bail quickly rather than trying
1043 1047 # to produce an error part.
1044 1048 ui.debug('bundle2-generatorexit\n')
1045 1049 raise
1046 1050 except BaseException as exc:
1047 1051 bexc = util.forcebytestr(exc)
1048 1052 # backup exception data for later
1049 1053 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1050 1054 % bexc)
1051 1055 tb = sys.exc_info()[2]
1052 1056 msg = 'unexpected error: %s' % bexc
1053 1057 interpart = bundlepart('error:abort', [('message', msg)],
1054 1058 mandatory=False)
1055 1059 interpart.id = 0
1056 1060 yield _pack(_fpayloadsize, -1)
1057 1061 for chunk in interpart.getchunks(ui=ui):
1058 1062 yield chunk
1059 1063 outdebug(ui, 'closing payload chunk')
1060 1064 # abort current part payload
1061 1065 yield _pack(_fpayloadsize, 0)
1062 1066 pycompat.raisewithtb(exc, tb)
1063 1067 # end of payload
1064 1068 outdebug(ui, 'closing payload chunk')
1065 1069 yield _pack(_fpayloadsize, 0)
1066 1070 self._generated = True
1067 1071
1068 1072 def _payloadchunks(self):
1069 1073 """yield chunks of a the part payload
1070 1074
1071 1075 Exists to handle the different methods to provide data to a part."""
1072 1076 # we only support fixed size data now.
1073 1077 # This will be improved in the future.
1074 1078 if (util.safehasattr(self.data, 'next')
1075 1079 or util.safehasattr(self.data, '__next__')):
1076 1080 buff = util.chunkbuffer(self.data)
1077 1081 chunk = buff.read(preferedchunksize)
1078 1082 while chunk:
1079 1083 yield chunk
1080 1084 chunk = buff.read(preferedchunksize)
1081 1085 elif len(self.data):
1082 1086 yield self.data
1083 1087
1084 1088
1085 1089 flaginterrupt = -1
1086 1090
1087 1091 class interrupthandler(unpackermixin):
1088 1092 """read one part and process it with restricted capability
1089 1093
1090 1094 This allows the transmission of exceptions raised on the producer side
1091 1095 during part iteration while the consumer is reading a part.
1092 1096
1093 1097 Parts processed in this manner only have access to a ui object."""
1094 1098
1095 1099 def __init__(self, ui, fp):
1096 1100 super(interrupthandler, self).__init__(fp)
1097 1101 self.ui = ui
1098 1102
1099 1103 def _readpartheader(self):
1100 1104 """reads a part header size and return the bytes blob
1101 1105
1102 1106 returns None if empty"""
1103 1107 headersize = self._unpack(_fpartheadersize)[0]
1104 1108 if headersize < 0:
1105 1109 raise error.BundleValueError('negative part header size: %i'
1106 1110 % headersize)
1107 1111 indebug(self.ui, 'part header size: %i\n' % headersize)
1108 1112 if headersize:
1109 1113 return self._readexact(headersize)
1110 1114 return None
1111 1115
1112 1116 def __call__(self):
1113 1117
1114 1118 self.ui.debug('bundle2-input-stream-interrupt:'
1115 1119 ' opening out of band context\n')
1116 1120 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1117 1121 headerblock = self._readpartheader()
1118 1122 if headerblock is None:
1119 1123 indebug(self.ui, 'no part found during interruption.')
1120 1124 return
1121 1125 part = unbundlepart(self.ui, headerblock, self._fp)
1122 1126 op = interruptoperation(self.ui)
1123 1127 _processpart(op, part)
1124 1128 self.ui.debug('bundle2-input-stream-interrupt:'
1125 1129 ' closing out of band context\n')
1126 1130
1127 1131 class interruptoperation(object):
1128 1132 """A limited operation to be use by part handler during interruption
1129 1133
1130 1134 It only have access to an ui object.
1131 1135 """
1132 1136
1133 1137 def __init__(self, ui):
1134 1138 self.ui = ui
1135 1139 self.reply = None
1136 1140 self.captureoutput = False
1137 1141
1138 1142 @property
1139 1143 def repo(self):
1140 1144 raise error.ProgrammingError('no repo access from stream interruption')
1141 1145
1142 1146 def gettransaction(self):
1143 1147 raise TransactionUnavailable('no repo access from stream interruption')
1144 1148
1145 1149 class unbundlepart(unpackermixin):
1146 1150 """a bundle part read from a bundle"""
1147 1151
1148 1152 def __init__(self, ui, header, fp):
1149 1153 super(unbundlepart, self).__init__(fp)
1150 1154 self._seekable = (util.safehasattr(fp, 'seek') and
1151 1155 util.safehasattr(fp, 'tell'))
1152 1156 self.ui = ui
1153 1157 # unbundle state attr
1154 1158 self._headerdata = header
1155 1159 self._headeroffset = 0
1156 1160 self._initialized = False
1157 1161 self.consumed = False
1158 1162 # part data
1159 1163 self.id = None
1160 1164 self.type = None
1161 1165 self.mandatoryparams = None
1162 1166 self.advisoryparams = None
1163 1167 self.params = None
1164 1168 self.mandatorykeys = ()
1165 1169 self._payloadstream = None
1166 1170 self._readheader()
1167 1171 self._mandatory = None
1168 1172 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1169 1173 self._pos = 0
1170 1174
1171 1175 def _fromheader(self, size):
1172 1176 """return the next <size> byte from the header"""
1173 1177 offset = self._headeroffset
1174 1178 data = self._headerdata[offset:(offset + size)]
1175 1179 self._headeroffset = offset + size
1176 1180 return data
1177 1181
1178 1182 def _unpackheader(self, format):
1179 1183 """read given format from header
1180 1184
1181 1185 This automatically computes the size of the format to read."""
1182 1186 data = self._fromheader(struct.calcsize(format))
1183 1187 return _unpack(format, data)
1184 1188
1185 1189 def _initparams(self, mandatoryparams, advisoryparams):
1186 1190 """internal function to setup all logic related parameters"""
1187 1191 # make it read only to prevent people touching it by mistake.
1188 1192 self.mandatoryparams = tuple(mandatoryparams)
1189 1193 self.advisoryparams = tuple(advisoryparams)
1190 1194 # user friendly UI
1191 1195 self.params = util.sortdict(self.mandatoryparams)
1192 1196 self.params.update(self.advisoryparams)
1193 1197 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1194 1198
1195 1199 def _payloadchunks(self, chunknum=0):
1196 1200 '''seek to specified chunk and start yielding data'''
1197 1201 if len(self._chunkindex) == 0:
1198 1202 assert chunknum == 0, 'Must start with chunk 0'
1199 1203 self._chunkindex.append((0, self._tellfp()))
1200 1204 else:
1201 1205 assert chunknum < len(self._chunkindex), \
1202 1206 'Unknown chunk %d' % chunknum
1203 1207 self._seekfp(self._chunkindex[chunknum][1])
1204 1208
1205 1209 pos = self._chunkindex[chunknum][0]
1206 1210 payloadsize = self._unpack(_fpayloadsize)[0]
1207 1211 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1208 1212 while payloadsize:
1209 1213 if payloadsize == flaginterrupt:
1210 1214 # interruption detection, the handler will now read a
1211 1215 # single part and process it.
1212 1216 interrupthandler(self.ui, self._fp)()
1213 1217 elif payloadsize < 0:
1214 1218 msg = 'negative payload chunk size: %i' % payloadsize
1215 1219 raise error.BundleValueError(msg)
1216 1220 else:
1217 1221 result = self._readexact(payloadsize)
1218 1222 chunknum += 1
1219 1223 pos += payloadsize
1220 1224 if chunknum == len(self._chunkindex):
1221 1225 self._chunkindex.append((pos, self._tellfp()))
1222 1226 yield result
1223 1227 payloadsize = self._unpack(_fpayloadsize)[0]
1224 1228 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1225 1229
1226 1230 def _findchunk(self, pos):
1227 1231 '''for a given payload position, return a chunk number and offset'''
1228 1232 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1229 1233 if ppos == pos:
1230 1234 return chunk, 0
1231 1235 elif ppos > pos:
1232 1236 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1233 1237 raise ValueError('Unknown chunk')
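# For example (illustrative): with chunk starts at payload positions
# [0, 4096, 8192], _findchunk(5000) returns (1, 904), i.e. position 5000
# lies 904 bytes into chunk 1.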
1234 1238
1235 1239 def _readheader(self):
1236 1240 """read the header and setup the object"""
1237 1241 typesize = self._unpackheader(_fparttypesize)[0]
1238 1242 self.type = self._fromheader(typesize)
1239 1243 indebug(self.ui, 'part type: "%s"' % self.type)
1240 1244 self.id = self._unpackheader(_fpartid)[0]
1241 1245 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1242 1246 # extract mandatory bit from type
1243 1247 self.mandatory = (self.type != self.type.lower())
1244 1248 self.type = self.type.lower()
1245 1249 ## reading parameters
1246 1250 # param count
1247 1251 mancount, advcount = self._unpackheader(_fpartparamcount)
1248 1252 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1249 1253 # param size
1250 1254 fparamsizes = _makefpartparamsizes(mancount + advcount)
1251 1255 paramsizes = self._unpackheader(fparamsizes)
1252 1256 # make it a list of couples again
1253 1257 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1254 1258 # split mandatory from advisory
1255 1259 mansizes = paramsizes[:mancount]
1256 1260 advsizes = paramsizes[mancount:]
1257 1261 # retrieve param value
1258 1262 manparams = []
1259 1263 for key, value in mansizes:
1260 1264 manparams.append((self._fromheader(key), self._fromheader(value)))
1261 1265 advparams = []
1262 1266 for key, value in advsizes:
1263 1267 advparams.append((self._fromheader(key), self._fromheader(value)))
1264 1268 self._initparams(manparams, advparams)
1265 1269 ## part payload
1266 1270 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1267 1271 # we read the data, tell it
1268 1272 self._initialized = True
1269 1273
1270 1274 def read(self, size=None):
1271 1275 """read payload data"""
1272 1276 if not self._initialized:
1273 1277 self._readheader()
1274 1278 if size is None:
1275 1279 data = self._payloadstream.read()
1276 1280 else:
1277 1281 data = self._payloadstream.read(size)
1278 1282 self._pos += len(data)
1279 1283 if size is None or len(data) < size:
1280 1284 if not self.consumed and self._pos:
1281 1285 self.ui.debug('bundle2-input-part: total payload size %i\n'
1282 1286 % self._pos)
1283 1287 self.consumed = True
1284 1288 return data
1285 1289
1286 1290 def tell(self):
1287 1291 return self._pos
1288 1292
1289 1293 def seek(self, offset, whence=0):
1290 1294 if whence == 0:
1291 1295 newpos = offset
1292 1296 elif whence == 1:
1293 1297 newpos = self._pos + offset
1294 1298 elif whence == 2:
1295 1299 if not self.consumed:
1296 1300 self.read()
1297 1301 newpos = self._chunkindex[-1][0] + offset
1298 1302 else:
1299 1303 raise ValueError('Unknown whence value: %r' % (whence,))
1300 1304
1301 1305 if newpos > self._chunkindex[-1][0] and not self.consumed:
1302 1306 self.read()
1303 1307 if not 0 <= newpos <= self._chunkindex[-1][0]:
1304 1308 raise ValueError('Offset out of range')
1305 1309
1306 1310 if self._pos != newpos:
1307 1311 chunk, internaloffset = self._findchunk(newpos)
1308 1312 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1309 1313 adjust = self.read(internaloffset)
1310 1314 if len(adjust) != internaloffset:
1311 1315 raise error.Abort(_('Seek failed\n'))
1312 1316 self._pos = newpos
1313 1317
1314 1318 def _seekfp(self, offset, whence=0):
1315 1319 """move the underlying file pointer
1316 1320
1317 1321 This method is meant for internal usage by the bundle2 protocol only.
1318 1322 It directly manipulates the low level stream, including bundle2 level
1319 1323 instructions.
1320 1324
1321 1325 Do not use it to implement higher-level logic or methods."""
1322 1326 if self._seekable:
1323 1327 return self._fp.seek(offset, whence)
1324 1328 else:
1325 1329 raise NotImplementedError(_('File pointer is not seekable'))
1326 1330
1327 1331 def _tellfp(self):
1328 1332 """return the file offset, or None if file is not seekable
1329 1333
1330 1334 This method is meant for internal usage by the bundle2 protocol only.
1331 1335 It directly manipulates the low level stream, including bundle2 level
1332 1336 instructions.
1333 1337
1334 1338 Do not use it to implement higher-level logic or methods."""
1335 1339 if self._seekable:
1336 1340 try:
1337 1341 return self._fp.tell()
1338 1342 except IOError as e:
1339 1343 if e.errno == errno.ESPIPE:
1340 1344 self._seekable = False
1341 1345 else:
1342 1346 raise
1343 1347 return None
1344 1348
1345 1349 # These are only the static capabilities.
1346 1350 # Check the 'getrepocaps' function for the rest.
1347 1351 capabilities = {'HG20': (),
1348 1352 'error': ('abort', 'unsupportedcontent', 'pushraced',
1349 1353 'pushkey'),
1350 1354 'listkeys': (),
1351 1355 'pushkey': (),
1352 1356 'digests': tuple(sorted(util.DIGESTS.keys())),
1353 1357 'remote-changegroup': ('http', 'https'),
1354 1358 'hgtagsfnodes': (),
1355 1359 }
1356 1360
1357 1361 def getrepocaps(repo, allowpushback=False):
1358 1362 """return the bundle2 capabilities for a given repo
1359 1363
1360 1364 Exists to allow extensions (like evolution) to mutate the capabilities.
1361 1365 """
1362 1366 caps = capabilities.copy()
1363 1367 caps['changegroup'] = tuple(sorted(
1364 1368 changegroup.supportedincomingversions(repo)))
1365 1369 if obsolete.isenabled(repo, obsolete.exchangeopt):
1366 1370 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1367 1371 caps['obsmarkers'] = supportedformat
1368 1372 if allowpushback:
1369 1373 caps['pushback'] = ()
1370 1374 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1371 1375 if cpmode == 'check-related':
1372 1376 caps['checkheads'] = ('related',)
1373 1377 return caps
1374 1378
1375 1379 def bundle2caps(remote):
1376 1380 """return the bundle capabilities of a peer as dict"""
1377 1381 raw = remote.capable('bundle2')
1378 1382 if not raw and raw != '':
1379 1383 return {}
1380 1384 capsblob = urlreq.unquote(remote.capable('bundle2'))
1381 1385 return decodecaps(capsblob)
1382 1386
1383 1387 def obsmarkersversion(caps):
1384 1388 """extract the list of supported obsmarkers versions from a bundle2caps dict
1385 1389 """
1386 1390 obscaps = caps.get('obsmarkers', ())
1387 1391 return [int(c[1:]) for c in obscaps if c.startswith('V')]
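# For example (illustrative): {'obsmarkers': ('V0', 'V1')} yields [0, 1].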
1388 1392
1389 1393 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1390 1394 vfs=None, compression=None, compopts=None):
1391 1395 if bundletype.startswith('HG10'):
1392 1396 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1393 1397 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1394 1398 compression=compression, compopts=compopts)
1395 1399 elif not bundletype.startswith('HG20'):
1396 1400 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1397 1401
1398 1402 caps = {}
1399 1403 if 'obsolescence' in opts:
1400 1404 caps['obsmarkers'] = ('V1',)
1401 1405 bundle = bundle20(ui, caps)
1402 1406 bundle.setcompression(compression, compopts)
1403 1407 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1404 1408 chunkiter = bundle.getchunks()
1405 1409
1406 1410 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1407 1411
1408 1412 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1409 1413 # We should eventually reconcile this logic with the one behind
1410 1414 # 'exchange.getbundle2partsgenerator'.
1411 1415 #
1412 1416 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1413 1417 # different right now. So we keep them separated for now for the sake of
1414 1418 # simplicity.
1415 1419
1416 1420 # we always want a changegroup in such bundle
1417 1421 cgversion = opts.get('cg.version')
1418 1422 if cgversion is None:
1419 1423 cgversion = changegroup.safeversion(repo)
1420 1424 cg = changegroup.getchangegroup(repo, source, outgoing,
1421 1425 version=cgversion)
1422 1426 part = bundler.newpart('changegroup', data=cg.getchunks())
1423 1427 part.addparam('version', cg.version)
1424 1428 if 'clcount' in cg.extras:
1425 1429 part.addparam('nbchanges', str(cg.extras['clcount']),
1426 1430 mandatory=False)
1427 1431 if opts.get('phases') and repo.revs('%ln and secret()',
1428 1432 outgoing.missingheads):
1429 1433 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1430 1434
1431 1435 addparttagsfnodescache(repo, bundler, outgoing)
1432 1436
1433 1437 if opts.get('obsolescence', False):
1434 1438 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1435 1439 buildobsmarkerspart(bundler, obsmarkers)
1436 1440
1437 1441 if opts.get('phases', False):
1438 1442 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1439 1443 phasedata = []
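# illustrative note: each entry packs an int32 phase plus a 20-byte node
# via _fphasesentry ('>i20s'), i.e. 24 bytes per head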
1440 1444 for phase in phases.allphases:
1441 1445 for head in headsbyphase[phase]:
1442 1446 phasedata.append(_pack(_fphasesentry, phase, head))
1443 1447 bundler.newpart('phase-heads', data=''.join(phasedata))
1444 1448
1445 1449 def addparttagsfnodescache(repo, bundler, outgoing):
1446 1450 # we include the tags fnode cache for the bundle changeset
1447 1451 # (as an optional part)
1448 1452 cache = tags.hgtagsfnodescache(repo.unfiltered())
1449 1453 chunks = []
1450 1454
1451 1455 # .hgtags fnodes are only relevant for head changesets. While we could
1452 1456 # transfer values for all known nodes, there will likely be little to
1453 1457 # no benefit.
1454 1458 #
1455 1459 # We don't bother using a generator to produce output data because
1456 1460 # a) we only have 40 bytes per head and even esoteric numbers of heads
1457 1461 # consume little memory (1M heads is 40MB) b) we don't want to send the
1458 1462 # part if we don't have entries and knowing if we have entries requires
1459 1463 # cache lookups.
1460 1464 for node in outgoing.missingheads:
1461 1465 # Don't compute missing, as this may slow down serving.
1462 1466 fnode = cache.getfnode(node, computemissing=False)
1463 1467 if fnode is not None:
1464 1468 chunks.extend([node, fnode])
1465 1469
1466 1470 if chunks:
1467 1471 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1468 1472
1469 1473 def buildobsmarkerspart(bundler, markers):
1470 1474 """add an obsmarker part to the bundler with <markers>
1471 1475
1472 1476 No part is created if markers is empty.
1473 1477 Raises ValueError if the bundler doesn't support any known obsmarker format.
1474 1478 """
1475 1479 if not markers:
1476 1480 return None
1477 1481
1478 1482 remoteversions = obsmarkersversion(bundler.capabilities)
1479 1483 version = obsolete.commonversion(remoteversions)
1480 1484 if version is None:
1481 1485 raise ValueError('bundler does not support common obsmarker format')
1482 1486 stream = obsolete.encodemarkers(markers, True, version=version)
1483 1487 return bundler.newpart('obsmarkers', data=stream)
1484 1488
1485 1489 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1486 1490 compopts=None):
1487 1491 """Write a bundle file and return its filename.
1488 1492
1489 1493 Existing files will not be overwritten.
1490 1494 If no filename is specified, a temporary file is created.
1491 1495 bz2 compression can be turned off.
1492 1496 The bundle file will be deleted in case of errors.
1493 1497 """
1494 1498
1495 1499 if bundletype == "HG20":
1496 1500 bundle = bundle20(ui)
1497 1501 bundle.setcompression(compression, compopts)
1498 1502 part = bundle.newpart('changegroup', data=cg.getchunks())
1499 1503 part.addparam('version', cg.version)
1500 1504 if 'clcount' in cg.extras:
1501 1505 part.addparam('nbchanges', str(cg.extras['clcount']),
1502 1506 mandatory=False)
1503 1507 chunkiter = bundle.getchunks()
1504 1508 else:
1505 1509 # compression argument is only for the bundle2 case
1506 1510 assert compression is None
1507 1511 if cg.version != '01':
1508 1512             raise error.Abort(_('old bundle types only support v1 '
1509 1513 'changegroups'))
1510 1514 header, comp = bundletypes[bundletype]
1511 1515 if comp not in util.compengines.supportedbundletypes:
1512 1516 raise error.Abort(_('unknown stream compression type: %s')
1513 1517 % comp)
1514 1518 compengine = util.compengines.forbundletype(comp)
1515 1519 def chunkiter():
1516 1520 yield header
1517 1521 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1518 1522 yield chunk
1519 1523 chunkiter = chunkiter()
1520 1524
1521 1525 # parse the changegroup data, otherwise we will block
1522 1526 # in case of sshrepo because we don't know the end of the stream
1523 1527 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1524 1528
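A hedged usage sketch for writebundle (hg 4.3-era internal API; `repo` and a computed `outgoing` set are assumed to already exist, and signatures may differ in other releases):

    from mercurial import bundle2, changegroup

    # Uncompressed v1 bundle, e.g. for consumption by old clients.
    cg = changegroup.getchangegroup(repo, 'bundle', outgoing, version='01')
    fname = bundle2.writebundle(repo.ui, cg, 'changes.hg', 'HG10UN')

    # bundle2 container; compression is configured on the bundler itself.
    cg2 = changegroup.getchangegroup(repo, 'bundle', outgoing, version='02')
    fname2 = bundle2.writebundle(repo.ui, cg2, 'changes2.hg', 'HG20')
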
1525 1529 def combinechangegroupresults(op):
1526 1530 """logic to combine 0 or more addchangegroup results into one"""
1527 1531 results = [r.get('return', 0)
1528 1532 for r in op.records['changegroup']]
1529 1533 changedheads = 0
1530 1534 result = 1
1531 1535 for ret in results:
1532 1536 # If any changegroup result is 0, return 0
1533 1537 if ret == 0:
1534 1538 result = 0
1535 1539 break
1536 1540 if ret < -1:
1537 1541 changedheads += ret + 1
1538 1542 elif ret > 1:
1539 1543 changedheads += ret - 1
1540 1544 if changedheads > 0:
1541 1545 result = 1 + changedheads
1542 1546 elif changedheads < 0:
1543 1547 result = -1 + changedheads
1544 1548 return result
1545 1549
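The addchangegroup return convention encodes head movement around plus or minus one: 1 means success with no head change, 1+n means n heads added, -1-n means n heads removed, and 0 means failure. A standalone rendering of the same fold with a few worked inputs (illustrative only):

    def combineresults(results):
        # Fold multiple addchangegroup return codes into one, mirroring
        # combinechangegroupresults above.
        changedheads = 0
        for ret in results:
            if ret == 0:          # any failure wins
                return 0
            if ret < -1:          # ret = -1 - heads removed
                changedheads += ret + 1
            elif ret > 1:         # ret = 1 + heads added
                changedheads += ret - 1
        if changedheads > 0:
            return 1 + changedheads
        if changedheads < 0:
            return -1 + changedheads
        return 1

    assert combineresults([1, 1]) == 1    # no head changes
    assert combineresults([3, 2]) == 4    # 2 + 1 heads added
    assert combineresults([-2, 1]) == -2  # one head removed
    assert combineresults([3, 0]) == 0    # a failure dominates
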
1546 1550 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1547 1551 'targetphase'))
1548 1552 def handlechangegroup(op, inpart):
1549 1553 """apply a changegroup part on the repo
1550 1554
1551 1555     This is a very early implementation that will receive massive rework
1552 1556     before being inflicted on any end-user.
1553 1557 """
1554 1558 tr = op.gettransaction()
1555 1559 unpackerversion = inpart.params.get('version', '01')
1556 1560 # We should raise an appropriate exception here
1557 1561 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1558 1562     # the source and url passed here are overwritten by the ones contained
1559 1563     # in the transaction.hookargs argument, so 'bundle2' is a placeholder
1560 1564 nbchangesets = None
1561 1565 if 'nbchanges' in inpart.params:
1562 1566 nbchangesets = int(inpart.params.get('nbchanges'))
1563 1567 if ('treemanifest' in inpart.params and
1564 1568 'treemanifest' not in op.repo.requirements):
1565 1569 if len(op.repo.changelog) != 0:
1566 1570 raise error.Abort(_(
1567 1571 "bundle contains tree manifests, but local repo is "
1568 1572 "non-empty and does not use tree manifests"))
1569 1573 op.repo.requirements.add('treemanifest')
1570 1574 op.repo._applyopenerreqs()
1571 1575 op.repo._writerequirements()
1572 1576 extrakwargs = {}
1573 1577 targetphase = inpart.params.get('targetphase')
1574 1578 if targetphase is not None:
1575 1579 extrakwargs['targetphase'] = int(targetphase)
1576 1580 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1577 1581 expectedtotal=nbchangesets, **extrakwargs)
1578 1582 if op.reply is not None:
1579 1583 # This is definitely not the final form of this
1580 1584         # return. But one needs to start somewhere.
1581 1585 part = op.reply.newpart('reply:changegroup', mandatory=False)
1582 1586 part.addparam(
1583 1587 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1584 1588 part.addparam('return', '%i' % ret, mandatory=False)
1585 1589 assert not inpart.read()
1586 1590
1587 1591 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1588 1592 ['digest:%s' % k for k in util.DIGESTS.keys()])
1589 1593 @parthandler('remote-changegroup', _remotechangegroupparams)
1590 1594 def handleremotechangegroup(op, inpart):
1591 1595     """apply a bundle10 on the repo, given a url and validation information
1592 1596
1593 1597     All the information about the remote bundle to import is given as
1594 1598 parameters. The parameters include:
1595 1599 - url: the url to the bundle10.
1596 1600     - size: the bundle10 file size. It is used to validate that what was
1597 1601       retrieved by the client matches the server's knowledge about the bundle.
1598 1602 - digests: a space separated list of the digest types provided as
1599 1603 parameters.
1600 1604 - digest:<digest-type>: the hexadecimal representation of the digest with
1601 1605       that name. Like the size, it is used to validate that what was retrieved
1602 1606       by the client matches what the server knows about the bundle.
1603 1607
1604 1608 When multiple digest types are given, all of them are checked.
1605 1609 """
1606 1610 try:
1607 1611 raw_url = inpart.params['url']
1608 1612 except KeyError:
1609 1613 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1610 1614 parsed_url = util.url(raw_url)
1611 1615 if parsed_url.scheme not in capabilities['remote-changegroup']:
1612 1616 raise error.Abort(_('remote-changegroup does not support %s urls') %
1613 1617 parsed_url.scheme)
1614 1618
1615 1619 try:
1616 1620 size = int(inpart.params['size'])
1617 1621 except ValueError:
1618 1622 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1619 1623 % 'size')
1620 1624 except KeyError:
1621 1625 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1622 1626
1623 1627 digests = {}
1624 1628 for typ in inpart.params.get('digests', '').split():
1625 1629 param = 'digest:%s' % typ
1626 1630 try:
1627 1631 value = inpart.params[param]
1628 1632 except KeyError:
1629 1633 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1630 1634 param)
1631 1635 digests[typ] = value
1632 1636
1633 1637 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1634 1638
1635 1639 tr = op.gettransaction()
1636 1640 from . import exchange
1637 1641 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1638 1642 if not isinstance(cg, changegroup.cg1unpacker):
1639 1643 raise error.Abort(_('%s: not a bundle version 1.0') %
1640 1644 util.hidepassword(raw_url))
1641 1645 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1642 1646 if op.reply is not None:
1643 1647 # This is definitely not the final form of this
1644 1648         # return. But one needs to start somewhere.
1645 1649 part = op.reply.newpart('reply:changegroup')
1646 1650 part.addparam(
1647 1651 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1648 1652 part.addparam('return', '%i' % ret, mandatory=False)
1649 1653 try:
1650 1654 real_part.validate()
1651 1655 except error.Abort as e:
1652 1656 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1653 1657 (util.hidepassword(raw_url), str(e)))
1654 1658 assert not inpart.read()
1655 1659
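util.digestchecker above wraps the raw stream so bytes are hashed as they are read, with the size and digests verified once the changegroup has been consumed. A generic sketch of that wrapper pattern, not hg's actual implementation:

    import hashlib
    import io

    class digestingreader(object):
        # Wrap a file-like object; verify length and sha1 after draining.
        def __init__(self, fh, size, sha1hex):
            self._fh, self._size, self._want = fh, size, sha1hex
            self._got, self._hash = 0, hashlib.sha1()

        def read(self, n=-1):
            data = self._fh.read(n)
            self._got += len(data)
            self._hash.update(data)
            return data

        def validate(self):
            if (self._got != self._size
                or self._hash.hexdigest() != self._want):
                raise ValueError('corrupted stream')

    payload = 'changegroup bytes'
    reader = digestingreader(io.BytesIO(payload), len(payload),
                             hashlib.sha1(payload).hexdigest())
    reader.read()
    reader.validate()  # passes silently; a corrupt stream would raise
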
1656 1660 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1657 1661 def handlereplychangegroup(op, inpart):
1658 1662 ret = int(inpart.params['return'])
1659 1663 replyto = int(inpart.params['in-reply-to'])
1660 1664 op.records.add('changegroup', {'return': ret}, replyto)
1661 1665
1662 1666 @parthandler('check:heads')
1663 1667 def handlecheckheads(op, inpart):
1664 1668     """check that the heads of the repo did not change
1665 1669
1666 1670 This is used to detect a push race when using unbundle.
1667 1671 This replaces the "heads" argument of unbundle."""
1668 1672 h = inpart.read(20)
1669 1673 heads = []
1670 1674 while len(h) == 20:
1671 1675 heads.append(h)
1672 1676 h = inpart.read(20)
1673 1677 assert not h
1674 1678 # Trigger a transaction so that we are guaranteed to have the lock now.
1675 1679 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1676 1680 op.gettransaction()
1677 1681 if sorted(heads) != sorted(op.repo.heads()):
1678 1682 raise error.PushRaced('repository changed while pushing - '
1679 1683 'please try again')
1680 1684
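On the sending side, the matching part body is just the binary heads concatenated; handlecheckheads reads them back 20 bytes at a time. A hedged sketch of building such a part (`bundler` is assumed to be an existing bundle20 instance):

    def encodecheckheads(heads):
        # Concatenate 20-byte binary heads for a 'check:heads' part body.
        assert all(len(h) == 20 for h in heads)
        return ''.join(heads)

    part = bundler.newpart('check:heads',
                           data=encodecheckheads(repo.heads()))
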
1681 1685 @parthandler('check:updated-heads')
1682 1686 def handlecheckupdatedheads(op, inpart):
1683 1687 """check for race on the heads touched by a push
1684 1688
1685 1689     This is similar to 'check:heads' but focuses on the heads actually updated
1686 1690     during the push. If other activity happens on unrelated heads, it is
1687 1691     ignored.
1688 1692 
1689 1693     This allows servers with high traffic to avoid push contention as long as
1690 1694     only unrelated parts of the graph are involved."""
1691 1695 h = inpart.read(20)
1692 1696 heads = []
1693 1697 while len(h) == 20:
1694 1698 heads.append(h)
1695 1699 h = inpart.read(20)
1696 1700 assert not h
1697 1701     # Trigger a transaction so that we are guaranteed to have the lock now.
1698 1702 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1699 1703 op.gettransaction()
1700 1704
1701 1705 currentheads = set()
1702 1706 for ls in op.repo.branchmap().itervalues():
1703 1707 currentheads.update(ls)
1704 1708
1705 1709 for h in heads:
1706 1710 if h not in currentheads:
1707 1711 raise error.PushRaced('repository changed while pushing - '
1708 1712 'please try again')
1709 1713
1710 1714 @parthandler('output')
1711 1715 def handleoutput(op, inpart):
1712 1716 """forward output captured on the server to the client"""
1713 1717 for line in inpart.read().splitlines():
1714 1718 op.ui.status(_('remote: %s\n') % line)
1715 1719
1716 1720 @parthandler('replycaps')
1717 1721 def handlereplycaps(op, inpart):
1718 1722 """Notify that a reply bundle should be created
1719 1723
1720 1724 The payload contains the capabilities information for the reply"""
1721 1725 caps = decodecaps(inpart.read())
1722 1726 if op.reply is None:
1723 1727 op.reply = bundle20(op.ui, caps)
1724 1728
1725 1729 class AbortFromPart(error.Abort):
1726 1730 """Sub-class of Abort that denotes an error from a bundle2 part."""
1727 1731
1728 1732 @parthandler('error:abort', ('message', 'hint'))
1729 1733 def handleerrorabort(op, inpart):
1730 1734 """Used to transmit abort error over the wire"""
1731 1735 raise AbortFromPart(inpart.params['message'],
1732 1736 hint=inpart.params.get('hint'))
1733 1737
1734 1738 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1735 1739 'in-reply-to'))
1736 1740 def handleerrorpushkey(op, inpart):
1737 1741 """Used to transmit failure of a mandatory pushkey over the wire"""
1738 1742 kwargs = {}
1739 1743 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1740 1744 value = inpart.params.get(name)
1741 1745 if value is not None:
1742 1746 kwargs[name] = value
1743 1747 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1744 1748
1745 1749 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1746 1750 def handleerrorunsupportedcontent(op, inpart):
1747 1751 """Used to transmit unknown content error over the wire"""
1748 1752 kwargs = {}
1749 1753 parttype = inpart.params.get('parttype')
1750 1754 if parttype is not None:
1751 1755 kwargs['parttype'] = parttype
1752 1756 params = inpart.params.get('params')
1753 1757 if params is not None:
1754 1758 kwargs['params'] = params.split('\0')
1755 1759
1756 1760 raise error.BundleUnknownFeatureError(**kwargs)
1757 1761
1758 1762 @parthandler('error:pushraced', ('message',))
1759 1763 def handleerrorpushraced(op, inpart):
1760 1764 """Used to transmit push race error over the wire"""
1761 1765 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1762 1766
1763 1767 @parthandler('listkeys', ('namespace',))
1764 1768 def handlelistkeys(op, inpart):
1765 1769 """retrieve pushkey namespace content stored in a bundle2"""
1766 1770 namespace = inpart.params['namespace']
1767 1771 r = pushkey.decodekeys(inpart.read())
1768 1772 op.records.add('listkeys', (namespace, r))
1769 1773
1770 1774 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1771 1775 def handlepushkey(op, inpart):
1772 1776 """process a pushkey request"""
1773 1777 dec = pushkey.decode
1774 1778 namespace = dec(inpart.params['namespace'])
1775 1779 key = dec(inpart.params['key'])
1776 1780 old = dec(inpart.params['old'])
1777 1781 new = dec(inpart.params['new'])
1778 1782 # Grab the transaction to ensure that we have the lock before performing the
1779 1783 # pushkey.
1780 1784 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1781 1785 op.gettransaction()
1782 1786 ret = op.repo.pushkey(namespace, key, old, new)
1783 1787 record = {'namespace': namespace,
1784 1788 'key': key,
1785 1789 'old': old,
1786 1790 'new': new}
1787 1791 op.records.add('pushkey', record)
1788 1792 if op.reply is not None:
1789 1793 rpart = op.reply.newpart('reply:pushkey')
1790 1794 rpart.addparam(
1791 1795 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1792 1796 rpart.addparam('return', '%i' % ret, mandatory=False)
1793 1797 if inpart.mandatory and not ret:
1794 1798 kwargs = {}
1795 1799 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1796 1800 if key in inpart.params:
1797 1801 kwargs[key] = inpart.params[key]
1798 1802 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1799 1803
1800 1804 def _readphaseheads(inpart):
1801 1805 headsbyphase = [[] for i in phases.allphases]
1802 1806 entrysize = struct.calcsize(_fphasesentry)
1803 1807 while True:
1804 1808 entry = inpart.read(entrysize)
1805 1809 if len(entry) < entrysize:
1806 1810 if entry:
1807 1811 raise error.Abort(_('bad phase-heads bundle part'))
1808 1812 break
1809 1813 phase, node = struct.unpack(_fphasesentry, entry)
1810 1814 headsbyphase[phase].append(node)
1811 1815 return headsbyphase
1812 1816
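The phase-heads entry format is fixed-size, which is why the reader above can rely on struct.calcsize. A round-trip sketch, assuming _fphasesentry is a big-endian int followed by a 20-byte node (e.g. '>i20s'; the real format string is defined earlier in this file):

    import struct

    _entry = '>i20s'  # assumed layout: phase as big-endian int, then node

    def encodephaseheads(headsbyphase):
        # Mirror of the writer in _addpartsfromopts: one record per head.
        data = []
        for phase, heads in enumerate(headsbyphase):
            for head in heads:
                data.append(struct.pack(_entry, phase, head))
        return ''.join(data)

    blob = encodephaseheads([['\x00' * 20], [], ['\xff' * 20]])
    assert len(blob) == 2 * struct.calcsize(_entry)
    phase, node = struct.unpack(_entry, blob[:struct.calcsize(_entry)])
    assert (phase, node) == (0, '\x00' * 20)
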
1813 1817 @parthandler('phase-heads')
1814 1818 def handlephases(op, inpart):
1815 1819 """apply phases from bundle part to repo"""
1816 1820 headsbyphase = _readphaseheads(inpart)
1817 1821 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1818 1822 op.records.add('phase-heads', {})
1819 1823
1820 1824 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1821 1825 def handlepushkeyreply(op, inpart):
1822 1826 """retrieve the result of a pushkey request"""
1823 1827 ret = int(inpart.params['return'])
1824 1828 partid = int(inpart.params['in-reply-to'])
1825 1829 op.records.add('pushkey', {'return': ret}, partid)
1826 1830
1827 1831 @parthandler('obsmarkers')
1828 1832 def handleobsmarker(op, inpart):
1829 1833 """add a stream of obsmarkers to the repo"""
1830 1834 tr = op.gettransaction()
1831 1835 markerdata = inpart.read()
1832 1836 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1833 1837 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1834 1838 % len(markerdata))
1835 1839 # The mergemarkers call will crash if marker creation is not enabled.
1836 1840     # We want to avoid this if the part is advisory.
1837 1841 if not inpart.mandatory and op.repo.obsstore.readonly:
1838 1842         op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1839 1843 return
1840 1844 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1841 1845 op.repo.invalidatevolatilesets()
1842 1846 if new:
1843 1847 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1844 1848 op.records.add('obsmarkers', {'new': new})
1845 1849 if op.reply is not None:
1846 1850 rpart = op.reply.newpart('reply:obsmarkers')
1847 1851 rpart.addparam(
1848 1852 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1849 1853 rpart.addparam('new', '%i' % new, mandatory=False)
1850 1854
1851 1855
1852 1856 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1853 1857 def handleobsmarkerreply(op, inpart):
1854 1858     """retrieve the result of an obsmarkers request"""
1855 1859 ret = int(inpart.params['new'])
1856 1860 partid = int(inpart.params['in-reply-to'])
1857 1861 op.records.add('obsmarkers', {'new': ret}, partid)
1858 1862
1859 1863 @parthandler('hgtagsfnodes')
1860 1864 def handlehgtagsfnodes(op, inpart):
1861 1865 """Applies .hgtags fnodes cache entries to the local repo.
1862 1866
1863 1867 Payload is pairs of 20 byte changeset nodes and filenodes.
1864 1868 """
1865 1869 # Grab the transaction so we ensure that we have the lock at this point.
1866 1870 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1867 1871 op.gettransaction()
1868 1872 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1869 1873
1870 1874 count = 0
1871 1875 while True:
1872 1876 node = inpart.read(20)
1873 1877 fnode = inpart.read(20)
1874 1878 if len(node) < 20 or len(fnode) < 20:
1875 1879 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1876 1880 break
1877 1881 cache.setfnode(node, fnode)
1878 1882 count += 1
1879 1883
1880 1884 cache.write()
1881 1885 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1882 1886
1883 1887 @parthandler('pushvars')
1884 1888 def bundle2getvars(op, part):
1885 1889 '''unbundle a bundle2 containing shellvars on the server'''
1886 1890 # An option to disable unbundling on server-side for security reasons
1887 1891 if op.ui.configbool('push', 'pushvars.server'):
1888 1892 hookargs = {}
1889 1893 for key, value in part.advisoryparams:
1890 1894 key = key.upper()
1891 1895 # We want pushed variables to have USERVAR_ prepended so we know
1892 1896 # they came from the --pushvar flag.
1893 1897 key = "USERVAR_" + key
1894 1898 hookargs[key] = value
1895 1899 op.addhookargs(hookargs)
@@ -1,558 +1,557
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import os
17 17 import shutil
18 18 import tempfile
19 19
20 20 from .i18n import _
21 21 from .node import nullid
22 22
23 23 from . import (
24 24 bundle2,
25 25 changegroup,
26 26 changelog,
27 27 cmdutil,
28 28 discovery,
29 29 error,
30 30 exchange,
31 31 filelog,
32 32 localrepo,
33 33 manifest,
34 34 mdiff,
35 35 node as nodemod,
36 36 pathutil,
37 37 phases,
38 38 pycompat,
39 39 revlog,
40 40 util,
41 41 vfs as vfsmod,
42 42 )
43 43
44 44 class bundlerevlog(revlog.revlog):
45 45 def __init__(self, opener, indexfile, bundle, linkmapper):
46 46 # How it works:
47 47 # To retrieve a revision, we need to know the offset of the revision in
48 48 # the bundle (an unbundle object). We store this offset in the index
49 49 # (start). The base of the delta is stored in the base field.
50 50 #
51 51 # To differentiate a rev in the bundle from a rev in the revlog, we
52 52 # check revision against repotiprev.
53 53 opener = vfsmod.readonlyvfs(opener)
54 54 revlog.revlog.__init__(self, opener, indexfile)
55 55 self.bundle = bundle
56 56 n = len(self)
57 57 self.repotiprev = n - 1
58 58 chain = None
59 59 self.bundlerevs = set() # used by 'bundle()' revset expression
60 60 getchunk = lambda: bundle.deltachunk(chain)
61 61 for chunkdata in iter(getchunk, {}):
62 62 node = chunkdata['node']
63 63 p1 = chunkdata['p1']
64 64 p2 = chunkdata['p2']
65 65 cs = chunkdata['cs']
66 66 deltabase = chunkdata['deltabase']
67 67 delta = chunkdata['delta']
68 68 flags = chunkdata['flags']
69 69
70 70 size = len(delta)
71 71 start = bundle.tell() - size
72 72
73 73 link = linkmapper(cs)
74 74 if node in self.nodemap:
75 75 # this can happen if two branches make the same change
76 76 chain = node
77 77 self.bundlerevs.add(self.nodemap[node])
78 78 continue
79 79
80 80 for p in (p1, p2):
81 81 if p not in self.nodemap:
82 82 raise error.LookupError(p, self.indexfile,
83 83 _("unknown parent"))
84 84
85 85 if deltabase not in self.nodemap:
86 86                 raise error.LookupError(deltabase, self.indexfile,
87 87 _('unknown delta base'))
88 88
89 89 baserev = self.rev(deltabase)
90 90 # start, size, full unc. size, base (unused), link, p1, p2, node
91 91 e = (revlog.offset_type(start, flags), size, -1, baserev, link,
92 92 self.rev(p1), self.rev(p2), node)
93 93 self.index.insert(-1, e)
94 94 self.nodemap[node] = n
95 95 self.bundlerevs.add(n)
96 96 chain = node
97 97 n += 1
98 98
99 99 def _chunk(self, rev):
100 100         # Warning: in the case of a bundle, the diff is against what we stored
101 101         # as the delta base, not against rev - 1
102 102 # XXX: could use some caching
103 103 if rev <= self.repotiprev:
104 104 return revlog.revlog._chunk(self, rev)
105 105 self.bundle.seek(self.start(rev))
106 106 return self.bundle.read(self.length(rev))
107 107
108 108 def revdiff(self, rev1, rev2):
109 109 """return or calculate a delta between two revisions"""
110 110 if rev1 > self.repotiprev and rev2 > self.repotiprev:
111 111 # hot path for bundle
112 112 revb = self.index[rev2][3]
113 113 if revb == rev1:
114 114 return self._chunk(rev2)
115 115 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
116 116 return revlog.revlog.revdiff(self, rev1, rev2)
117 117
118 118 return mdiff.textdiff(self.revision(rev1, raw=True),
119 119 self.revision(rev2, raw=True))
120 120
121 121 def revision(self, nodeorrev, raw=False):
122 122 """return an uncompressed revision of a given node or revision
123 123 number.
124 124 """
125 125 if isinstance(nodeorrev, int):
126 126 rev = nodeorrev
127 127 node = self.node(rev)
128 128 else:
129 129 node = nodeorrev
130 130 rev = self.rev(node)
131 131
132 132 if node == nullid:
133 133 return ""
134 134
135 135 rawtext = None
136 136 chain = []
137 137 iterrev = rev
138 138 # reconstruct the revision if it is from a changegroup
139 139 while iterrev > self.repotiprev:
140 140 if self._cache and self._cache[1] == iterrev:
141 141 rawtext = self._cache[2]
142 142 break
143 143 chain.append(iterrev)
144 144 iterrev = self.index[iterrev][3]
145 145 if rawtext is None:
146 146 rawtext = self.baserevision(iterrev)
147 147
148 148 while chain:
149 149 delta = self._chunk(chain.pop())
150 150 rawtext = mdiff.patches(rawtext, [delta])
151 151
152 152 text, validatehash = self._processflags(rawtext, self.flags(rev),
153 153 'read', raw=raw)
154 154 if validatehash:
155 155 self.checkhash(text, node, rev=rev)
156 156 self._cache = (node, rev, rawtext)
157 157 return text
158 158
159 159 def baserevision(self, nodeorrev):
160 160         # Revlog subclasses may override the 'revision' method to modify the
161 161         # format of content retrieved from the revlog. To use bundlerevlog with
162 162         # such a class, override 'baserevision' and make a more specific call.
163 163 return revlog.revlog.revision(self, nodeorrev, raw=True)
164 164
165 165 def addrevision(self, text, transaction, link, p1=None, p2=None, d=None):
166 166 raise NotImplementedError
167 167 def addgroup(self, revs, linkmapper, transaction):
168 168 raise NotImplementedError
169 169 def strip(self, rev, minlink):
170 170 raise NotImplementedError
171 171 def checksize(self):
172 172 raise NotImplementedError
173 173
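revision() above rebuilds a bundle revision by walking the delta chain back to a base that lives in the local revlog, then replaying the deltas forward. A schematic, runnable sketch of that walk, with a toy patch function standing in for mdiff.patches (all names illustrative):

    def reconstruct(rev, localtip, deltabaseof, basetext, deltaof,
                    applydelta):
        # Collect bundle revs until the chain reaches a locally stored base.
        chain = []
        while rev > localtip:
            chain.append(rev)
            rev = deltabaseof(rev)
        text = basetext(rev)
        while chain:  # replay deltas from oldest to newest
            text = applydelta(text, deltaof(chain.pop()))
        return text

    bases = {5: 4, 4: 3}           # bundle rev -> its delta base
    deltas = {5: '+e5', 4: '+e4'}  # fabricated "deltas"
    text = reconstruct(5, 3, bases.get, lambda r: 'base%d' % r,
                       deltas.get, lambda t, d: t + d)
    assert text == 'base3+e4+e5'
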
174 174 class bundlechangelog(bundlerevlog, changelog.changelog):
175 175 def __init__(self, opener, bundle):
176 176 changelog.changelog.__init__(self, opener)
177 177 linkmapper = lambda x: x
178 178 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
179 179 linkmapper)
180 180
181 181 def baserevision(self, nodeorrev):
182 182         # Although changelog doesn't override the 'revision' method, some
183 183         # extensions may replace this class with another that does. Same story
184 184         # with manifest and filelog classes.
185 185
186 186 # This bypasses filtering on changelog.node() and rev() because we need
187 187 # revision text of the bundle base even if it is hidden.
188 188 oldfilter = self.filteredrevs
189 189 try:
190 190 self.filteredrevs = ()
191 191 return changelog.changelog.revision(self, nodeorrev, raw=True)
192 192 finally:
193 193 self.filteredrevs = oldfilter
194 194
195 195 class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
196 196 def __init__(self, opener, bundle, linkmapper, dirlogstarts=None, dir=''):
197 197 manifest.manifestrevlog.__init__(self, opener, dir=dir)
198 198 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
199 199 linkmapper)
200 200 if dirlogstarts is None:
201 201 dirlogstarts = {}
202 202 if self.bundle.version == "03":
203 203 dirlogstarts = _getfilestarts(self.bundle)
204 204 self._dirlogstarts = dirlogstarts
205 205 self._linkmapper = linkmapper
206 206
207 207 def baserevision(self, nodeorrev):
208 208 node = nodeorrev
209 209 if isinstance(node, int):
210 210 node = self.node(node)
211 211
212 212 if node in self.fulltextcache:
213 213 result = '%s' % self.fulltextcache[node]
214 214 else:
215 215 result = manifest.manifestrevlog.revision(self, nodeorrev, raw=True)
216 216 return result
217 217
218 218 def dirlog(self, d):
219 219 if d in self._dirlogstarts:
220 220 self.bundle.seek(self._dirlogstarts[d])
221 221 return bundlemanifest(
222 222 self.opener, self.bundle, self._linkmapper,
223 223 self._dirlogstarts, dir=d)
224 224 return super(bundlemanifest, self).dirlog(d)
225 225
226 226 class bundlefilelog(bundlerevlog, filelog.filelog):
227 227 def __init__(self, opener, path, bundle, linkmapper):
228 228 filelog.filelog.__init__(self, opener, path)
229 229 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
230 230 linkmapper)
231 231
232 232 def baserevision(self, nodeorrev):
233 233 return filelog.filelog.revision(self, nodeorrev, raw=True)
234 234
235 235 class bundlepeer(localrepo.localpeer):
236 236 def canpush(self):
237 237 return False
238 238
239 239 class bundlephasecache(phases.phasecache):
240 240 def __init__(self, *args, **kwargs):
241 241 super(bundlephasecache, self).__init__(*args, **kwargs)
242 242 if util.safehasattr(self, 'opener'):
243 243 self.opener = vfsmod.readonlyvfs(self.opener)
244 244
245 245 def write(self):
246 246 raise NotImplementedError
247 247
248 248 def _write(self, fp):
249 249 raise NotImplementedError
250 250
251 251 def _updateroots(self, phase, newroots, tr):
252 252 self.phaseroots[phase] = newroots
253 253 self.invalidate()
254 254 self.dirty = True
255 255
256 256 def _getfilestarts(bundle):
257 257 bundlefilespos = {}
258 258 for chunkdata in iter(bundle.filelogheader, {}):
259 259 fname = chunkdata['filename']
260 260 bundlefilespos[fname] = bundle.tell()
261 261 for chunk in iter(lambda: bundle.deltachunk(None), {}):
262 262 pass
263 263 return bundlefilespos
264 264
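_getfilestarts (and the bundlerevlog constructor above) lean on the two-argument form of iter(): keep calling the function until it returns a value equal to the sentinel, {} here. A standalone illustration of the idiom:

    chunks = ['chunk-a', 'chunk-b']
    nextchunk = lambda: chunks.pop(0) if chunks else {}

    drained = list(iter(nextchunk, {}))  # stops when nextchunk() == {}
    assert drained == ['chunk-a', 'chunk-b']
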
265 265 class bundlerepository(localrepo.localrepository):
266 266 def __init__(self, ui, path, bundlename):
267 267 self._tempparent = None
268 268 try:
269 269 localrepo.localrepository.__init__(self, ui, path)
270 270 except error.RepoError:
271 271 self._tempparent = tempfile.mkdtemp()
272 272 localrepo.instance(ui, self._tempparent, 1)
273 273 localrepo.localrepository.__init__(self, ui, self._tempparent)
274 274 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
275 275
276 276 if path:
277 277 self._url = 'bundle:' + util.expandpath(path) + '+' + bundlename
278 278 else:
279 279 self._url = 'bundle:' + bundlename
280 280
281 281 self.tempfile = None
282 282 f = util.posixfile(bundlename, "rb")
283 283 self.bundlefile = self.bundle = exchange.readbundle(ui, f, bundlename)
284 284
285 285 if isinstance(self.bundle, bundle2.unbundle20):
286 286 cgstream = None
287 287 for part in self.bundle.iterparts():
288 288 if part.type == 'changegroup':
289 289 if cgstream is not None:
290 290 raise NotImplementedError("can't process "
291 291 "multiple changegroups")
292 292 cgstream = part
293 293 version = part.params.get('version', '01')
294 294 legalcgvers = changegroup.supportedincomingversions(self)
295 295 if version not in legalcgvers:
296 296 msg = _('Unsupported changegroup version: %s')
297 297 raise error.Abort(msg % version)
298 298 if self.bundle.compressed():
299 299 cgstream = self._writetempbundle(part.read,
300 300 ".cg%sun" % version)
301 301
302 302 if cgstream is None:
303 303 raise error.Abort(_('No changegroups found'))
304 cgstream.seek(0)
305 304
306 305 self.bundle = changegroup.getunbundler(version, cgstream, 'UN')
307 306
308 307 elif self.bundle.compressed():
309 308 f = self._writetempbundle(self.bundle.read, '.hg10un',
310 309 header='HG10UN')
311 310 self.bundlefile = self.bundle = exchange.readbundle(ui, f,
312 311 bundlename,
313 312 self.vfs)
314 313
315 314 # dict with the mapping 'filename' -> position in the bundle
316 315 self.bundlefilespos = {}
317 316
318 317 self.firstnewrev = self.changelog.repotiprev + 1
319 318 phases.retractboundary(self, None, phases.draft,
320 319 [ctx.node() for ctx in self[self.firstnewrev:]])
321 320
322 321 def _writetempbundle(self, readfn, suffix, header=''):
323 322 """Write a temporary file to disk
324 323 """
325 324 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
326 325                                           suffix=suffix)
327 326 self.tempfile = temp
328 327
329 328 with os.fdopen(fdtemp, pycompat.sysstr('wb')) as fptemp:
330 329 fptemp.write(header)
331 330 while True:
332 331 chunk = readfn(2**18)
333 332 if not chunk:
334 333 break
335 334 fptemp.write(chunk)
336 335
337 336 return self.vfs.open(self.tempfile, mode="rb")
338 337
339 338 @localrepo.unfilteredpropertycache
340 339 def _phasecache(self):
341 340 return bundlephasecache(self, self._phasedefaults)
342 341
343 342 @localrepo.unfilteredpropertycache
344 343 def changelog(self):
345 344 # consume the header if it exists
346 345 self.bundle.changelogheader()
347 346 c = bundlechangelog(self.svfs, self.bundle)
348 347 self.manstart = self.bundle.tell()
349 348 return c
350 349
351 350 def _constructmanifest(self):
352 351 self.bundle.seek(self.manstart)
353 352 # consume the header if it exists
354 353 self.bundle.manifestheader()
355 354 linkmapper = self.unfiltered().changelog.rev
356 355 m = bundlemanifest(self.svfs, self.bundle, linkmapper)
357 356 self.filestart = self.bundle.tell()
358 357 return m
359 358
360 359 @localrepo.unfilteredpropertycache
361 360 def manstart(self):
362 361 self.changelog
363 362 return self.manstart
364 363
365 364 @localrepo.unfilteredpropertycache
366 365 def filestart(self):
367 366 self.manifestlog
368 367 return self.filestart
369 368
370 369 def url(self):
371 370 return self._url
372 371
373 372 def file(self, f):
374 373 if not self.bundlefilespos:
375 374 self.bundle.seek(self.filestart)
376 375 self.bundlefilespos = _getfilestarts(self.bundle)
377 376
378 377 if f in self.bundlefilespos:
379 378 self.bundle.seek(self.bundlefilespos[f])
380 379 linkmapper = self.unfiltered().changelog.rev
381 380 return bundlefilelog(self.svfs, f, self.bundle, linkmapper)
382 381 else:
383 382 return filelog.filelog(self.svfs, f)
384 383
385 384 def close(self):
386 385 """Close assigned bundle file immediately."""
387 386 self.bundlefile.close()
388 387 if self.tempfile is not None:
389 388 self.vfs.unlink(self.tempfile)
390 389 if self._tempparent:
391 390 shutil.rmtree(self._tempparent, True)
392 391
393 392 def cancopy(self):
394 393 return False
395 394
396 395 def peer(self):
397 396 return bundlepeer(self)
398 397
399 398 def getcwd(self):
400 399 return pycompat.getcwd() # always outside the repo
401 400
402 401 # Check if parents exist in localrepo before setting
403 402 def setparents(self, p1, p2=nullid):
404 403 p1rev = self.changelog.rev(p1)
405 404 p2rev = self.changelog.rev(p2)
406 405 msg = _("setting parent to node %s that only exists in the bundle\n")
407 406 if self.changelog.repotiprev < p1rev:
408 407 self.ui.warn(msg % nodemod.hex(p1))
409 408 if self.changelog.repotiprev < p2rev:
410 409 self.ui.warn(msg % nodemod.hex(p2))
411 410 return super(bundlerepository, self).setparents(p1, p2)
412 411
413 412 def instance(ui, path, create):
414 413 if create:
415 414 raise error.Abort(_('cannot create new bundle repository'))
416 415 # internal config: bundle.mainreporoot
417 416 parentpath = ui.config("bundle", "mainreporoot")
418 417 if not parentpath:
419 418 # try to find the correct path to the working directory repo
420 419 parentpath = cmdutil.findrepo(pycompat.getcwd())
421 420 if parentpath is None:
422 421 parentpath = ''
423 422 if parentpath:
424 423 # Try to make the full path relative so we get a nice, short URL.
425 424 # In particular, we don't want temp dir names in test outputs.
426 425 cwd = pycompat.getcwd()
427 426 if parentpath == cwd:
428 427 parentpath = ''
429 428 else:
430 429 cwd = pathutil.normasprefix(cwd)
431 430 if parentpath.startswith(cwd):
432 431 parentpath = parentpath[len(cwd):]
433 432 u = util.url(path)
434 433 path = u.localpath()
435 434 if u.scheme == 'bundle':
436 435 s = path.split("+", 1)
437 436 if len(s) == 1:
438 437 repopath, bundlename = parentpath, s[0]
439 438 else:
440 439 repopath, bundlename = s
441 440 else:
442 441 repopath, bundlename = parentpath, path
443 442 return bundlerepository(ui, repopath, bundlename)
444 443
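The 'bundle:' scheme packs an optional parent repo path and the bundle file path, separated by '+'. A minimal standalone parse following the same convention (the real code above also consults bundle.mainreporoot and the working directory):

    def parsebundleurl(path, parentpath=''):
        # 'bundle:repo+file.hg' -> ('repo', 'file.hg')
        # 'bundle:file.hg' or a bare path -> (parentpath, 'file.hg')
        if path.startswith('bundle:'):
            path = path[len('bundle:'):]
            parts = path.split('+', 1)
            if len(parts) == 2:
                return parts[0], parts[1]
            return parentpath, parts[0]
        return parentpath, path

    assert parsebundleurl('bundle:/repo+/tmp/x.hg') == ('/repo', '/tmp/x.hg')
    assert parsebundleurl('bundle:/tmp/x.hg', '/repo') == ('/repo', '/tmp/x.hg')
    assert parsebundleurl('/tmp/x.hg', '/repo') == ('/repo', '/tmp/x.hg')
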
445 444 class bundletransactionmanager(object):
446 445 def transaction(self):
447 446 return None
448 447
449 448 def close(self):
450 449 raise NotImplementedError
451 450
452 451 def release(self):
453 452 raise NotImplementedError
454 453
455 454 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
456 455 force=False):
457 456 '''obtains a bundle of changes incoming from other
458 457
459 458 "onlyheads" restricts the returned changes to those reachable from the
460 459 specified heads.
461 460 "bundlename", if given, stores the bundle to this file path permanently;
462 461 otherwise it's stored to a temp file and gets deleted again when you call
463 462 the returned "cleanupfn".
464 463 "force" indicates whether to proceed on unrelated repos.
465 464
466 465 Returns a tuple (local, csets, cleanupfn):
467 466
468 467 "local" is a local repo from which to obtain the actual incoming
469 468 changesets; it is a bundlerepo for the obtained bundle when the
470 469 original "other" is remote.
471 470 "csets" lists the incoming changeset node ids.
472 471 "cleanupfn" must be called without arguments when you're done processing
473 472 the changes; it closes both the original "other" and the one returned
474 473 here.
475 474 '''
476 475 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
477 476 force=force)
478 477 common, incoming, rheads = tmp
479 478 if not incoming:
480 479 try:
481 480 if bundlename:
482 481 os.unlink(bundlename)
483 482 except OSError:
484 483 pass
485 484 return repo, [], other.close
486 485
487 486 commonset = set(common)
488 487 rheads = [x for x in rheads if x not in commonset]
489 488
490 489 bundle = None
491 490 bundlerepo = None
492 491 localrepo = other.local()
493 492 if bundlename or not localrepo:
494 493 # create a bundle (uncompressed if other repo is not local)
495 494
496 495 # developer config: devel.legacy.exchange
497 496 legexc = ui.configlist('devel', 'legacy.exchange')
498 497 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
499 498 canbundle2 = (not forcebundle1
500 499 and other.capable('getbundle')
501 500 and other.capable('bundle2'))
502 501 if canbundle2:
503 502 kwargs = {}
504 503 kwargs['common'] = common
505 504 kwargs['heads'] = rheads
506 505 kwargs['bundlecaps'] = exchange.caps20to10(repo)
507 506 kwargs['cg'] = True
508 507 b2 = other.getbundle('incoming', **kwargs)
509 508 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
510 509 bundlename)
511 510 else:
512 511 if other.capable('getbundle'):
513 512 cg = other.getbundle('incoming', common=common, heads=rheads)
514 513 elif onlyheads is None and not other.capable('changegroupsubset'):
515 514 # compat with older servers when pulling all remote heads
516 515 cg = other.changegroup(incoming, "incoming")
517 516 rheads = None
518 517 else:
519 518 cg = other.changegroupsubset(incoming, rheads, 'incoming')
520 519 if localrepo:
521 520 bundletype = "HG10BZ"
522 521 else:
523 522 bundletype = "HG10UN"
524 523 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
525 524 bundletype)
526 525 # keep written bundle?
527 526 if bundlename:
528 527 bundle = None
529 528 if not localrepo:
530 529 # use the created uncompressed bundlerepo
531 530 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
532 531 fname)
533 532 # this repo contains local and other now, so filter out local again
534 533 common = repo.heads()
535 534 if localrepo:
536 535 # Part of common may be remotely filtered
537 536 # So use an unfiltered version
538 537 # The discovery process probably need cleanup to avoid that
539 538 localrepo = localrepo.unfiltered()
540 539
541 540 csets = localrepo.changelog.findmissing(common, rheads)
542 541
543 542 if bundlerepo:
544 543 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
545 544 remotephases = other.listkeys('phases')
546 545
547 546 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
548 547 pullop.trmanager = bundletransactionmanager()
549 548 exchange._pullapplyphases(pullop, remotephases)
550 549
551 550 def cleanup():
552 551 if bundlerepo:
553 552 bundlerepo.close()
554 553 if bundle:
555 554 os.unlink(bundle)
556 555 other.close()
557 556
558 557 return (localrepo, csets, cleanup)
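A hedged usage sketch for getremotechanges (hg 4.3-era internals; constructor details such as ui.load() may differ between releases, and the URL is a placeholder):

    from mercurial import bundlerepo, hg, ui as uimod

    ui = uimod.ui.load()
    repo = hg.repository(ui, '.')
    other = hg.peer(ui, {}, 'https://example.com/repo')
    local, csets, cleanup = bundlerepo.getremotechanges(ui, repo, other)
    try:
        for node in csets:
            ui.write('incoming: %s\n' % local[node].hex())
    finally:
        cleanup()  # closes 'other' and deletes any temporary bundle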